[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService. This PR adds the getScheduledExecutor method to the RpcService interface, so henceforth all RpcService implementations have to provide a ScheduledExecutor implementation.
Currently, we only support the AkkaRpcService. The AkkaRpcService returns a ScheduledExecutorService proxy which forwards the schedule calls to the ActorSystem's internal scheduler. Introduce ScheduledExecutor interface to hide service methods from the ScheduledExecutorService This closes #3310. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf458dd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf458dd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf458dd Branch: refs/heads/master Commit: ccf458dd4d173b3370257177c2bbd9680baa6511 Parents: 5983069 Author: Till Rohrmann <[email protected]> Authored: Tue Feb 14 16:50:43 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 24 14:48:52 2017 +0100 ---------------------------------------------------------------------- .../runtime/concurrent/ScheduledExecutor.java | 92 ++++++++++ .../ScheduledExecutorServiceAdapter.java | 64 +++++++ .../apache/flink/runtime/rpc/RpcService.java | 15 ++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 178 +++++++++++++++++++ .../runtime/rpc/TestingSerialRpcService.java | 34 ++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 160 +++++++++++++++++ 6 files changed, 543 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java new file mode 100644 index 0000000..c1b47e2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Extension for the {@link Executor} interface which is enriched by methods for scheduling tasks + * in the future. + */ +public interface ScheduledExecutor extends Executor { + + /** + * Executes the given command after the given delay. + * + * @param command the task to execute in the future + * @param delay the time from now to delay the execution + * @param unit the time unit of the delay parameter + * @return a ScheduledFuture representing the completion of the scheduled task + */ + ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); + + /** + * Executes the given callable after the given delay. The result of the callable is returned + * as a {@link ScheduledFuture}. 
+ * + * @param callable the callable to execute + * @param delay the time from now to delay the execution + * @param unit the time unit of the delay parameter + * @param <V> result type of the callable + * @return a ScheduledFuture which holds the future value of the given callable + */ + <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); + + /** + * Executes the given command periodically. The first execution is started after the + * {@code initialDelay}, the second execution is started after {@code initialDelay + period}, + * the third after {@code initialDelay + 2*period} and so on. + * The task is executed until either an execution fails, or the returned {@link ScheduledFuture} + * is cancelled. + * + * @param command the task to be executed periodically + * @param initialDelay the time from now until the first execution is triggered + * @param period the time after which the next execution is triggered + * @param unit the time unit of the initial delay and period parameters + * @return a ScheduledFuture representing the periodic task. This future never completes + * unless an execution of the given task fails or if the future is cancelled + */ + ScheduledFuture<?> scheduleAtFixedRate( + Runnable command, + long initialDelay, + long period, + TimeUnit unit); + + /** + * Executes the given command repeatedly with the given delay between the end of an execution + * and the start of the next execution. + * The task is executed repeatedly until either an exception occurs or if the returned + * {@link ScheduledFuture} is cancelled. + * + * @param command the task to execute repeatedly + * @param initialDelay the time from now until the first execution is triggered + * @param delay the time between the end of the current and the start of the next execution + * @param unit the time unit of the initial delay and the delay parameter + * @return a ScheduledFuture representing the repeatedly executed task. 
This future never + * completes unless the execution of the given task fails or if the future is cancelled + */ + ScheduledFuture<?> scheduleWithFixedDelay( + Runnable command, + long initialDelay, + long delay, + TimeUnit unit); +} http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java new file mode 100644 index 0000000..7662c35 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.concurrent; + +import org.apache.flink.util.Preconditions; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * Adapter class for a {@link ScheduledExecutorService} which shall be used as a + * {@link ScheduledExecutor}. + */ +public class ScheduledExecutorServiceAdapter implements ScheduledExecutor { + + private final ScheduledExecutorService scheduledExecutorService; + + public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return scheduledExecutorService.schedule(command, delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return scheduledExecutorService.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void execute(Runnable command) { + scheduledExecutorService.execute(command); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 4b9100a..2d2019a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import java.util.concurrent.Callable; @@ -98,6 +99,20 @@ public interface RpcService { Executor getExecutor(); /** + * Gets a scheduled executor from the RPC service. This executor can be used to schedule + * tasks to be executed in the future. + * + * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @return The RPC service provided scheduled executor + */ + ScheduledExecutor getScheduledExecutor(); + + /** * Execute the runnable in the execution context of this RPC Service, as returned by * {@link #getExecutor()}, after a scheduled delay. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 6e3fb40..6a6a85d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -23,6 +23,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; +import akka.actor.Cancellable; import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; @@ -34,6 +35,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadExecutable; @@ -43,18 +45,24 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; +import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; +import 
java.util.concurrent.Delayed; import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; @@ -81,6 +89,8 @@ public class AkkaRpcService implements RpcService { private final String address; + private final ScheduledExecutor internalScheduledExecutor; + private volatile boolean stopped; public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { @@ -101,6 +111,8 @@ public class AkkaRpcService implements RpcService { } else { address = ""; } + + internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem); } @Override @@ -259,6 +271,10 @@ public class AkkaRpcService implements RpcService { return actorSystem.dispatcher(); } + public ScheduledExecutor getScheduledExecutor() { + return internalScheduledExecutor; + } + @Override public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { checkNotNull(runnable, "runnable"); @@ -279,4 +295,166 @@ public class AkkaRpcService implements RpcService { return new FlinkFuture<>(scalaFuture); } + + /** + * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}. 
+ */ + private static final class InternalScheduledExecutorImpl implements ScheduledExecutor { + + private final ActorSystem actorSystem; + + private InternalScheduledExecutorImpl(ActorSystem actorSystem) { + this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); + } + + @Override + @Nonnull + public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { + ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( + command, + triggerTime(unit.toNanos(initialDelay)), + unit.toNanos(period)); + + Cancellable cancellable = actorSystem.scheduler().schedule( + new FiniteDuration(initialDelay, unit), + new FiniteDuration(period, unit), + scheduledFutureTask, + actorSystem.dispatcher()); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + @Nonnull + public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { + ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( + command, + triggerTime(unit.toNanos(initialDelay)), 
+ unit.toNanos(-delay)); + + Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit); + + scheduledFutureTask.setCancellable(cancellable); + + return scheduledFutureTask; + } + + @Override + public void execute(@Nonnull Runnable command) { + actorSystem.dispatcher().execute(command); + } + + private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { + return actorSystem.scheduler().scheduleOnce( + new FiniteDuration(delay, unit), + runnable, + actorSystem.dispatcher()); + } + + private long now() { + return System.nanoTime(); + } + + private long triggerTime(long delay) { + return now() + delay; + } + + private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { + + private long time; + + private final long period; + + private volatile Cancellable cancellable; + + ScheduledFutureTask(Callable<V> callable, long time, long period) { + super(callable); + this.time = time; + this.period = period; + } + + ScheduledFutureTask(Runnable runnable, long time, long period) { + super(runnable, null); + this.time = time; + this.period = period; + } + + public void setCancellable(Cancellable newCancellable) { + this.cancellable = newCancellable; + } + + @Override + public void run() { + if (!isPeriodic()) { + super.run(); + } else if (runAndReset()){ + if (period > 0L) { + time += period; + } else { + cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS); + + // check whether we have been cancelled concurrently + if (isCancelled()) { + cancellable.cancel(); + } else { + time = triggerTime(-period); + } + } + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = super.cancel(mayInterruptIfRunning); + + return result && cancellable.cancel(); + } + + @Override + public long getDelay(@Nonnull TimeUnit unit) { + return unit.convert(time - now(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(@Nonnull Delayed o) { + 
if (o == this) { + return 0; + } + + long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); + return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0; + } + + @Override + public boolean isPeriodic() { + return period != 0L; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 1d30ea4..07edfef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.util.DirectExecutorService; import org.apache.flink.util.Preconditions; @@ -31,10 +33,13 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.BitSet; +import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -46,13 +51,19 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull; public class TestingSerialRpcService implements RpcService { private final DirectExecutorService executorService; + private final ScheduledExecutorService scheduledExecutorService; private final ConcurrentHashMap<String, RpcGateway> registeredConnections; private final CompletableFuture<Void> terminationFuture; + private final ScheduledExecutor scheduledExecutorServiceAdapter; + public TestingSerialRpcService() { executorService = new DirectExecutorService(); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1); this.registeredConnections = new ConcurrentHashMap<>(16); this.terminationFuture = new FlinkCompletableFuture<>(); + + this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService); } @Override @@ -86,9 +97,32 @@ public class TestingSerialRpcService implements RpcService { return executorService; } + public ScheduledExecutor getScheduledExecutor() { + return scheduledExecutorServiceAdapter; + } + @Override public void stopService() { executorService.shutdown(); + + scheduledExecutorService.shutdown(); + + boolean terminated = false; + + try { + terminated = scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (!terminated) { + List<Runnable> runnables = scheduledExecutorService.shutdownNow(); + + for (Runnable runnable : runnables) { + runnable.run(); + } + } + registeredConnections.clear(); terminationFuture.complete(null); } http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 7c8defa..eb71287 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.util.TestLogger; @@ -30,13 +31,16 @@ import org.junit.AfterClass; import org.junit.Test; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class AkkaRpcServiceTest extends TestLogger { @@ -149,4 +153,160 @@ public class AkkaRpcServiceTest extends TestLogger { terminationFuture.get(); } + + /** + * Tests a simple scheduled runnable being executed by the RPC services scheduled executor + * service. 
+ */ + @Test(timeout = 1000) + public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException { + ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor(); + + final OneShotLatch latch = new OneShotLatch(); + + ScheduledFuture<?> future = scheduledExecutor.schedule( + new Runnable() { + @Override + public void run() { + latch.trigger(); + } + }, + 10L, + TimeUnit.MILLISECONDS); + + future.get(); + + // once the future is completed, then the latch should have been triggered + assertTrue(latch.isTriggered()); + } + + /** + * Tests that the RPC service's scheduled executor service can execute runnables at a fixed + * rate. + */ + @Test(timeout = 1000) + public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException { + ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor(); + + final int tries = 4; + final long delay = 10L; + final CountDownLatch countDownLatch = new CountDownLatch(tries); + + long currentTime = System.nanoTime(); + + ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, + delay, + delay, + TimeUnit.MILLISECONDS); + + assertTrue(!future.isDone()); + + countDownLatch.await(); + + // the future should not complete since we have a periodic task + assertTrue(!future.isDone()); + + long finalTime = System.nanoTime() - currentTime; + + // the processing should have taken at least delay times the number of count downs. + assertTrue(finalTime >= tries * delay); + + future.cancel(true); + } + + /** + * Tests that the RPC service's scheduled executor service can execute runnable with a fixed + * delay. 
+ */ + @Test(timeout = 1000) + public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException { + ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor(); + + final int tries = 4; + final long delay = 10L; + final CountDownLatch countDownLatch = new CountDownLatch(tries); + + long currentTime = System.nanoTime(); + + ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, + delay, + delay, + TimeUnit.MILLISECONDS); + + assertTrue(!future.isDone()); + + countDownLatch.await(); + + // the future should not complete since we have a periodic task + assertTrue(!future.isDone()); + + long finalTime = System.nanoTime() - currentTime; + + // the processing should have taken at least delay times the number of count downs. + assertTrue(finalTime >= tries * delay); + + future.cancel(true); + } + + /** + * Tests that canceling the returned future will stop the execution of the scheduled runnable. 
+ */ + @Test + public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException { + ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor(); + + long delay = 10L; + + final OneShotLatch futureTask = new OneShotLatch(); + final OneShotLatch latch = new OneShotLatch(); + final OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch(); + + ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay( + new Runnable() { + @Override + public void run() { + try { + if (!futureTask.isTriggered()) { + // first run + futureTask.trigger(); + latch.await(); + } else { + shouldNotBeTriggeredLatch.trigger(); + } + } catch (InterruptedException e) { + // ignore + } + } + }, + delay, + delay, + TimeUnit.MILLISECONDS); + + // wait until we're in the runnable + futureTask.await(); + + // cancel the scheduled future + future.cancel(false); + + latch.trigger(); + + try { + shouldNotBeTriggeredLatch.await(5 * delay, TimeUnit.MILLISECONDS); + fail("The shouldNotBeTriggeredLatch should never be triggered."); + } catch (TimeoutException e) { + // expected + } + } }
