This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 6f200ec9cd InterceptingExecutor.schedule returns a simulator-unsafe Future
6f200ec9cd is described below

commit 6f200ec9cd068eb871dd463eb4d6695a40399091
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sat Mar 8 14:24:49 2025 -0800

    InterceptingExecutor.schedule returns a simulator-unsafe Future
    
    patch by Benedict Elliott Smith; reviewed by David Capwell for CASSANDRA-20420
---
 .../simulator/systems/InterceptingExecutor.java    | 98 ++++++++++++----------
 .../simulator/systems/InterceptorOfExecution.java  |  4 +-
 .../simulator/systems/SimulatedExecution.java      |  7 +-
 .../apache/cassandra/net/MessageDeliveryTest.java  |  1 -
 4 files changed, 60 insertions(+), 50 deletions(-)

diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java
index a9c0fb2e00..1c38cd813d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java
@@ -91,43 +91,9 @@ public interface InterceptingExecutor extends OrderOn
 
     OrderOn orderAppliesAfterScheduling();
 
-    static class InterceptedScheduledFutureTask<T> extends SyncFutureTask<T> implements ScheduledFuture<T>
+    interface InterceptableScheduledFuture<T> extends ScheduledFuture<T>, RunnableFuture<T>
     {
-        final long delayNanos;
-        Runnable onCancel;
-        public InterceptedScheduledFutureTask(long delayNanos, Callable<T> call)
-        {
-            super(call);
-            this.delayNanos = delayNanos;
-        }
-
-        @Override
-        public long getDelay(TimeUnit unit)
-        {
-            return unit.convert(delayNanos, NANOSECONDS);
-        }
-
-        @Override
-        public int compareTo(Delayed that)
-        {
-            return Long.compare(delayNanos, that.getDelay(NANOSECONDS));
-        }
-
-        void onCancel(Runnable onCancel)
-        {
-            this.onCancel = onCancel;
-        }
-
-        @Override
-        public boolean cancel(boolean b)
-        {
-            if (onCancel != null)
-            {
-                onCancel.run();
-                onCancel = null;
-            }
-            return super.cancel(b);
-        }
+        void onCancel(Runnable onCancel);
     }
 
     @PerClassLoader
@@ -715,6 +681,45 @@ public interface InterceptingExecutor extends OrderOn
     @PerClassLoader
     class InterceptingSequentialExecutor extends AbstractSingleThreadedExecutorPlus implements InterceptingExecutor, ScheduledExecutorPlus, OrderOn
     {
+        static class InterceptableScheduledFutureTask<T> extends SyncFutureTask<T> implements InterceptableScheduledFuture<T>
+        {
+            final long delayNanos;
+            Runnable onCancel;
+            public InterceptableScheduledFutureTask(long delayNanos, Callable<T> call)
+            {
+                super(call);
+                this.delayNanos = delayNanos;
+            }
+
+            @Override
+            public long getDelay(TimeUnit unit)
+            {
+                return unit.convert(delayNanos, NANOSECONDS);
+            }
+
+            @Override
+            public int compareTo(Delayed that)
+            {
+                return Long.compare(delayNanos, that.getDelay(NANOSECONDS));
+            }
+
+            public void onCancel(Runnable onCancel)
+            {
+                this.onCancel = onCancel;
+            }
+
+            @Override
+            public boolean cancel(boolean b)
+            {
+                if (onCancel != null)
+                {
+                    onCancel.run();
+                    onCancel = null;
+                }
+                return super.cancel(b);
+            }
+        }
+
         InterceptingSequentialExecutor(InterceptorOfExecution interceptorOfExecution, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory)
         {
             super(interceptorOfExecution, threadFactory, taskFactory);
@@ -765,7 +770,7 @@ public interface InterceptingExecutor extends OrderOn
                 throw new RejectedExecutionException();
 
             long delayNanos = unit.toNanos(delay);
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this);
+            return schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null));
         }
 
         public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
@@ -774,7 +779,7 @@ public interface InterceptingExecutor extends OrderOn
                 throw new RejectedExecutionException();
 
             long delayNanos = unit.toNanos(delay);
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable, this);
+            return schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable);
         }
 
         public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit unit)
@@ -787,7 +792,7 @@ public interface InterceptingExecutor extends OrderOn
             if (isShutdown)
                 throw new RejectedExecutionException();
 
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this);
+            return schedule(SCHEDULED_TASK, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null));
         }
 
         public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadlineNanos)
@@ -795,7 +800,7 @@ public interface InterceptingExecutor extends OrderOn
             if (isShutdown)
                 throw new RejectedExecutionException();
 
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_TIMEOUT, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this);
+            return schedule(SCHEDULED_TIMEOUT, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null));
         }
 
         public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit unit)
