[FLINK-4384] [rpc] Add "scheduleRunAsync()" to the RpcEndpoint

This closes #2360


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5614a4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5614a4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5614a4c

Branch: refs/heads/flip-6
Commit: f5614a4c352692e39218de866ad68331b28fb7fe
Parents: f5cf6b5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 19:10:48 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:13 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |   9 +
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  12 ++
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  13 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  15 +-
 .../runtime/rpc/akka/messages/RunAsync.java     |  24 ++-
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   3 +
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  | 216 +++++++++++++++++++
 7 files changed, 286 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 882c1b7..4efb382 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -52,4 +52,13 @@ public interface MainThreadExecutor {
         * @return Future of the callable result
         */
        <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
+
+       /**
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint, with
+        * a delay of the given number of milliseconds.
+        *
+        * @param runnable Runnable to be executed
+        * @param delay    The delay, in milliseconds, after which the runnable 
will be executed
+        */
+       void scheduleRunAsync(Runnable runnable, long delay);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index aef0803..44933d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -28,6 +28,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -168,6 +169,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        }
 
        /**
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint, with
+        * a delay of the given duration, expressed in the given time unit.
+        *
+        * @param runnable Runnable to be executed
+        * @param delay    The delay (measured in {@code unit}) after which the runnable will be executed
+        */
+       public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
+               ((MainThreadExecutor) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
+       }
+
+       /**
         * Execute the callable in the main thread of the underlying RPC 
service, returning a future for
         * the result of the callable. If the callable is not completed within 
the given timeout, then
         * the future will be failed with a {@link 
java.util.concurrent.TimeoutException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index e8e383a..580b161 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -38,6 +38,9 @@ import java.lang.reflect.Method;
 import java.util.BitSet;
 import java.util.concurrent.Callable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation 
handler wraps the
  * rpc in a {@link RpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
@@ -106,9 +109,17 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
        @Override
        public void runAsync(Runnable runnable) {
+               scheduleRunAsync(runnable, 0);
+       }
+
+       @Override
+       public void scheduleRunAsync(Runnable runnable, long delay) {
+               checkNotNull(runnable, "runnable");
+               checkArgument(delay >= 0, "delay must be zero or greater");
+               
                // Unfortunately I couldn't find a way to allow only local 
communication. Therefore, the
                // runnable field is transient
-               rpcServer.tell(new RunAsync(runnable), ActorRef.noSender());
+               rpcServer.tell(new RunAsync(runnable, delay), 
ActorRef.noSender());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 57da38a..18ccf1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
+import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
@@ -30,9 +31,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and 
{@link CallAsync}
@@ -152,13 +155,23 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
                                "{} is only supported with local 
communication.",
                                runAsync.getClass().getName(),
                                runAsync.getClass().getName());
-               } else {
+               }
+               else if (runAsync.getDelay() == 0) {
+                       // run immediately
                        try {
                                runAsync.getRunnable().run();
                        } catch (final Throwable e) {
                                LOG.error("Caught exception while executing 
runnable in main thread.", e);
                        }
                }
+               else {
+                       // schedule for later. send a new message after the 
delay, which will then be immediately executed 
+                       FiniteDuration delay = new 
FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS);
+                       RunAsync message = new RunAsync(runAsync.getRunnable(), 
0);
+
+                       getContext().system().scheduler().scheduleOnce(delay, 
getSelf(), message,
+                                       getContext().dispatcher(), 
ActorRef.noSender());
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index fb95852..c18906c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -18,23 +18,39 @@
 
 package org.apache.flink.runtime.rpc.akka.messages;
 
-import org.apache.flink.util.Preconditions;
-
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Message for asynchronous runnable invocations
  */
 public final class RunAsync implements Serializable {
        private static final long serialVersionUID = -3080595100695371036L;
 
+       /** The runnable to be executed. Transient, so it gets lost upon 
serialization */ 
        private final transient Runnable runnable;
 
-       public RunAsync(Runnable runnable) {
-               this.runnable = Preconditions.checkNotNull(runnable);
+       /** The delay after which the runnable should be called */
+       private final long delay;
+
+       /**
+        * Creates a message carrying a runnable and the delay after which it should be run.
+        * @param runnable  The Runnable to run.
+        * @param delay     The delay in milliseconds. Zero indicates immediate 
execution.
+        */
+       public RunAsync(Runnable runnable, long delay) {
+               checkArgument(delay >= 0);
+               this.runnable = checkNotNull(runnable);
+               this.delay = delay;
        }
 
        public Runnable getRunnable() {
                return runnable;
        }
+
+       public long getDelay() {
+               return delay;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index a4e1d7f..5e37e10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/f5614a4c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
new file mode 100644
index 0000000..f2ce52d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest {
+
+       // 
------------------------------------------------------------------------
+       //  shared test members
+       // 
------------------------------------------------------------------------
+
+       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+
+       private static AkkaRpcService akkaRpcService = 
+                       new AkkaRpcService(actorSystem, new Timeout(10000, 
TimeUnit.MILLISECONDS));
+
+       @AfterClass
+       public static void shutdown() {
+               akkaRpcService.stopService();
+               actorSystem.shutdown();
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testScheduleWithNoDelay() throws Exception {
+
+               // lock and flag used to detect concurrent access to the endpoint
+               final ReentrantLock lock = new ReentrantLock();
+               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+               TestGateway gateway = testEndpoint.getSelf();
+
+               // a bunch of gateway calls
+               gateway.someCall();
+               gateway.anotherCall();
+               gateway.someCall();
+
+               // run something asynchronously
+               for (int i = 0; i < 10000; i++) {
+                       testEndpoint.runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       boolean holdsLock = lock.tryLock();
+                                       if (holdsLock) {
+                                               lock.unlock();
+                                       } else {
+                                               concurrentAccess.set(true);
+                                       }
+                               }
+                       });
+               }
+       
+               Future<String> result = testEndpoint.callAsync(new 
Callable<String>() {
+                       @Override
+                       public String call() throws Exception {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                               return "test";
+                       }
+               }, new Timeout(30, TimeUnit.SECONDS));
+               String str = Await.result(result, new FiniteDuration(30, 
TimeUnit.SECONDS));
+               assertEquals("test", str);
+
+               // validate that no concurrent access happened
+               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
+               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+
+               akkaRpcService.stopServer(testEndpoint.getSelf());
+       }
+
+       @Test
+       public void testScheduleWithDelay() throws Exception {
+
+               // lock, flag and latch used to detect concurrent access and await the delayed call
+               final ReentrantLock lock = new ReentrantLock();
+               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+               final OneShotLatch latch = new OneShotLatch();
+
+               final long delay = 200;
+
+               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+
+               // run something asynchronously
+               testEndpoint.runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                       }
+               });
+
+               final long start = System.nanoTime();
+
+               testEndpoint.scheduleRunAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                               latch.trigger();
+                       }
+               }, delay, TimeUnit.MILLISECONDS);
+
+               latch.await();
+               final long stop = System.nanoTime();
+
+               // validate that no concurrent access happened
+               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
+               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+
+               assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test RPC endpoint
+       // 
------------------------------------------------------------------------
+       
+       interface TestGateway extends RpcGateway {
+
+               void someCall();
+
+               void anotherCall();
+       }
+
+       @SuppressWarnings("unused")
+       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+               private final ReentrantLock lock;
+
+               private volatile boolean concurrentAccess;
+
+               public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+                       super(rpcService);
+                       this.lock = lock;
+               }
+
+               @RpcMethod
+               public void someCall() {
+                       boolean holdsLock = lock.tryLock();
+                       if (holdsLock) {
+                               lock.unlock();
+                       } else {
+                               concurrentAccess = true;
+                       }
+               }
+
+               @RpcMethod
+               public void anotherCall() {
+                       boolean holdsLock = lock.tryLock();
+                       if (holdsLock) {
+                               lock.unlock();
+                       } else {
+                               concurrentAccess = true;
+                       }
+               }
+
+               public boolean hasConcurrentAccess() {
+                       return concurrentAccess;
+               }
+       }
+}

Reply via email to