Repository: flink Updated Branches: refs/heads/master 5983069fc -> 0ba08b444
[FLINK-5799] [rpc] Let RpcService.scheduleRunnable return a ScheduledFuture The returned ScheduledFuture instance allows to cancel a scheduled runnable and obtain the delay until the runnable will be executed. Furthermore, it allows to wait on the completion of the runnable. This closes #3311. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ba08b44 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ba08b44 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ba08b44 Branch: refs/heads/master Commit: 0ba08b44477cd6eaade4e209230645148b303787 Parents: ccf458d Author: Till Rohrmann <[email protected]> Authored: Tue Feb 14 20:40:30 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 24 14:48:52 2017 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcService.java | 3 +- .../flink/runtime/rpc/akka/AkkaRpcService.java | 6 +-- .../runtime/rpc/TestingSerialRpcService.java | 53 +++++++++++++++++++- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 6 ++- 4 files changed, 61 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 2d2019a..93d345d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; /** @@ -119,7 +120,7 @@ public interface RpcService { * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed */ - void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); + ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); /** * Execute the given runnable in the executor of the RPC service. This method can be used to run http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 6a6a85d..4298021 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -276,12 +276,12 @@ public class AkkaRpcService implements RpcService { } @Override - public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { + public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { checkNotNull(runnable, "runnable"); checkNotNull(unit, "unit"); - checkArgument(delay >= 0, "delay must be zero or larger"); + checkArgument(delay >= 0L, "delay must be zero or larger"); - actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); + return internalScheduledExecutor.schedule(runnable, delay, unit); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 07edfef..6280a46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -37,10 +37,14 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,10 +71,12 @@ public class TestingSerialRpcService implements RpcService { } @Override - public void scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { + public ScheduledFuture<?> scheduleRunnable(final Runnable runnable, final long delay, final TimeUnit unit) { try { unit.sleep(delay); runnable.run(); + + return new DoneScheduledFuture<Void>(null); } catch (Throwable e) { throw new RuntimeException(e); } @@ -434,4 +440,49 @@ public class TestingSerialRpcService implements RpcService { } } + + private static class DoneScheduledFuture<V> implements ScheduledFuture<V> { + + private final V value; + + private DoneScheduledFuture(V value) { + this.value = value; + } + + @Override + public long getDelay(TimeUnit unit) { + return 0L; + } + + @Override + public int compareTo(Delayed o) { + return 0; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return value; + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return value; + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index eb71287..caacfa8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -69,14 +69,16 @@ public class AkkaRpcServiceTest extends TestLogger { final long delay = 100; final long start = System.nanoTime(); - akkaRpcService.scheduleRunnable(new Runnable() { + ScheduledFuture<?> scheduledFuture = akkaRpcService.scheduleRunnable(new Runnable() { @Override public void run() { latch.trigger(); } }, delay, TimeUnit.MILLISECONDS); - latch.await(); + scheduledFuture.get(); + + assertTrue(latch.isTriggered()); final long stop = System.nanoTime(); assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
