[FLINK-4434] [rpc] Add a testing RPC service.

This closes #2394.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6b0f12c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6b0f12c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6b0f12c

Branch: refs/heads/flip-6
Commit: e6b0f12cc79d7164be98c4d7c2c6482d0c8f0937
Parents: d9baa58
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 19 23:29:45 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:15 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/RpcCompletenessTest.java  |   3 +
 .../flink/runtime/rpc/TestingGatewayBase.java   |  85 ++++++++++++++
 .../flink/runtime/rpc/TestingRpcService.java    | 115 +++++++++++++++++++
 3 files changed, 203 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6b0f12c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 97cf0cb..b8aad62 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -41,9 +41,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RpcCompletenessTest extends TestLogger {
+
        private static final Class<?> futureClass = Future.class;
 
        @Test
+       @SuppressWarnings({"rawtypes", "unchecked"})
        public void testRpcCompleteness() {
                Reflections reflections = new Reflections("org.apache.flink");
 
@@ -64,6 +66,7 @@ public class RpcCompletenessTest extends TestLogger {
                }
        }
 
+       @SuppressWarnings("rawtypes")
        private void checkCompleteness(Class<? extends RpcEndpoint> 
rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
                Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
                Method[] serverMethods = rpcEndpoint.getDeclaredMethods();

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b0f12c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
new file mode 100644
index 0000000..4256135
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.Futures;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Utility base class for testing gateways
+ */
+public abstract class TestingGatewayBase implements RpcGateway {
+
+       private final ScheduledExecutorService executor;
+
+       protected TestingGatewayBase() {
+               this.executor = Executors.newSingleThreadScheduledExecutor();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  shutdown
+       // 
------------------------------------------------------------------------
+
+       public void stop() {
+               executor.shutdownNow();
+       }
+
+       @Override
+       protected void finalize() throws Throwable {
+               super.finalize();
+               executor.shutdownNow();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       public <T> Future<T> futureWithTimeout(long timeoutMillis) {
+               Promise<T> promise = Futures.<T>promise();
+               executor.schedule(new FutureTimeout(promise), timeoutMillis, 
TimeUnit.MILLISECONDS);
+               return promise.future();
+       }
+
+       // 
------------------------------------------------------------------------
+       
+       private static final class FutureTimeout implements Runnable {
+
+               private final Promise<?> promise;
+
+               private FutureTimeout(Promise<?> promise) {
+                       this.promise = promise;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               promise.failure(new TimeoutException());
+                       } catch (Throwable t) {
+                               System.err.println("CAUGHT AN ERROR IN THE 
TEST: " + t.getMessage());
+                               t.printStackTrace();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e6b0f12c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
new file mode 100644
index 0000000..7e92e8d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An RPC Service implementation for testing. This RPC service acts as a 
replacement for
+ * teh regular RPC service for cases where tests need to return prepared mock 
gateways instead of
+ * proper RPC gateways.
+ * 
+ * <p>The TestingRpcService can be used for example in the following fashion,
+ * using <i>Mockito</i> for mocks and verification:
+ * 
+ * <pre>{@code
+ * TestingRpcService rpc = new TestingRpcService();
+ *
+ * ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class);
+ * rpc.registerGateway("myAddress", testGateway);
+ * 
+ * MyComponentToTest component = new MyComponentToTest();
+ * component.triggerSomethingThatCallsTheGateway();
+ * 
+ * verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), 
anyString());
+ * }</pre>
+ */
+public class TestingRpcService extends AkkaRpcService {
+
+       /** Map of pre-registered connections */
+       private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
+
+       /**
+        * Creates a new {@code TestingRpcService}. 
+        */
+       public TestingRpcService() {
+               this(new Configuration());
+       }
+
+       /**
+        * Creates a new {@code TestingRpcService}, using the given 
configuration. 
+        */
+       public TestingRpcService(Configuration configuration) {
+               super(AkkaUtils.createLocalActorSystem(configuration), new 
Timeout(new FiniteDuration(10, TimeUnit.SECONDS)));
+
+               this.registeredConnections = new ConcurrentHashMap<>();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void stopService() {
+               super.stopService();
+               registeredConnections.clear();
+       }
+
+       // 
------------------------------------------------------------------------
+       // connections
+       // 
------------------------------------------------------------------------
+
+       public void registerGateway(String address, RpcGateway gateway) {
+               checkNotNull(address);
+               checkNotNull(gateway);
+
+               if (registeredConnections.putIfAbsent(address, gateway) != 
null) {
+                       throw new IllegalStateException("a gateway is already 
registered under " + address);
+               }
+       }
+
+       @Override
+       public <C extends RpcGateway> Future<C> connect(String address, 
Class<C> clazz) {
+               RpcGateway gateway = registeredConnections.get(address);
+
+               if (gateway != null) {
+                       if (clazz.isAssignableFrom(gateway.getClass())) {
+                               @SuppressWarnings("unchecked")
+                               C typedGateway = (C) gateway;
+                               return Futures.successful(typedGateway);
+                       } else {
+                               return Futures.failed(
+                                               new Exception("Gateway 
registered under " + address + " is not of type " + clazz));
+                       }
+               } else {
+                       return Futures.failed(new Exception("No gateway 
registered under that name"));
+               }
+       }
+}
\ No newline at end of file

Reply via email to