Repository: flink
Updated Branches:
  refs/heads/master 5983069fc -> 0ba08b444


[FLINK-5799] [rpc] Let RpcService.scheduleRunnable return a ScheduledFuture

The returned ScheduledFuture instance allows callers to cancel a scheduled
runnable and to obtain the remaining delay until the runnable is executed.
Furthermore, it allows callers to wait for the completion of the runnable.

This closes #3311.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ba08b44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ba08b44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ba08b44

Branch: refs/heads/master
Commit: 0ba08b44477cd6eaade4e209230645148b303787
Parents: ccf458d
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 14 20:40:30 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 24 14:48:52 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcService.java    |  3 +-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  6 +--
 .../runtime/rpc/TestingSerialRpcService.java    | 53 +++++++++++++++++++-
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  6 ++-
 4 files changed, 61 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2d2019a..93d345d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -119,7 +120,7 @@ public interface RpcService {
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed
         */
-       void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
+       ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, 
TimeUnit unit);
 
        /**
         * Execute the given runnable in the executor of the RPC service. This 
method can be used to run

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6a6a85d..4298021 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -276,12 +276,12 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public void scheduleRunnable(Runnable runnable, long delay, TimeUnit 
unit) {
+       public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long 
delay, TimeUnit unit) {
                checkNotNull(runnable, "runnable");
                checkNotNull(unit, "unit");
-               checkArgument(delay >= 0, "delay must be zero or larger");
+               checkArgument(delay >= 0L, "delay must be zero or larger");
 
-               actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, 
unit), runnable, actorSystem.dispatcher());
+               return internalScheduledExecutor.schedule(runnable, delay, 
unit);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 07edfef..6280a46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -37,10 +37,14 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -67,10 +71,12 @@ public class TestingSerialRpcService implements RpcService {
        }
 
        @Override
-       public void scheduleRunnable(final Runnable runnable, final long delay, 
final TimeUnit unit) {
+       public ScheduledFuture<?> scheduleRunnable(final Runnable runnable, 
final long delay, final TimeUnit unit) {
                try {
                        unit.sleep(delay);
                        runnable.run();
+
+                       return new DoneScheduledFuture<Void>(null);
                } catch (Throwable e) {
                        throw new RuntimeException(e);
                }
@@ -434,4 +440,49 @@ public class TestingSerialRpcService implements RpcService 
{
                }
 
        }
+
+       private static class DoneScheduledFuture<V> implements 
ScheduledFuture<V> {
+
+               private final V value;
+
+               private DoneScheduledFuture(V value) {
+                       this.value = value;
+               }
+
+               @Override
+               public long getDelay(TimeUnit unit) {
+                       return 0L;
+               }
+
+               @Override
+               public int compareTo(Delayed o) {
+                       return 0;
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       return false;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return false;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return true;
+               }
+
+               @Override
+               public V get() throws InterruptedException, ExecutionException {
+                       return value;
+               }
+
+               @Override
+               public V get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+                       return value;
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ba08b44/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index eb71287..caacfa8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -69,14 +69,16 @@ public class AkkaRpcServiceTest extends TestLogger {
                final long delay = 100;
                final long start = System.nanoTime();
 
-               akkaRpcService.scheduleRunnable(new Runnable() {
+               ScheduledFuture<?> scheduledFuture = 
akkaRpcService.scheduleRunnable(new Runnable() {
                        @Override
                        public void run() {
                                latch.trigger();
                        }
                }, delay, TimeUnit.MILLISECONDS);
 
-               latch.await();
+               scheduledFuture.get();
+
+               assertTrue(latch.isTriggered());
                final long stop = System.nanoTime();
 
                assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);

Reply via email to