[FLINK-4434] [rpc] Add a testing RPC service. This closes #2394.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/728f2661 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/728f2661 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/728f2661 Branch: refs/heads/flip-6 Commit: 728f2661c1b5e24dba74c9061fc57431e58e4ede Parents: 2452014 Author: Stephan Ewen <se...@apache.org> Authored: Fri Aug 19 23:29:45 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Aug 25 20:21:04 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/RpcCompletenessTest.java | 3 + .../flink/runtime/rpc/TestingGatewayBase.java | 85 ++++++++++++++ .../flink/runtime/rpc/TestingRpcService.java | 115 +++++++++++++++++++ 3 files changed, 203 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/728f2661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index 97cf0cb..b8aad62 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -41,9 +41,11 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class RpcCompletenessTest extends TestLogger { + private static final Class<?> futureClass = Future.class; @Test + @SuppressWarnings({"rawtypes", "unchecked"}) public void testRpcCompleteness() { Reflections reflections = new Reflections("org.apache.flink"); @@ -64,6 +66,7 @@ public class RpcCompletenessTest extends TestLogger { } } + @SuppressWarnings("rawtypes") private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? extends RpcGateway> rpcGateway) { Method[] gatewayMethods = rpcGateway.getDeclaredMethods(); Method[] serverMethods = rpcEndpoint.getDeclaredMethods(); http://git-wip-us.apache.org/repos/asf/flink/blob/728f2661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java new file mode 100644 index 0000000..4256135 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.Futures; +import scala.concurrent.Future; +import scala.concurrent.Promise; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Utility base class for testing gateways + */ +public abstract class TestingGatewayBase implements RpcGateway { + + private final ScheduledExecutorService executor; + + protected TestingGatewayBase() { + this.executor = Executors.newSingleThreadScheduledExecutor(); + } + + // ------------------------------------------------------------------------ + // shutdown + // ------------------------------------------------------------------------ + + public void stop() { + executor.shutdownNow(); + } + + @Override + protected void finalize() throws Throwable { + super.finalize(); + executor.shutdownNow(); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + public <T> Future<T> futureWithTimeout(long timeoutMillis) { + Promise<T> promise = Futures.<T>promise(); + executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS); + return promise.future(); + } + + // ------------------------------------------------------------------------ + + private static final class FutureTimeout implements Runnable { + + private final Promise<?> promise; + + private FutureTimeout(Promise<?> promise) { + this.promise = promise; + } + + @Override + public void run() { + try { + promise.failure(new TimeoutException()); + } catch (Throwable t) { + System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage()); + t.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/728f2661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java new file mode 100644 index 0000000..7e92e8d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.dispatch.Futures; +import akka.util.Timeout; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An RPC Service implementation for testing. This RPC service acts as a replacement for + * teh regular RPC service for cases where tests need to return prepared mock gateways instead of + * proper RPC gateways. + * + * <p>The TestingRpcService can be used for example in the following fashion, + * using <i>Mockito</i> for mocks and verification: + * + * <pre>{@code + * TestingRpcService rpc = new TestingRpcService(); + * + * ResourceManagerGateway testGateway = mock(ResourceManagerGateway.class); + * rpc.registerGateway("myAddress", testGateway); + * + * MyComponentToTest component = new MyComponentToTest(); + * component.triggerSomethingThatCallsTheGateway(); + * + * verify(testGateway, timeout(1000)).theTestMethod(any(UUID.class), anyString()); + * }</pre> + */ +public class TestingRpcService extends AkkaRpcService { + + /** Map of pre-registered connections */ + private final ConcurrentHashMap<String, RpcGateway> registeredConnections; + + /** + * Creates a new {@code TestingRpcService}. + */ + public TestingRpcService() { + this(new Configuration()); + } + + /** + * Creates a new {@code TestingRpcService}, using the given configuration. + */ + public TestingRpcService(Configuration configuration) { + super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + + this.registeredConnections = new ConcurrentHashMap<>(); + } + + // ------------------------------------------------------------------------ + + @Override + public void stopService() { + super.stopService(); + registeredConnections.clear(); + } + + // ------------------------------------------------------------------------ + // connections + // ------------------------------------------------------------------------ + + public void registerGateway(String address, RpcGateway gateway) { + checkNotNull(address); + checkNotNull(gateway); + + if (registeredConnections.putIfAbsent(address, gateway) != null) { + throw new IllegalStateException("a gateway is already registered under " + address); + } + } + + @Override + public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) { + RpcGateway gateway = registeredConnections.get(address); + + if (gateway != null) { + if (clazz.isAssignableFrom(gateway.getClass())) { + @SuppressWarnings("unchecked") + C typedGateway = (C) gateway; + return Futures.successful(typedGateway); + } else { + return Futures.failed( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); + } + } else { + return Futures.failed(new Exception("No gateway registered under that name")); + } + } +} \ No newline at end of file