[FLINK-4386] [rpc] Add a utility to verify calls happen in the Rpc Endpoint's 
main thread


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ca049b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ca049b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ca049b5

Branch: refs/heads/flip-6
Commit: 4ca049b5c5d7c543a2e0ec40534cd08a13cd113a
Parents: 86f21bf
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Aug 11 20:30:54 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:13 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |  2 +-
 .../runtime/rpc/MainThreadValidatorUtil.java    | 47 ++++++++++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 38 +++++++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 37 +++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  2 +-
 .../rpc/akka/MainThreadValidationTest.java      | 97 ++++++++++++++++++++
 6 files changed, 205 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 4efb382..5e4fead 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeoutException;
  *
  * <p>This interface is intended to be implemented by the self gateway in a 
{@link RpcEndpoint}
  * implementation which allows to dispatch local procedures to the main thread 
of the underlying
- * rpc server.
+ * RPC endpoint.
  */
 public interface MainThreadExecutor {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
new file mode 100644
index 0000000..b3fea77
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadValidatorUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This utility exists to bridge between the visibility of the
+ * {@code currentMainThread} field in the {@link RpcEndpoint}.
+ * 
+ * The {@code currentMainThread} can be hidden from {@code RpcEndpoint} 
implementations
+ * and only be accessed via this utility from other packages.
+ */
+public final class MainThreadValidatorUtil {
+
+       private final RpcEndpoint<?> endpoint;
+
+       public MainThreadValidatorUtil(RpcEndpoint<?> endpoint) {
+               this.endpoint = checkNotNull(endpoint);
+       }
+
+       public void enterMainThread() {
+               assert(endpoint.currentMainThread.compareAndSet(null, 
Thread.currentThread())) : 
+                               "The RpcEndpoint has concurrent access from " + 
endpoint.currentMainThread.get();
+       }
+       
+       public void exitMainThread() {
+               
assert(endpoint.currentMainThread.compareAndSet(Thread.currentThread(), null)) :
+                               "The RpcEndpoint has concurrent access from " + 
endpoint.currentMainThread.get();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 44933d5..d36a283 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -29,6 +29,7 @@ import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -75,6 +76,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * of the executing rpc server. */
        private final MainThreadExecutionContext mainThreadExecutionContext;
 
+       /** A reference to the endpoint's main thread, if the current method is 
called by the main thread */
+       final AtomicReference<Thread> currentMainThread = new 
AtomicReference<>(null); 
+
        /**
         * Initializes the RPC endpoint.
         * 
@@ -92,6 +96,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                this.mainThreadExecutionContext = new 
MainThreadExecutionContext((MainThreadExecutor) self);
        }
 
+       /**
+        * Returns the class of the self gateway type.
+        *
+        * @return Class of the self gateway type
+        */
+       public final Class<C> getSelfGatewayType() {
+               return selfGatewayType;
+       }
+       
        // 
------------------------------------------------------------------------
        //  Shutdown
        // 
------------------------------------------------------------------------
@@ -193,13 +206,28 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                return ((MainThreadExecutor) self).callAsync(callable, timeout);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Main Thread Validation
+       // 
------------------------------------------------------------------------
+
        /**
-        * Returns the class of the self gateway type.
-        *
-        * @return Class of the self gateway type
+        * Validates that the method call happens in the RPC endpoint's main 
thread.
+        * 
+        * <p><b>IMPORTANT:</b> This check only happens when assertions are 
enabled,
+        * such as when running tests.
+        * 
+        * <p>This can be used for additional checks, like
+        * <pre>{@code
+        * protected void concurrencyCriticalMethod() {
+        *     validateRunsInMainThread();
+        *     
+        *     // some critical stuff
+        * }
+        * }</pre>
         */
-       public final Class<C> getSelfGatewayType() {
-               return selfGatewayType;
+       public void validateRunsInMainThread() {
+               // because the initialization is lazy, it can be that certain 
methods are
+               assert currentMainThread.get() == Thread.currentThread();
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 18ccf1b..5e0a7da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -22,14 +22,16 @@ import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
-import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -37,6 +39,8 @@ import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and 
{@link CallAsync}
  * messages.
@@ -51,24 +55,35 @@ import java.util.concurrent.TimeUnit;
  * @param <T> Type of the {@link RpcEndpoint}
  */
 class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends 
UntypedActor {
+       
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcActor.class);
 
+       /** the endpoint to invoke the methods on */
        private final T rpcEndpoint;
 
+       /** the helper that tracks whether calls come from the main thread */
+       private final MainThreadValidatorUtil mainThreadValidator;
+
        AkkaRpcActor(final T rpcEndpoint) {
-               this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc 
endpoint");
+               this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
+               this.mainThreadValidator = new 
MainThreadValidatorUtil(rpcEndpoint);
        }
 
        @Override
-       public void onReceive(final Object message)  {
-               if (message instanceof RunAsync) {
-                       handleRunAsync((RunAsync) message);
-               } else if (message instanceof CallAsync) {
-                       handleCallAsync((CallAsync) message);
-               } else if (message instanceof RpcInvocation) {
-                       handleRpcInvocation((RpcInvocation) message);
-               } else {
-                       LOG.warn("Received message of unknown type {}. Dropping 
this message!", message.getClass());
+       public void onReceive(final Object message) {
+               mainThreadValidator.enterMainThread();
+               try {
+                       if (message instanceof RunAsync) {
+                               handleRunAsync((RunAsync) message);
+                       } else if (message instanceof CallAsync) {
+                               handleCallAsync((CallAsync) message);
+                       } else if (message instanceof RpcInvocation) {
+                               handleRpcInvocation((RpcInvocation) message);
+                       } else {
+                               LOG.warn("Received message of unknown type {}. 
Dropping this message!", message.getClass());
+                       }
+               } finally {
+                       mainThreadValidator.exitMainThread();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 448216c..db40f10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -174,7 +174,7 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public <C extends RpcGateway> String getAddress(C selfGateway) {
+       public String getAddress(RpcGateway selfGateway) {
                checkState(!stopped, "RpcService is stopped");
 
                if (selfGateway instanceof AkkaGateway) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4ca049b5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
new file mode 100644
index 0000000..b854143
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.util.Timeout;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class MainThreadValidationTest {
+
+       @Test
+       public void failIfNotInMainThread() {
+               // test if assertions are activated. The test only works if 
assertions are loaded.
+               try {
+                       assert false;
+                       // apparently they are not activated
+                       return;
+               } catch (AssertionError ignored) {}
+
+               // actual test
+               AkkaRpcService akkaRpcService = new AkkaRpcService(
+                               AkkaUtils.createDefaultActorSystem(),
+                               new Timeout(10000, TimeUnit.MILLISECONDS));
+
+               try {
+                       TestEndpoint testEndpoint = new 
TestEndpoint(akkaRpcService);
+
+                       // this works, because it is executed as an RPC call
+                       
testEndpoint.getSelf().someConcurrencyCriticalFunction();
+
+                       // this fails, because it is executed directly
+                       boolean exceptionThrown;
+                       try {
+                               testEndpoint.someConcurrencyCriticalFunction();
+                               exceptionThrown = false;
+                       }
+                       catch (AssertionError e) {
+                               exceptionThrown = true;
+                       }
+                       assertTrue("should fail with an assertion error", 
exceptionThrown);
+
+                       akkaRpcService.stopServer(testEndpoint.getSelf());
+               }
+               finally {
+                       akkaRpcService.stopService();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test RPC endpoint
+       // 
------------------------------------------------------------------------
+
+       interface TestGateway extends RpcGateway {
+
+               void someConcurrencyCriticalFunction();
+       }
+
+       @SuppressWarnings("unused")
+       public static class TestEndpoint extends RpcEndpoint<TestGateway> {
+
+               public TestEndpoint(RpcService rpcService) {
+                       super(rpcService);
+               }
+
+               @RpcMethod
+               public void someConcurrencyCriticalFunction() {
+                       validateRunsInMainThread();
+               }
+       }
+}

Reply via email to