[FLINK-8678] [flip6] Make RpcService shut down non blocking Changes the RpcService#stopService method to be non-blocking. Instead of waiting until the RpcService has stopped, it returns the termination future, which is completed once the RpcService has been completely shut down.
This closes #5517. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c27e2a77 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c27e2a77 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c27e2a77 Branch: refs/heads/master Commit: c27e2a77005db355da9e72656af8b0df8b1dfe75 Parents: af3ea81 Author: Till Rohrmann <[email protected]> Authored: Fri Feb 16 18:46:42 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 23 18:22:06 2018 +0100 ---------------------------------------------------------------------- .../MesosResourceManagerTest.java | 2 +- .../runtime/entrypoint/ClusterEntrypoint.java | 2 +- .../flink/runtime/minicluster/MiniCluster.java | 4 +- .../apache/flink/runtime/rpc/RpcService.java | 7 +- .../org/apache/flink/runtime/rpc/RpcUtils.java | 17 ++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 40 ++++--- .../runtime/taskexecutor/TaskManagerRunner.java | 10 +- .../clusterframework/ResourceManagerTest.java | 5 +- .../runtime/dispatcher/DispatcherTest.java | 4 +- .../runtime/dispatcher/MiniDispatcherTest.java | 6 +- .../jobmanager/scheduler/SchedulerTestBase.java | 2 +- .../jobmaster/slotpool/SlotPoolRpcTest.java | 4 +- .../slotpool/SlotPoolSchedulingTestBase.java | 4 +- .../jobmaster/slotpool/SlotPoolTest.java | 2 +- .../RegisteredRpcConnectionTest.java | 88 +++++++-------- .../registration/RetryingRegistrationTest.java | 108 +++++++++---------- .../resourcemanager/ResourceManagerHATest.java | 2 +- .../ResourceManagerJobMasterTest.java | 3 +- .../ResourceManagerTaskExecutorTest.java | 3 +- .../resourcemanager/ResourceManagerTest.java | 10 +- .../flink/runtime/rpc/AsyncCallsTest.java | 13 ++- .../runtime/rpc/FencedRpcEndpointTest.java | 3 +- .../flink/runtime/rpc/RpcConnectionTest.java | 25 ++++- .../flink/runtime/rpc/RpcEndpointTest.java | 23 ++-- .../flink/runtime/rpc/TestingRpcService.java | 12 ++- .../runtime/rpc/akka/AkkaRpcActorTest.java | 14 ++- 
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 13 ++- .../rpc/akka/MainThreadValidationTest.java | 2 +- .../rpc/akka/MessageSerializationTest.java | 29 +++-- .../taskexecutor/TaskExecutorITCase.java | 3 +- .../runtime/taskexecutor/TaskExecutorTest.java | 2 +- .../retriever/impl/RpcGatewayRetrieverTest.java | 5 +- .../flink/yarn/YarnResourceManagerTest.java | 2 +- 33 files changed, 271 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e23e0cb..2b38b85 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -510,7 +510,7 @@ public class MesosResourceManagerTest extends TestLogger { @Override public void close() throws Exception { - rpcService.stopService(); + rpcService.stopService().get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index ccb3ae4..f347d05 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -437,7 +437,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { if (commonRpcService != null) { try { - commonRpcService.stopService(); + commonRpcService.stopService().get(); } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 5046ae7..3f019f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -756,7 +756,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) { if (rpcService != null) { try { - rpcService.stopService(); + rpcService.stopService().get(); } catch (Throwable t) { return ExceptionUtils.firstOrSuppressed(t, priorException); @@ -773,7 +773,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { for (RpcService service : rpcServices) { try { if (service != null) { - service.stopService(); + service.stopService().get(); } } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 089e4b0..9aa3119 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -116,9 +116,12 @@ public interface RpcService { void stopServer(RpcServer selfGateway); /** - * Stop the rpc service shutting down all started rpc servers. + * Trigger the asynchronous stopping of the {@link RpcService}. + * + * @return Future which is completed once the {@link RpcService} has been + * fully stopped. */ - void stopService(); + CompletableFuture<Void> stopService(); /** * Returns a future indicating when the RPC service has been shut down. http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java index f87d33c..c90a8b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -50,7 +50,7 @@ public class RpcUtils { while (clazz != null) { for (Class<?> interfaze : clazz.getInterfaces()) { if (RpcGateway.class.isAssignableFrom(interfaze)) { - interfaces.add((Class<? extends RpcGateway>)interfaze); + interfaces.add((Class<? extends RpcGateway>) interfaze); } } @@ -65,7 +65,7 @@ public class RpcUtils { * * @param rpcEndpoint to terminate * @param timeout for this operation - * @throws ExecutionException if a problem occurs + * @throws ExecutionException if a problem occurred * @throws InterruptedException if the operation has been interrupted * @throws TimeoutException if a timeout occurred */ @@ -74,6 +74,19 @@ public class RpcUtils { rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } + /** + * Shuts the given rpc service down and waits for its termination. 
+ * + * @param rpcService to shut down + * @param timeout for this operation + * @throws InterruptedException if the operation has been interrupted + * @throws ExecutionException if a problem occurred + * @throws TimeoutException if a timeout occurred + */ + public static void terminateRpcService(RpcService rpcService, Time timeout) throws InterruptedException, ExecutionException, TimeoutException { + rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + // We don't want this class to be instantiable private RpcUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index d2d4bf2..a65fe46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -42,6 +42,7 @@ import akka.actor.Address; import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.Patterns; @@ -98,6 +99,8 @@ public class AkkaRpcService implements RpcService { private final ScheduledExecutor internalScheduledExecutor; + private final CompletableFuture<Void> terminationFuture; + private volatile boolean stopped; public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { @@ -127,6 +130,8 @@ public class AkkaRpcService implements RpcService { internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); + terminationFuture = new CompletableFuture<>(); + stopped = false; } @@ -311,33 +316,40 @@ public class AkkaRpcService implements 
RpcService { } @Override - public void stopService() { - LOG.info("Stopping Akka RPC service."); - + public CompletableFuture<Void> stopService() { synchronized (lock) { if (stopped) { - return; + return terminationFuture; } stopped = true; - } - actorSystem.shutdown(); - actorSystem.awaitTermination(); + LOG.info("Stopping Akka RPC service."); - synchronized (lock) { - actors.clear(); - } + final CompletableFuture<Terminated> actorSytemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + actorSytemTerminationFuture.whenComplete( + (Terminated ignored, Throwable throwable) -> { + synchronized (lock) { + actors.clear(); + } + + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(null); + } + + LOG.info("Stopped Akka RPC service."); + }); - LOG.info("Stopped Akka RPC service."); + return terminationFuture; } @Override public CompletableFuture<Void> getTerminationFuture() { - return CompletableFuture.runAsync( - actorSystem::awaitTermination, - getExecutor()); + return terminationFuture; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 4cb1beb..4620585 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -173,7 +173,15 @@ public class TaskManagerRunner implements FatalErrorHandler { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - rpcService.stopService(); + try { + rpcService.stopService().get(); + } catch (InterruptedException ie) { + exception = 
ExceptionUtils.firstOrSuppressed(ie, exception); + + Thread.currentThread().interrupt(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } try { highAvailabilityServices.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 096ba5e..241da8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -578,7 +579,7 @@ public class ResourceManagerTest extends TestLogger { verify(taskExecutorGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class)); } finally { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } } @@ -680,7 +681,7 @@ public class ResourceManagerTest extends TestLogger { verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId), any(TimeoutException.class)); } finally { - rpcService.stopService(); + 
RpcUtils.terminateRpcService(rpcService, timeout); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 46cd9e2..5d264e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -134,9 +134,9 @@ public class DispatcherTest extends TestLogger { } @AfterClass - public static void teardownClass() { + public static void teardownClass() throws Exception { if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, TIMEOUT); rpcService = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index dfabc61..4291ef2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -61,6 +61,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -143,13 +145,13 @@ public class MiniDispatcherTest extends TestLogger 
{ } @AfterClass - public static void teardownClass() throws IOException { + public static void teardownClass() throws IOException, InterruptedException, ExecutionException, TimeoutException { if (blobServer != null) { blobServer.close(); } if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index 949ff96..76a5642 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -117,7 +117,7 @@ public class SchedulerTestBase extends TestLogger { } if (rpcService != null) { - rpcService.stopService(); + rpcService.stopService().get(); rpcService = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java index 172876d..b2be97e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java @@ -92,9 +92,9 @@ public class SlotPoolRpcTest extends TestLogger { } @AfterClass - public static void shutdown() { + public static void 
shutdown() throws InterruptedException, ExecutionException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); rpcService = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java index 4cd7782..b9036c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java @@ -64,9 +64,9 @@ public class SlotPoolSchedulingTestBase extends TestLogger { } @AfterClass - public static void teardown() { + public static void teardown() throws ExecutionException, InterruptedException { if (testingRpcService != null) { - testingRpcService.stopService(); + testingRpcService.stopService().get(); testingRpcService = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index e6446ad..d6e0521 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -104,7 +104,7 @@ public class SlotPoolTest extends TestLogger { @After public void tearDown() 
throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index 650a0f2..25f976a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.slf4j.LoggerFactory; @@ -46,6 +48,20 @@ import static org.mockito.Mockito.when; */ public class RegisteredRpcConnectionTest extends TestLogger { + private TestingRpcService rpcService; + + @Before + public void setup() { + rpcService = new TestingRpcService(); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException { + if (rpcService != null) { + rpcService.stopService().get(); + } + } + @Test public void testSuccessfulRpcConnection() throws Exception { final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>"; @@ -54,7 +70,6 @@ public class RegisteredRpcConnectionTest extends TestLogger { // an endpoint that immediately returns success TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); - TestingRpcService rpcService = new TestingRpcService(); try { 
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -74,7 +89,6 @@ public class RegisteredRpcConnectionTest extends TestLogger { } finally { testGateway.stop(); - rpcService.stopService(); } } @@ -84,37 +98,30 @@ public class RegisteredRpcConnectionTest extends TestLogger { final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>"; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpcService = new TestingRpcService(); + // gateway that upon calls Throw an exception + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( + registrationException); - try { - // gateway that upon calls Throw an exception - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( - registrationException); + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - - TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); - connection.start(); - - //wait for connection failure - try { - connection.getConnectionFuture().get(); - fail("expected failure."); - } catch (ExecutionException ee) { - assertEquals(registrationException, ee.getCause()); - } + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); - // validate correct invocation and result - assertFalse(connection.isConnected()); - assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); - 
assertEquals(leaderId, connection.getTargetLeaderId()); - assertNull(connection.getTargetGateway()); - } - finally { - rpcService.stopService(); + //wait for connection failure + try { + connection.getConnectionFuture().get(); + fail("expected failure."); + } catch (ExecutionException ee) { + assertEquals(registrationException, ee.getCause()); } + + // validate correct invocation and result + assertFalse(connection.isConnected()); + assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); + assertEquals(leaderId, connection.getTargetLeaderId()); + assertNull(connection.getTargetGateway()); } @Test @@ -124,7 +131,6 @@ public class RegisteredRpcConnectionTest extends TestLogger { final String connectionID = "Test RPC Connection ID"; TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); - TestingRpcService rpcService = new TestingRpcService(); try { rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -141,7 +147,6 @@ public class RegisteredRpcConnectionTest extends TestLogger { } finally { testGateway.stop(); - rpcService.stopService(); } } @@ -149,31 +154,26 @@ public class RegisteredRpcConnectionTest extends TestLogger { public void testReconnect() throws Exception { final String connectionId1 = "Test RPC Connection ID 1"; final String connectionId2 = "Test RPC Connection ID 2"; - final TestingRpcService rpcService = new TestingRpcService(); final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>"; final UUID leaderId = UUID.randomUUID(); final TestRegistrationGateway testGateway = new TestRegistrationGateway( new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1), new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2)); - try { - rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - 
TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); - connection.start(); + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); - final String actualConnectionId1 = connection.getConnectionFuture().get(); + final String actualConnectionId1 = connection.getConnectionFuture().get(); - assertEquals(actualConnectionId1, connectionId1); + assertEquals(actualConnectionId1, connectionId1); - assertTrue(connection.tryReconnect()); + assertTrue(connection.tryReconnect()); - final String actualConnectionId2 = connection.getConnectionFuture().get(); + final String actualConnectionId2 = connection.getConnectionFuture().get(); - assertEquals(actualConnectionId2, connectionId2); - } finally { - rpcService.stopService(); - } + assertEquals(actualConnectionId2, connectionId2); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 885a7f5..ff5a748 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import 
org.mockito.invocation.InvocationOnMock; import org.slf4j.LoggerFactory; @@ -53,6 +55,20 @@ import static org.mockito.Mockito.when; */ public class RetryingRegistrationTest extends TestLogger { + private TestingRpcService rpcService; + + @Before + public void setup() { + rpcService = new TestingRpcService(); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException { + if (rpcService != null) { + rpcService.stopService().get(); + } + } + @Test public void testSimpleSuccessfulRegistration() throws Exception { final String testId = "laissez les bon temps roulez"; @@ -61,12 +77,11 @@ public class RetryingRegistrationTest extends TestLogger { // an endpoint that immediately returns success TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); - TestingRpcService rpc = new TestingRpcService(); try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); registration.startRegistration(); CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); @@ -84,7 +99,6 @@ public class RetryingRegistrationTest extends TestLogger { } finally { testGateway.stop(); - rpc.stopService(); } } @@ -173,14 +187,12 @@ public class RetryingRegistrationTest extends TestLogger { new TestRegistrationSuccess(testId) // success ); - TestingRpcService rpc = new TestingRpcService(); - try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); final long initialTimeout = 20L; TestRetryingRegistration registration = new TestRetryingRegistration( - rpc, + rpcService, testEndpointAddress, leaderId, initialTimeout, @@ 
-206,7 +218,6 @@ public class RetryingRegistrationTest extends TestLogger { assertTrue("retries did not properly back off", elapsedMillis >= 3 * initialTimeout); } finally { - rpc.stopService(); testGateway.stop(); } } @@ -217,8 +228,6 @@ public class RetryingRegistrationTest extends TestLogger { final String testEndpointAddress = "<test-address>"; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); - TestRegistrationGateway testGateway = new TestRegistrationGateway( null, // timeout new RegistrationResponse.Decline("no reason "), @@ -227,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger { ); try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); long started = System.nanoTime(); registration.startRegistration(); @@ -251,7 +260,6 @@ public class RetryingRegistrationTest extends TestLogger { } finally { testGateway.stop(); - rpc.stopService(); } } @@ -262,39 +270,32 @@ public class RetryingRegistrationTest extends TestLogger { final String testEndpointAddress = "<test-address>"; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); - - try { - // gateway that upon calls first responds with a failure, then with a success - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + // gateway that upon calls first responds with a failure, then with a success + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( - FutureUtils.completedExceptionally(new Exception("test exception")), - CompletableFuture.completedFuture(new TestRegistrationSuccess(testId))); + 
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( + FutureUtils.completedExceptionally(new Exception("test exception")), + CompletableFuture.completedFuture(new TestRegistrationSuccess(testId))); - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); - long started = System.nanoTime(); - registration.startRegistration(); + long started = System.nanoTime(); + registration.startRegistration(); - CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); - Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = - future.get(10, TimeUnit.SECONDS); + CompletableFuture<Tuple2<TestRegistrationGateway, TestRegistrationSuccess>> future = registration.getFuture(); + Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = + future.get(10, TimeUnit.SECONDS); - long finished = System.nanoTime(); - long elapsedMillis = (finished - started) / 1000000; + long finished = System.nanoTime(); + long elapsedMillis = (finished - started) / 1000000; - assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(testId, success.f1.getCorrelationId()); - // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", - elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); - } - finally { - rpc.stopService(); - } + // validate that some retry-delay / back-off behavior happened + assertTrue("retries did not properly back off", + elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); } @Test @@ -302,29 +303,22 @@ public class RetryingRegistrationTest extends TestLogger { final String testEndpointAddress = "my-test-address"; final UUID leaderId = 
UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); + CompletableFuture<RegistrationResponse> result = new CompletableFuture<>(); - try { - CompletableFuture<RegistrationResponse> result = new CompletableFuture<>(); - - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - registration.startRegistration(); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); + registration.startRegistration(); - // cancel and fail the current registration attempt - registration.cancel(); - result.completeExceptionally(new TimeoutException()); + // cancel and fail the current registration attempt + registration.cancel(); + result.completeExceptionally(new TimeoutException()); - // there should not be a second registration attempt - verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); - } - finally { - rpc.stopService(); - } + // there should not be a second registration attempt + verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 57ce349..a3d17f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -126,7 +126,7 @@ public class ResourceManagerHATest extends TestLogger { testingFatalErrorHandler.rethrowError(); } } finally { - rpcService.stopService(); + rpcService.stopService().get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 4440e38..acd8774 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -73,7 +74,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { @After public void teardown() throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 2af6632..a0c4b43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -105,7 +106,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { @After public void teardown() throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 8f35b13..f5fa899 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -46,6 +46,7 @@ import org.junit.experimental.categories.Category; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; @Category(Flip6.class) public class ResourceManagerTest extends TestLogger { @@ -54,18 +55,13 @@ public class ResourceManagerTest extends TestLogger { @Before public void setUp() { - if (rpcService != null) { - rpcService.stopService(); - rpcService = null; - } - rpcService = new TestingRpcService(); } @After - public void tearDown() { + public void tearDown() throws ExecutionException, InterruptedException { if (rpcService != null) { - rpcService.stopService(); + rpcService.stopService().get(); rpcService = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 9b72cb4..8ba0ccd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -23,6 +23,7 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -30,10 +31,12 @@ import org.apache.flink.testutils.category.Flip6; import 
org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -60,9 +63,13 @@ public class AsyncCallsTest extends TestLogger { new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index b804051..3d99c3f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -60,8 +60,7 @@ public class FencedRpcEndpointTest extends TestLogger { @AfterClass public static void teardown() throws ExecutionException, InterruptedException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); - rpcService.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + 
RpcUtils.terminateRpcService(rpcService, timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java index 26630c8..017c1f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -23,17 +23,21 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.junit.Test; import org.junit.experimental.categories.Category; import scala.Option; import scala.Tuple2; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -46,10 +50,10 @@ import static org.junit.Assert.*; * connect to an RpcEndpoint. 
*/ @Category(Flip6.class) -public class RpcConnectionTest { +public class RpcConnectionTest extends TestLogger { @Test - public void testConnectFailure() { + public void testConnectFailure() throws Exception { ActorSystem actorSystem = null; RpcService rpcService = null; try { @@ -77,12 +81,25 @@ public class RpcConnectionTest { fail("wrong exception: " + t); } finally { + final CompletableFuture<Void> rpcTerminationFuture; + if (rpcService != null) { - rpcService.stopService(); + rpcTerminationFuture = rpcService.stopService(); + } else { + rpcTerminationFuture = CompletableFuture.completedFuture(null); } + + final CompletableFuture<Terminated> actorSystemTerminationFuture; + if (actorSystem != null) { - actorSystem.shutdown(); + actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + } else { + actorSystemTerminationFuture = CompletableFuture.completedFuture(null); } + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java index 6d60de9..d52aadb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java @@ -20,21 +20,22 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; +import 
akka.actor.Terminated; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import scala.concurrent.duration.FiniteDuration; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -56,21 +57,13 @@ public class RpcEndpointTest extends TestLogger { @AfterClass public static void teardown() throws Exception { - if (rpcService != null) { - rpcService.stopService(); - } - - if (actorSystem != null) { - actorSystem.shutdown(); - } - if (rpcService != null) { - rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - } + final CompletableFuture<Void> rpcTerminationFuture = rpcService.stopService(); + final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); - if (actorSystem != null) { - actorSystem.awaitTermination(new FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)); - } + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 4b9f397..db70a0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -74,9 +74,15 @@ public class TestingRpcService extends AkkaRpcService { // ------------------------------------------------------------------------ 
@Override - public void stopService() { - super.stopService(); - registeredConnections.clear(); + public CompletableFuture<Void> stopService() { + final CompletableFuture<Void> terminationFuture = super.stopService(); + + terminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + registeredConnections.clear(); + }); + + return terminationFuture; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 3ff1b80..1b45006 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -31,14 +31,17 @@ import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -61,10 +64,13 @@ public class AkkaRpcActorTest extends TestLogger { new AkkaRpcService(actorSystem, timeout); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); - actorSystem.awaitTermination(); + public static void shutdown() throws InterruptedException, ExecutionException, 
TimeoutException { + final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index c4259f4..d92e496 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -27,10 +27,12 @@ import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; +import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -59,10 +61,13 @@ public class AkkaRpcServiceTest extends TestLogger { new AkkaRpcService(actorSystem, timeout); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); - actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture<Terminated> actorSystemTerminationFuture = 
FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 8f35c0f..a69bd84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -70,7 +70,7 @@ public class MainThreadValidationTest extends TestLogger { testEndpoint.shutDown(); } finally { - akkaRpcService.stopService(); + akkaRpcService.stopService().get(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index bb46bec..061145c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -18,16 +18,18 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; import org.apache.flink.api.common.time.Time; import 
org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -35,8 +37,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -67,15 +74,17 @@ public class MessageSerializationTest extends TestLogger { } @AfterClass - public static void teardown() { - akkaRpcService1.stopService(); - akkaRpcService2.stopService(); + public static void teardown() throws InterruptedException, ExecutionException, TimeoutException { + final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4); - actorSystem1.shutdown(); - actorSystem2.shutdown(); + terminationFutures.add(akkaRpcService1.stopService()); + terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate())); + terminationFutures.add(akkaRpcService2.stopService()); + terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate())); - actorSystem1.awaitTermination(); - actorSystem2.awaitTermination(); + FutureUtils + .waitForAll(terminationFutures) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index dc1d09f..8f4ec5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -220,7 +221,7 @@ public class TaskExecutorITCase extends TestLogger { testingFatalErrorHandler.rethrowError(); } - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e894e48..d7a1860 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -189,7 
+189,7 @@ public class TaskExecutorTest extends TestLogger { @After public void teardown() throws Exception { if (rpc != null) { - rpc.stopService(); + RpcUtils.terminateRpcService(rpc, timeout); rpc = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java index 847250c..ae530f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; @@ -58,9 +59,7 @@ public class RpcGatewayRetrieverTest extends TestLogger { @AfterClass public static void teardown() throws InterruptedException, ExecutionException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); - rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - + RpcUtils.terminateRpcService(rpcService, TIMEOUT); rpcService = null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 8743380..455abc9 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -308,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger { * Stop the Akka actor system. */ public void stopResourceManager() throws Exception { - rpcService.stopService(); + rpcService.stopService().get(); } }
