This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 6f200ec9cd InterceptingExecutor.schedule returns a simulator-unsafe Future 6f200ec9cd is described below commit 6f200ec9cd068eb871dd463eb4d6695a40399091 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sat Mar 8 14:24:49 2025 -0800 InterceptingExecutor.schedule returns a simulator-unsafe Future patch by Benedict Elliott Smith; reviewed by David Capwell for CASSANDRA-20420 --- .../simulator/systems/InterceptingExecutor.java | 98 ++++++++++++---------- .../simulator/systems/InterceptorOfExecution.java | 4 +- .../simulator/systems/SimulatedExecution.java | 7 +- .../apache/cassandra/net/MessageDeliveryTest.java | 1 - 4 files changed, 60 insertions(+), 50 deletions(-) diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java index a9c0fb2e00..1c38cd813d 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java +++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptingExecutor.java @@ -91,43 +91,9 @@ public interface InterceptingExecutor extends OrderOn OrderOn orderAppliesAfterScheduling(); - static class InterceptedScheduledFutureTask<T> extends SyncFutureTask<T> implements ScheduledFuture<T> + interface InterceptableScheduledFuture<T> extends ScheduledFuture<T>, RunnableFuture<T> { - final long delayNanos; - Runnable onCancel; - public InterceptedScheduledFutureTask(long delayNanos, Callable<T> call) - { - super(call); - this.delayNanos = delayNanos; - } - - @Override - public long getDelay(TimeUnit unit) - { - return unit.convert(delayNanos, NANOSECONDS); - } - - @Override - public int compareTo(Delayed that) - { - return Long.compare(delayNanos, that.getDelay(NANOSECONDS)); - } - - void onCancel(Runnable onCancel) - { - this.onCancel = onCancel; - } - - @Override - public boolean cancel(boolean b) - { - if (onCancel != null) - { - onCancel.run(); - onCancel = null; - } - return super.cancel(b); - } + void onCancel(Runnable onCancel); } @PerClassLoader @@ -715,6 +681,45 @@ public interface InterceptingExecutor extends OrderOn @PerClassLoader class InterceptingSequentialExecutor extends AbstractSingleThreadedExecutorPlus implements InterceptingExecutor, ScheduledExecutorPlus, OrderOn { + static class InterceptableScheduledFutureTask<T> extends SyncFutureTask<T> implements InterceptableScheduledFuture<T> + { + final long delayNanos; + Runnable onCancel; + public InterceptableScheduledFutureTask(long delayNanos, Callable<T> call) + { + super(call); + this.delayNanos = delayNanos; + } + + @Override + public long getDelay(TimeUnit unit) + { + return unit.convert(delayNanos, NANOSECONDS); + } + + @Override + public int compareTo(Delayed that) + { + return Long.compare(delayNanos, that.getDelay(NANOSECONDS)); + } + + public void onCancel(Runnable onCancel) + { + this.onCancel = onCancel; + } + + @Override + public boolean cancel(boolean b) + { + if (onCancel != null) + { + onCancel.run(); + onCancel = null; + } + return super.cancel(b); + } + } + InterceptingSequentialExecutor(InterceptorOfExecution interceptorOfExecution, ThreadFactory threadFactory, InterceptingTaskFactory taskFactory) { super(interceptorOfExecution, threadFactory, taskFactory); @@ -765,7 +770,7 @@ public interface InterceptingExecutor extends OrderOn throw new RejectedExecutionException(); long delayNanos = unit.toNanos(delay); - return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this); + return schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null)); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) @@ -774,7 +779,7 @@ public interface InterceptingExecutor extends OrderOn throw new RejectedExecutionException(); long delayNanos = unit.toNanos(delay); - return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable, this); + return schedule(SCHEDULED_TASK, delayNanos, relativeToGlobalNanos(delayNanos), callable); } public ScheduledFuture<?> scheduleTimeoutWithDelay(Runnable run, long delay, TimeUnit unit) @@ -787,7 +792,7 @@ public interface InterceptingExecutor extends OrderOn if (isShutdown) throw new RejectedExecutionException(); - return interceptorOfExecution.intercept().schedule(SCHEDULED_TASK, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this); + return schedule(SCHEDULED_TASK, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null)); } public ScheduledFuture<?> scheduleTimeoutAt(Runnable run, long deadlineNanos) @@ -795,7 +800,7 @@ public interface InterceptingExecutor extends OrderOn if (isShutdown) throw new RejectedExecutionException(); - return interceptorOfExecution.intercept().schedule(SCHEDULED_TIMEOUT, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null), this); + return schedule(SCHEDULED_TIMEOUT, localToRelativeNanos(deadlineNanos), localToGlobalNanos(deadlineNanos), callable(run, null)); } public ScheduledFuture<?> scheduleSelfRecurring(Runnable run, long delay, TimeUnit unit) @@ -804,7 +809,7 @@ public interface InterceptingExecutor extends OrderOn throw new RejectedExecutionException(); long delayNanos = unit.toNanos(delay); - return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null), this); + return schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), callable(run, null)); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable run, long initialDelay, long period, TimeUnit unit) @@ -813,7 +818,7 @@ public interface InterceptingExecutor extends OrderOn throw new RejectedExecutionException(); long delayNanos = unit.toNanos(initialDelay); - return interceptorOfExecution.intercept().schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), new Callable<Object>() + return schedule(SCHEDULED_DAEMON, delayNanos, relativeToGlobalNanos(delayNanos), new Callable<Object>() { @Override public Object call() @@ -829,7 +834,12 @@ public interface InterceptingExecutor extends OrderOn { return run.toString(); } - }, this); + }); + } + + <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<T> task) + { + return interceptorOfExecution.intercept().schedule(kind, delayNanos, deadlineNanos, new InterceptableScheduledFutureTask<>(delayNanos, task), this); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable run, long initialDelay, long delay, TimeUnit unit) @@ -843,6 +853,8 @@ public interface InterceptingExecutor extends OrderOn } } + + @PerClassLoader class InterceptingPooledLocalAwareExecutor extends InterceptingPooledExecutor implements LocalAwareExecutorPlus { diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java index ac8255d506..6633e27408 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java +++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfExecution.java @@ -18,10 +18,10 @@ package org.apache.cassandra.simulator.systems; -import java.util.concurrent.Callable; import java.util.concurrent.ScheduledFuture; import java.util.function.Function; +import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptableScheduledFuture; import org.apache.cassandra.simulator.systems.SimulatedAction.Kind; import org.apache.cassandra.utils.Shared; import org.apache.cassandra.utils.concurrent.RunnableFuture; @@ -38,7 +38,7 @@ public interface InterceptorOfExecution interface InterceptExecution { <V, T extends RunnableFuture<V>> T addTask(T task, InterceptingExecutor executor); - <T> ScheduledFuture<T> schedule(Kind kind, long delayNanos, long deadlineNanos, Callable<T> runnable, InterceptingExecutor executor); + <T> ScheduledFuture<T> schedule(Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<T> task, InterceptingExecutor executor); Thread start(Kind kind, Function<Runnable, InterceptibleThread> factory, Runnable run); } } diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java index bb6387908e..a56aa84ea6 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java +++ b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedExecution.java @@ -29,7 +29,7 @@ import org.apache.cassandra.simulator.Actions; import org.apache.cassandra.simulator.OrderOn; import org.apache.cassandra.simulator.systems.InterceptedExecution.InterceptedFutureTaskExecution; import org.apache.cassandra.simulator.systems.InterceptedExecution.InterceptedThreadStart; -import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptedScheduledFutureTask; +import org.apache.cassandra.simulator.systems.InterceptingExecutor.InterceptableScheduledFuture; import org.apache.cassandra.utils.concurrent.Condition; import org.apache.cassandra.utils.concurrent.NotScheduledFuture; import org.apache.cassandra.utils.concurrent.RunnableFuture; @@ -93,7 +93,7 @@ public class SimulatedExecution implements InterceptorOfExecution return task; } - public <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<T> runnable, InterceptingExecutor executor) + public <T> ScheduledFuture<T> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<T> task, InterceptingExecutor executor) { return new NotScheduledFuture<>(); } @@ -135,10 +135,9 @@ public class SimulatedExecution implements InterceptorOfExecution return task; } - public <V> ScheduledFuture<V> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, Callable<V> call, InterceptingExecutor executor) + public <V> ScheduledFuture<V> schedule(SimulatedAction.Kind kind, long delayNanos, long deadlineNanos, InterceptableScheduledFuture<V> task, InterceptingExecutor executor) { assert kind == SCHEDULED_TASK || kind == SCHEDULED_TIMEOUT || kind == SCHEDULED_DAEMON; - InterceptedScheduledFutureTask<V> task = new InterceptedScheduledFutureTask<>(delayNanos, call); InterceptedFutureTaskExecution<?> intercepted = new InterceptedFutureTaskExecution<>(kind, executor, task, deadlineNanos); task.onCancel(intercepted::cancel); intercept.interceptExecution(intercepted, executor.orderAppliesAfterScheduling()); diff --git a/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java b/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java index baf33256cd..73b773e92d 100644 --- a/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java +++ b/test/unit/org/apache/cassandra/net/MessageDeliveryTest.java @@ -52,7 +52,6 @@ import org.mockito.Mockito; import static accord.utils.Property.qt; import static org.apache.cassandra.net.MessageDelivery.RetryErrorMessage; import static org.apache.cassandra.net.MessageDelivery.RetryPredicate; -import static org.apache.cassandra.service.RetryStrategy.randomizers; import static org.assertj.core.api.Assertions.assertThat; public class MessageDeliveryTest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org