[FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as 
protected

Pass the main thread execution context to the 
TaskExecutorToResourceManagerConnection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b779d19d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b779d19d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b779d19d

Branch: refs/heads/flip-6
Commit: b779d19de3416492afd3191ba5fc911d6b475919
Parents: 2f12ba3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Aug 29 15:49:59 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:17 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   8 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   7 +-
 ...TaskExecutorToResourceManagerConnection.java |  26 ++-
 .../flink/runtime/rpc/AsyncCallsTest.java       | 216 ++++++++++++++++++
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  | 219 -------------------
 5 files changed, 242 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b779d19d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 7b3f8a1..e9e2b2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -161,7 +161,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         *
         * @return Main thread execution context
         */
-       public ExecutionContext getMainThreadExecutionContext() {
+       protected ExecutionContext getMainThreadExecutionContext() {
                return mainThreadExecutionContext;
        }
 
@@ -184,7 +184,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         *
         * @param runnable Runnable to be executed in the main thread of the 
underlying RPC endpoint
         */
-       public void runAsync(Runnable runnable) {
+       protected void runAsync(Runnable runnable) {
                ((MainThreadExecutor) self).runAsync(runnable);
        }
 
@@ -195,7 +195,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed
         */
-       public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
+       protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
                ((MainThreadExecutor) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
        }
 
@@ -209,7 +209,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param <V> Return type of the callable
         * @return Future for the result of the callable.
         */
-       public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+       protected <V> Future<V> callAsync(Callable<V> callable, Timeout 
timeout) {
                return ((MainThreadExecutor) self).callAsync(callable, timeout);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b779d19d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4871b96..735730b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -176,7 +176,12 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                if (newLeaderAddress != null) {
                        log.info("Attempting to register at ResourceManager 
{}", newLeaderAddress);
                        resourceManagerConnection =
-                               new 
TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, 
newLeaderId);
+                               new TaskExecutorToResourceManagerConnection(
+                                       log,
+                                       this,
+                                       newLeaderAddress,
+                                       newLeaderId,
+                                       getMainThreadExecutionContext());
                        resourceManagerConnection.start();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b779d19d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 25332a0..28062b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 
 import org.slf4j.Logger;
 
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -55,9 +56,12 @@ public class TaskExecutorToResourceManagerConnection {
 
        private final String resourceManagerAddress;
 
+       /** Execution context to be used to execute the on complete action of 
the ResourceManagerRegistration */
+       private final ExecutionContext executionContext;
+
        private 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration 
pendingRegistration;
 
-       private ResourceManagerGateway registeredResourceManager;
+       private volatile ResourceManagerGateway registeredResourceManager;
 
        private InstanceID registrationId;
 
@@ -66,15 +70,17 @@ public class TaskExecutorToResourceManagerConnection {
 
 
        public TaskExecutorToResourceManagerConnection(
-                       Logger log,
-                       TaskExecutor taskExecutor,
-                       String resourceManagerAddress,
-                       UUID resourceManagerLeaderId) {
+               Logger log,
+               TaskExecutor taskExecutor,
+               String resourceManagerAddress,
+               UUID resourceManagerLeaderId,
+               ExecutionContext executionContext) {
 
                this.log = checkNotNull(log);
                this.taskExecutor = checkNotNull(taskExecutor);
                this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
                this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+               this.executionContext = checkNotNull(executionContext);
        }
 
        // 
------------------------------------------------------------------------
@@ -93,22 +99,22 @@ public class TaskExecutorToResourceManagerConnection {
                pendingRegistration.startRegistration();
 
                Future<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture();
-               
+
                future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess>>() {
                        @Override
                        public void onSuccess(Tuple2<ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> result) {
-                               registeredResourceManager = result.f0;
                                registrationId = result.f1.getRegistrationId();
+                               registeredResourceManager = result.f0;
                        }
-               }, taskExecutor.getMainThreadExecutionContext());
+               }, executionContext);
                
                // this future should only ever fail if there is a bug, not if 
the registration is declined
                future.onFailure(new OnFailure() {
                        @Override
                        public void onFailure(Throwable failure) {
-                               taskExecutor.onFatalError(failure);
+                               taskExecutor.onFatalErrorAsync(failure);
                        }
-               }, taskExecutor.getMainThreadExecutionContext());
+               }, executionContext);
        }
 
        public void close() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b779d19d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
new file mode 100644
index 0000000..1791056
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.*;
+
+public class AsyncCallsTest extends TestLogger {
+
+       // 
------------------------------------------------------------------------
+       //  shared test members
+       // 
------------------------------------------------------------------------
+
+       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+
+       private static AkkaRpcService akkaRpcService =
+                       new AkkaRpcService(actorSystem, new Timeout(10000, 
TimeUnit.MILLISECONDS));
+
+       @AfterClass
+       public static void shutdown() {
+               akkaRpcService.stopService();
+               actorSystem.shutdown();
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testScheduleWithNoDelay() throws Exception {
+
+               // to collect all the thread references
+               final ReentrantLock lock = new ReentrantLock();
+               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+
+               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+               testEndpoint.start();
+               TestGateway gateway = testEndpoint.getSelf();
+
+               // a bunch of gateway calls
+               gateway.someCall();
+               gateway.anotherCall();
+               gateway.someCall();
+
+               // run something asynchronously
+               for (int i = 0; i < 10000; i++) {
+                       testEndpoint.runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       boolean holdsLock = lock.tryLock();
+                                       if (holdsLock) {
+                                               lock.unlock();
+                                       } else {
+                                               concurrentAccess.set(true);
+                                       }
+                               }
+                       });
+               }
+       
+               Future<String> result = testEndpoint.callAsync(new 
Callable<String>() {
+                       @Override
+                       public String call() throws Exception {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                               return "test";
+                       }
+               }, new Timeout(30, TimeUnit.SECONDS));
+               String str = Await.result(result, new FiniteDuration(30, 
TimeUnit.SECONDS));
+               assertEquals("test", str);
+
+               // validate that no concurrent access happened
+               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
+               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+
+               akkaRpcService.stopServer(testEndpoint.getSelf());
+       }
+
+       @Test
+       public void testScheduleWithDelay() throws Exception {
+
+               // to collect all the thread references
+               final ReentrantLock lock = new ReentrantLock();
+               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
+               final OneShotLatch latch = new OneShotLatch();
+
+               final long delay = 200;
+
+               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+               testEndpoint.start();
+
+               // run something asynchronously
+               testEndpoint.runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                       }
+               });
+
+               final long start = System.nanoTime();
+
+               testEndpoint.scheduleRunAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               boolean holdsLock = lock.tryLock();
+                               if (holdsLock) {
+                                       lock.unlock();
+                               } else {
+                                       concurrentAccess.set(true);
+                               }
+                               latch.trigger();
+                       }
+               }, delay, TimeUnit.MILLISECONDS);
+
+               latch.await();
+               final long stop = System.nanoTime();
+
+               // validate that no concurrent access happened
+               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
+               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+
+               assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test RPC endpoint
+       // 
------------------------------------------------------------------------
+       
+       public interface TestGateway extends RpcGateway {
+
+               void someCall();
+
+               void anotherCall();
+       }
+
+       @SuppressWarnings("unused")
+       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+               private final ReentrantLock lock;
+
+               private volatile boolean concurrentAccess;
+
+               public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
+                       super(rpcService);
+                       this.lock = lock;
+               }
+
+               @RpcMethod
+               public void someCall() {
+                       boolean holdsLock = lock.tryLock();
+                       if (holdsLock) {
+                               lock.unlock();
+                       } else {
+                               concurrentAccess = true;
+                       }
+               }
+
+               @RpcMethod
+               public void anotherCall() {
+                       boolean holdsLock = lock.tryLock();
+                       if (holdsLock) {
+                               lock.unlock();
+                       } else {
+                               concurrentAccess = true;
+                       }
+               }
+
+               public boolean hasConcurrentAccess() {
+                       return concurrentAccess;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b779d19d/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
deleted file mode 100644
index d33987c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorSystem;
-import akka.util.Timeout;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcGateway;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.*;
-
-public class AsyncCallsTest extends TestLogger {
-
-       // 
------------------------------------------------------------------------
-       //  shared test members
-       // 
------------------------------------------------------------------------
-
-       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
-
-       private static AkkaRpcService akkaRpcService = 
-                       new AkkaRpcService(actorSystem, new Timeout(10000, 
TimeUnit.MILLISECONDS));
-
-       @AfterClass
-       public static void shutdown() {
-               akkaRpcService.stopService();
-               actorSystem.shutdown();
-       }
-
-
-       // 
------------------------------------------------------------------------
-       //  tests
-       // 
------------------------------------------------------------------------
-
-       @Test
-       public void testScheduleWithNoDelay() throws Exception {
-
-               // to collect all the thread references
-               final ReentrantLock lock = new ReentrantLock();
-               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
-
-               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
-               testEndpoint.start();
-               TestGateway gateway = testEndpoint.getSelf();
-
-               // a bunch of gateway calls
-               gateway.someCall();
-               gateway.anotherCall();
-               gateway.someCall();
-
-               // run something asynchronously
-               for (int i = 0; i < 10000; i++) {
-                       testEndpoint.runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       boolean holdsLock = lock.tryLock();
-                                       if (holdsLock) {
-                                               lock.unlock();
-                                       } else {
-                                               concurrentAccess.set(true);
-                                       }
-                               }
-                       });
-               }
-       
-               Future<String> result = testEndpoint.callAsync(new 
Callable<String>() {
-                       @Override
-                       public String call() throws Exception {
-                               boolean holdsLock = lock.tryLock();
-                               if (holdsLock) {
-                                       lock.unlock();
-                               } else {
-                                       concurrentAccess.set(true);
-                               }
-                               return "test";
-                       }
-               }, new Timeout(30, TimeUnit.SECONDS));
-               String str = Await.result(result, new FiniteDuration(30, 
TimeUnit.SECONDS));
-               assertEquals("test", str);
-
-               // validate that no concurrent access happened
-               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
-               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
-
-               akkaRpcService.stopServer(testEndpoint.getSelf());
-       }
-
-       @Test
-       public void testScheduleWithDelay() throws Exception {
-
-               // to collect all the thread references
-               final ReentrantLock lock = new ReentrantLock();
-               final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
-               final OneShotLatch latch = new OneShotLatch();
-
-               final long delay = 200;
-
-               TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
-               testEndpoint.start();
-
-               // run something asynchronously
-               testEndpoint.runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               boolean holdsLock = lock.tryLock();
-                               if (holdsLock) {
-                                       lock.unlock();
-                               } else {
-                                       concurrentAccess.set(true);
-                               }
-                       }
-               });
-
-               final long start = System.nanoTime();
-
-               testEndpoint.scheduleRunAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               boolean holdsLock = lock.tryLock();
-                               if (holdsLock) {
-                                       lock.unlock();
-                               } else {
-                                       concurrentAccess.set(true);
-                               }
-                               latch.trigger();
-                       }
-               }, delay, TimeUnit.MILLISECONDS);
-
-               latch.await();
-               final long stop = System.nanoTime();
-
-               // validate that no concurrent access happened
-               assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
-               assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
-
-               assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  test RPC endpoint
-       // 
------------------------------------------------------------------------
-       
-       interface TestGateway extends RpcGateway {
-
-               void someCall();
-
-               void anotherCall();
-       }
-
-       @SuppressWarnings("unused")
-       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
-
-               private final ReentrantLock lock;
-
-               private volatile boolean concurrentAccess;
-
-               public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
-                       super(rpcService);
-                       this.lock = lock;
-               }
-
-               @RpcMethod
-               public void someCall() {
-                       boolean holdsLock = lock.tryLock();
-                       if (holdsLock) {
-                               lock.unlock();
-                       } else {
-                               concurrentAccess = true;
-                       }
-               }
-
-               @RpcMethod
-               public void anotherCall() {
-                       boolean holdsLock = lock.tryLock();
-                       if (holdsLock) {
-                               lock.unlock();
-                       } else {
-                               concurrentAccess = true;
-                       }
-               }
-
-               public boolean hasConcurrentAccess() {
-                       return concurrentAccess;
-               }
-       }
-}

Reply via email to