@@ -804,7 +809,7 @@ public interface InterceptingExecutor extends OrderOn
                 throw new RejectedExecutionException();
 
             long delayNanos = unit.toNanos(delay);
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this);
+            return schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null));
         }
 
         public ScheduledFuture<?> scheduleAtFixedRate(Runnable run, long initialDelay, long period, TimeUnit unit)
@@ -813,7 +818,7 @@ public interface InterceptingExecutor extends OrderOn
                 throw new RejectedExecutionException();
 
             long delayNanos = unit.toNanos(initialDelay);
-            return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), new Callable<Object>()
+            return schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), new Callable<Object>()
             {
                 @Override
                 public Object call()
@@ -829,7 +834,12 @@ public interface InterceptingExecutor extends OrderOn
                 {
                     return run.toString();
                 }
-            }, this);
+            });
+        }
+
+        <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<T> task)
+        {
+            return interceptorOfExecution.intercept().schedule(kind, delayNanos, deadlineNanos, new InterceptableScheduledFutureTask<>(delayNanos, task), this);
         }
 
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable run, long initialDelay, long delay, TimeUnit unit)
@@ -843,6 +853,8 @@ public interface InterceptingExecutor extends OrderOn
         }
     }
 
+
+
     @PerClassLoader
     class InterceptingPooledLocalAwareExecutor extends InterceptingPooledExecutor implements LocalAwareExecutorPlus
     {
diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java
index ac8255d506..6633e27408 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java
@@ -18,10 +18,10 @@
 
 package org.apache.cassandra.simulator.systems;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Function;
 
+import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptableScheduledFuture;
 import org.apache.cassandra.simulator.systems.SimulatedAction.Kind;
 import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.concurrent.RunnableFuture;
@@ -38,7 +38,7 @@ public interface InterceptorOfExecution
     interface InterceptExecution
     {
         <V, T extends RunnableFuture<V>> T addTask(T task, InterceptingExecutor executor);
-        <T> ScheduledFuture<T> schedule(Kind kind, long delayNanos, long deadlineNanos, Callable<T> runnable, InterceptingExecutor executor);
+        <T> ScheduledFuture<T> schedule(Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<T> task, InterceptingExecutor executor);
         Thread start(Kind kind, Function<Runnable, InterceptibleThread> factory, Runnable run);
     }
 }
diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java
index bb6387908e..a56aa84ea6 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.simulator.Actions;
 import org.apache.cassandra.simulator.OrderOn;
 import org.apache.cassandra.simulator.systems.InterceptedExecution.InterceptedFutureTaskExecution;
 import org.apache.cassandra.simulator.systems.InterceptedExecution.InterceptedThreadStart;
-import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptedScheduledFutureTask;
+import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptableScheduledFuture;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
 import org.apache.cassandra.utils.concurrent.RunnableFuture;
@@ -93,7 +93,7 @@ public class SimulatedExecution implements InterceptorOfExecution
             return task;
         }
 
-        public <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<T> runnable, InterceptingExecutor executor)
+        public <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<T> task, InterceptingExecutor executor)
         {
             return new NotScheduledFuture<>();
         }
@@ -135,10 +135,9 @@ public class SimulatedExecution implements InterceptorOfExecution
             return task;
         }
 
-        public <V> ScheduledFuture<V> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<V> call, InterceptingExecutor executor)
+        public <V> ScheduledFuture<V> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<V> task, InterceptingExecutor executor)
         {
             assert kind == SCHEDULED_TASK || kind == SCHEDULED_TIMEOUT || kind == SCHEDULED_DAEMON;
-            InterceptedScheduledFutureTask<V> task = new InterceptedScheduledFutureTask<>(delayNanos, call);
             InterceptedFutureTaskExecution<?> intercepted = new InterceptedFutureTaskExecution<>(kind, executor, task, deadlineNanos);
             task.onCancel(intercepted::cancel);
             intercept.interceptExecution(intercepted, executor.orderAppliesAfterScheduling());
diff --git a/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java b/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java
index baf33256cd..73b773e92d 100644
--- a/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java
+++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java
@@ -52,7 +52,6 @@ import org.mockito.Mockito;
 import static accord.utils.Property.qt;
 import static org.apache.cassandra.net.MessageDelivery.RetryErrorMessage;
 import static org.apache.cassandra.net.MessageDelivery.RetryPredicate;
-import static org.apache.cassandra.service.RetryStrategy.randomizers;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class MessageDeliveryTest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to