[FLINK-8678] [flip6] Make RpcService shut down non blocking

Changes the RpcService#stopService method to be non-blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future which is completed once the RpcService has been completely shut
down.

This closes #5517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c27e2a77
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c27e2a77
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c27e2a77

Branch: refs/heads/master
Commit: c27e2a77005db355da9e72656af8b0df8b1dfe75
Parents: af3ea81
Author: Till Rohrmann <[email protected]>
Authored: Fri Feb 16 18:46:42 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:06 2018 +0100

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   2 +-
 .../runtime/entrypoint/ClusterEntrypoint.java   |   2 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   4 +-
 .../apache/flink/runtime/rpc/RpcService.java    |   7 +-
 .../org/apache/flink/runtime/rpc/RpcUtils.java  |  17 ++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  40 ++++---
 .../runtime/taskexecutor/TaskManagerRunner.java |  10 +-
 .../clusterframework/ResourceManagerTest.java   |   5 +-
 .../runtime/dispatcher/DispatcherTest.java      |   4 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |   6 +-
 .../jobmanager/scheduler/SchedulerTestBase.java |   2 +-
 .../jobmaster/slotpool/SlotPoolRpcTest.java     |   4 +-
 .../slotpool/SlotPoolSchedulingTestBase.java    |   4 +-
 .../jobmaster/slotpool/SlotPoolTest.java        |   2 +-
 .../RegisteredRpcConnectionTest.java            |  88 +++++++--------
 .../registration/RetryingRegistrationTest.java  | 108 +++++++++----------
 .../resourcemanager/ResourceManagerHATest.java  |   2 +-
 .../ResourceManagerJobMasterTest.java           |   3 +-
 .../ResourceManagerTaskExecutorTest.java        |   3 +-
 .../resourcemanager/ResourceManagerTest.java    |  10 +-
 .../flink/runtime/rpc/AsyncCallsTest.java       |  13 ++-
 .../runtime/rpc/FencedRpcEndpointTest.java      |   3 +-
 .../flink/runtime/rpc/RpcConnectionTest.java    |  25 ++++-
 .../flink/runtime/rpc/RpcEndpointTest.java      |  23 ++--
 .../flink/runtime/rpc/TestingRpcService.java    |  12 ++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      |  14 ++-
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  13 ++-
 .../rpc/akka/MainThreadValidationTest.java      |   2 +-
 .../rpc/akka/MessageSerializationTest.java      |  29 +++--
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   2 +-
 .../retriever/impl/RpcGatewayRetrieverTest.java |   5 +-
 .../flink/yarn/YarnResourceManagerTest.java     |   2 +-
 33 files changed, 271 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index e23e0cb..2b38b85 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -510,7 +510,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
                @Override
                public void close() throws Exception {
-                       rpcService.stopService();
+                       rpcService.stopService().get();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ccb3ae4..f347d05 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -437,7 +437,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
 
                        if (commonRpcService != null) {
                                try {
-                                       commonRpcService.stopService();
+                                       commonRpcService.stopService().get();
                                } catch (Throwable t) {
                                        exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 5046ae7..3f019f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -756,7 +756,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        private static Throwable shutDownRpc(RpcService rpcService, Throwable 
priorException) {
                if (rpcService != null) {
                        try {
-                               rpcService.stopService();
+                               rpcService.stopService().get();
                        }
                        catch (Throwable t) {
                                return ExceptionUtils.firstOrSuppressed(t, 
priorException);
@@ -773,7 +773,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                        for (RpcService service : rpcServices) {
                                try {
                                        if (service != null) {
-                                               service.stopService();
+                                               service.stopService().get();
                                        }
                                }
                                catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 089e4b0..9aa3119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -116,9 +116,12 @@ public interface RpcService {
        void stopServer(RpcServer selfGateway);
 
        /**
-        * Stop the rpc service shutting down all started rpc servers.
+        * Trigger the asynchronous stopping of the {@link RpcService}.
+        *
+        * @return Future which is completed once the {@link RpcService} has 
been
+        * fully stopped.
         */
-       void stopService();
+       CompletableFuture<Void> stopService();
 
        /**
         * Returns a future indicating when the RPC service has been shut down.

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
index f87d33c..c90a8b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java
@@ -50,7 +50,7 @@ public class RpcUtils {
                while (clazz != null) {
                        for (Class<?> interfaze : clazz.getInterfaces()) {
                                if 
(RpcGateway.class.isAssignableFrom(interfaze)) {
-                                       interfaces.add((Class<? extends 
RpcGateway>)interfaze);
+                                       interfaces.add((Class<? extends 
RpcGateway>) interfaze);
                                }
                        }
 
@@ -65,7 +65,7 @@ public class RpcUtils {
         *
         * @param rpcEndpoint to terminate
         * @param timeout for this operation
-        * @throws ExecutionException if a problem occurs
+        * @throws ExecutionException if a problem occurred
         * @throws InterruptedException if the operation has been interrupted
         * @throws TimeoutException if a timeout occurred
         */
@@ -74,6 +74,19 @@ public class RpcUtils {
                
rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
        }
 
+       /**
+        * Shuts the given rpc service down and waits for its termination.
+        *
+        * @param rpcService to shut down
+        * @param timeout for this operation
+        * @throws InterruptedException if the operation has been interrupted
+        * @throws ExecutionException if a problem occurred
+        * @throws TimeoutException if a timeout occurred
+        */
+       public static void terminateRpcService(RpcService rpcService, Time 
timeout) throws InterruptedException, ExecutionException, TimeoutException {
+               rpcService.stopService().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+       }
+
        // We don't want this class to be instantiable
        private RpcUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d2d4bf2..a65fe46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -42,6 +42,7 @@ import akka.actor.Address;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
@@ -98,6 +99,8 @@ public class AkkaRpcService implements RpcService {
 
        private final ScheduledExecutor internalScheduledExecutor;
 
+       private final CompletableFuture<Void> terminationFuture;
+
        private volatile boolean stopped;
 
        public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
@@ -127,6 +130,8 @@ public class AkkaRpcService implements RpcService {
 
                internalScheduledExecutor = new 
ActorSystemScheduledExecutorAdapter(actorSystem);
 
+               terminationFuture = new CompletableFuture<>();
+
                stopped = false;
        }
 
@@ -311,33 +316,40 @@ public class AkkaRpcService implements RpcService {
        }
 
        @Override
-       public void stopService() {
-               LOG.info("Stopping Akka RPC service.");
-
+       public CompletableFuture<Void> stopService() {
                synchronized (lock) {
                        if (stopped) {
-                               return;
+                               return terminationFuture;
                        }
 
                        stopped = true;
-
                }
 
-               actorSystem.shutdown();
-               actorSystem.awaitTermination();
+               LOG.info("Stopping Akka RPC service.");
 
-               synchronized (lock) {
-                       actors.clear();
-               }
+               final CompletableFuture<Terminated> actorSytemTerminationFuture 
= FutureUtils.toJava(actorSystem.terminate());
+
+               actorSytemTerminationFuture.whenComplete(
+                       (Terminated ignored, Throwable throwable) -> {
+                               synchronized (lock) {
+                                       actors.clear();
+                               }
+
+                               if (throwable != null) {
+                                       
terminationFuture.completeExceptionally(throwable);
+                               } else {
+                                       terminationFuture.complete(null);
+                               }
+
+                               LOG.info("Stopped Akka RPC service.");
+                       });
 
-               LOG.info("Stopped Akka RPC service.");
+               return terminationFuture;
        }
 
        @Override
        public CompletableFuture<Void> getTerminationFuture() {
-               return CompletableFuture.runAsync(
-                       actorSystem::awaitTermination,
-                       getExecutor());
+               return terminationFuture;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 4cb1beb..4620585 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -173,7 +173,15 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       rpcService.stopService();
+                       try {
+                               rpcService.stopService().get();
+                       } catch (InterruptedException ie) {
+                               exception = 
ExceptionUtils.firstOrSuppressed(ie, exception);
+
+                               Thread.currentThread().interrupt();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
 
                        try {
                                highAvailabilityServices.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 096ba5e..241da8f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -53,6 +53,7 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -578,7 +579,7 @@ public class ResourceManagerTest extends TestLogger {
                        verify(taskExecutorGateway, 
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class));
 
                } finally {
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                }
        }
 
@@ -680,7 +681,7 @@ public class ResourceManagerTest extends TestLogger {
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId),
 any(TimeoutException.class));
 
                } finally {
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 46cd9e2..5d264e2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -134,9 +134,9 @@ public class DispatcherTest extends TestLogger {
        }
 
        @AfterClass
-       public static void teardownClass() {
+       public static void teardownClass() throws Exception {
                if (rpcService != null) {
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, TIMEOUT);
 
                        rpcService = null;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index dfabc61..4291ef2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -61,6 +61,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -143,13 +145,13 @@ public class MiniDispatcherTest extends TestLogger {
        }
 
        @AfterClass
-       public static void teardownClass() throws IOException {
+       public static void teardownClass() throws IOException, 
InterruptedException, ExecutionException, TimeoutException {
                if (blobServer != null) {
                        blobServer.close();
                }
 
                if (rpcService != null) {
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 949ff96..76a5642 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -117,7 +117,7 @@ public class SchedulerTestBase extends TestLogger {
                }
 
                if (rpcService != null) {
-                       rpcService.stopService();
+                       rpcService.stopService().get();
                        rpcService = null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 172876d..b2be97e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -92,9 +92,9 @@ public class SlotPoolRpcTest extends TestLogger {
        }
 
        @AfterClass
-       public static  void shutdown() {
+       public static  void shutdown() throws InterruptedException, 
ExecutionException, TimeoutException {
                if (rpcService != null) {
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                        rpcService = null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 4cd7782..b9036c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -64,9 +64,9 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
        }
 
        @AfterClass
-       public static void teardown() {
+       public static void teardown() throws ExecutionException, 
InterruptedException {
                if (testingRpcService != null) {
-                       testingRpcService.stopService();
+                       testingRpcService.stopService().get();
                        testingRpcService = null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index e6446ad..d6e0521 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -104,7 +104,7 @@ public class SlotPoolTest extends TestLogger {
 
        @After
        public void tearDown() throws Exception {
-               rpcService.stopService();
+               RpcUtils.terminateRpcService(rpcService, timeout);
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 650a0f2..25f976a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +48,20 @@ import static org.mockito.Mockito.when;
  */
 public class RegisteredRpcConnectionTest extends TestLogger {
 
+       private TestingRpcService rpcService;
+
+       @Before
+       public void setup() {
+               rpcService = new TestingRpcService();
+       }
+
+       @After
+       public void tearDown() throws ExecutionException, InterruptedException {
+               if (rpcService != null) {
+                       rpcService.stopService().get();
+               }
+       }
+
        @Test
        public void testSuccessfulRpcConnection() throws Exception {
                final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
@@ -54,7 +70,6 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 
                // an endpoint that immediately returns success
                TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
-               TestingRpcService rpcService = new TestingRpcService();
 
                try {
                        
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
@@ -74,7 +89,6 @@ public class RegisteredRpcConnectionTest extends TestLogger {
                }
                finally {
                        testGateway.stop();
-                       rpcService.stopService();
                }
        }
 
@@ -84,37 +98,30 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
                final UUID leaderId = UUID.randomUUID();
 
-               TestingRpcService rpcService = new TestingRpcService();
+               // gateway that upon calls Throw an exception
+               TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+               final RuntimeException registrationException = new 
RuntimeException(connectionFailureMessage);
+               when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenThrow(
+                       registrationException);
 
-               try {
-                       // gateway that upon calls Throw an exception
-                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
-                       final RuntimeException registrationException = new 
RuntimeException(connectionFailureMessage);
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenThrow(
-                               registrationException);
+               rpcService.registerGateway(testRpcConnectionEndpointAddress, 
testGateway);
 
-                       
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
-
-                       TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
-                       connection.start();
-
-                       //wait for connection failure
-                       try {
-                               connection.getConnectionFuture().get();
-                               fail("expected failure.");
-                       } catch (ExecutionException ee) {
-                               assertEquals(registrationException, 
ee.getCause());
-                       }
+               TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
+               connection.start();
 
-                       // validate correct invocation and result
-                       assertFalse(connection.isConnected());
-                       assertEquals(testRpcConnectionEndpointAddress, 
connection.getTargetAddress());
-                       assertEquals(leaderId, connection.getTargetLeaderId());
-                       assertNull(connection.getTargetGateway());
-               }
-               finally {
-                       rpcService.stopService();
+               //wait for connection failure
+               try {
+                       connection.getConnectionFuture().get();
+                       fail("expected failure.");
+               } catch (ExecutionException ee) {
+                       assertEquals(registrationException, ee.getCause());
                }
+
+               // validate correct invocation and result
+               assertFalse(connection.isConnected());
+               assertEquals(testRpcConnectionEndpointAddress, 
connection.getTargetAddress());
+               assertEquals(leaderId, connection.getTargetLeaderId());
+               assertNull(connection.getTargetGateway());
        }
 
        @Test
@@ -124,7 +131,6 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                final String connectionID = "Test RPC Connection ID";
 
                TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
-               TestingRpcService rpcService = new TestingRpcService();
 
                try {
                        
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
@@ -141,7 +147,6 @@ public class RegisteredRpcConnectionTest extends TestLogger 
{
                }
                finally {
                        testGateway.stop();
-                       rpcService.stopService();
                }
        }
 
@@ -149,31 +154,26 @@ public class RegisteredRpcConnectionTest extends 
TestLogger {
        public void testReconnect() throws Exception {
                final String connectionId1 = "Test RPC Connection ID 1";
                final String connectionId2 = "Test RPC Connection ID 2";
-               final TestingRpcService rpcService = new TestingRpcService();
                final String testRpcConnectionEndpointAddress = 
"<TestRpcConnectionEndpointAddress>";
                final UUID leaderId = UUID.randomUUID();
                final TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
                        new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionId1),
                        new 
RetryingRegistrationTest.TestRegistrationSuccess(connectionId2));
 
-               try {
-                       
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+               rpcService.registerGateway(testRpcConnectionEndpointAddress, 
testGateway);
 
-                       TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
-                       connection.start();
+               TestRpcConnection connection = new 
TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, 
rpcService.getExecutor(), rpcService);
+               connection.start();
 
-                       final String actualConnectionId1 = 
connection.getConnectionFuture().get();
+               final String actualConnectionId1 = 
connection.getConnectionFuture().get();
 
-                       assertEquals(actualConnectionId1, connectionId1);
+               assertEquals(actualConnectionId1, connectionId1);
 
-                       assertTrue(connection.tryReconnect());
+               assertTrue(connection.tryReconnect());
 
-                       final String actualConnectionId2 = 
connection.getConnectionFuture().get();
+               final String actualConnectionId2 = 
connection.getConnectionFuture().get();
 
-                       assertEquals(actualConnectionId2, connectionId2);
-               } finally {
-                       rpcService.stopService();
-               }
+               assertEquals(actualConnectionId2, connectionId2);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 885a7f5..ff5a748 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.LoggerFactory;
@@ -53,6 +55,20 @@ import static org.mockito.Mockito.when;
  */
 public class RetryingRegistrationTest extends TestLogger {
 
+       private TestingRpcService rpcService;
+
+       @Before
+       public void setup() {
+               rpcService = new TestingRpcService();
+       }
+
+       @After
+       public void tearDown() throws ExecutionException, InterruptedException {
+               if (rpcService != null) {
+                       rpcService.stopService().get();
+               }
+       }
+
        @Test
        public void testSimpleSuccessfulRegistration() throws Exception {
                final String testId = "laissez les bon temps roulez";
@@ -61,12 +77,11 @@ public class RetryingRegistrationTest extends TestLogger {
 
                // an endpoint that immediately returns success
                TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
-               TestingRpcService rpc = new TestingRpcService();
 
                try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
+                       rpcService.registerGateway(testEndpointAddress, 
testGateway);
 
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpcService, testEndpointAddress, leaderId);
                        registration.startRegistration();
 
                        CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
@@ -84,7 +99,6 @@ public class RetryingRegistrationTest extends TestLogger {
                }
                finally {
                        testGateway.stop();
-                       rpc.stopService();
                }
        }
 
@@ -173,14 +187,12 @@ public class RetryingRegistrationTest extends TestLogger {
                                new TestRegistrationSuccess(testId) // success
                );
 
-               TestingRpcService rpc = new TestingRpcService();
-
                try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
+                       rpcService.registerGateway(testEndpointAddress, 
testGateway);
 
                        final long initialTimeout = 20L;
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(
-                               rpc,
+                               rpcService,
                                testEndpointAddress,
                                leaderId,
                                initialTimeout,
@@ -206,7 +218,6 @@ public class RetryingRegistrationTest extends TestLogger {
                        assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * initialTimeout);
                }
                finally {
-                       rpc.stopService();
                        testGateway.stop();
                }
        }
@@ -217,8 +228,6 @@ public class RetryingRegistrationTest extends TestLogger {
                final String testEndpointAddress = "<test-address>";
                final UUID leaderId = UUID.randomUUID();
 
-               TestingRpcService rpc = new TestingRpcService();
-
                TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
                                null, // timeout
                                new RegistrationResponse.Decline("no reason "),
@@ -227,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger {
                );
 
                try {
-                       rpc.registerGateway(testEndpointAddress, testGateway);
+                       rpcService.registerGateway(testEndpointAddress, 
testGateway);
 
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpcService, testEndpointAddress, leaderId);
 
                        long started = System.nanoTime();
                        registration.startRegistration();
@@ -251,7 +260,6 @@ public class RetryingRegistrationTest extends TestLogger {
                }
                finally {
                        testGateway.stop();
-                       rpc.stopService();
                }
        }
 
@@ -262,39 +270,32 @@ public class RetryingRegistrationTest extends TestLogger {
                final String testEndpointAddress = "<test-address>";
                final UUID leaderId = UUID.randomUUID();
 
-               TestingRpcService rpc = new TestingRpcService();
-
-               try {
-                       // gateway that upon calls first responds with a 
failure, then with a success
-                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+               // gateway that upon calls first responds with a failure, then 
with a success
+               TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
 
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
-                                       FutureUtils.completedExceptionally(new 
Exception("test exception")),
-                                       CompletableFuture.completedFuture(new 
TestRegistrationSuccess(testId)));
+               when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
+                               FutureUtils.completedExceptionally(new 
Exception("test exception")),
+                               CompletableFuture.completedFuture(new 
TestRegistrationSuccess(testId)));
 
-                       rpc.registerGateway(testEndpointAddress, testGateway);
+               rpcService.registerGateway(testEndpointAddress, testGateway);
 
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+               TestRetryingRegistration registration = new 
TestRetryingRegistration(rpcService, testEndpointAddress, leaderId);
 
-                       long started = System.nanoTime();
-                       registration.startRegistration();
+               long started = System.nanoTime();
+               registration.startRegistration();
 
-                       CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
-                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
-                                       future.get(10, TimeUnit.SECONDS);
+               CompletableFuture<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+               Tuple2<TestRegistrationGateway, TestRegistrationSuccess> 
success =
+                               future.get(10, TimeUnit.SECONDS);
 
-                       long finished = System.nanoTime();
-                       long elapsedMillis = (finished - started) / 1000000;
+               long finished = System.nanoTime();
+               long elapsedMillis = (finished - started) / 1000000;
 
-                       assertEquals(testId, success.f1.getCorrelationId());
+               assertEquals(testId, success.f1.getCorrelationId());
 
-                       // validate that some retry-delay / back-off behavior 
happened
-                       assertTrue("retries did not properly back off",
-                                       elapsedMillis >= 
TestRetryingRegistration.DELAY_ON_ERROR);
-               }
-               finally {
-                       rpc.stopService();
-               }
+               // validate that some retry-delay / back-off behavior happened
+               assertTrue("retries did not properly back off",
+                               elapsedMillis >= 
TestRetryingRegistration.DELAY_ON_ERROR);
        }
 
        @Test
@@ -302,29 +303,22 @@ public class RetryingRegistrationTest extends TestLogger {
                final String testEndpointAddress = "my-test-address";
                final UUID leaderId = UUID.randomUUID();
 
-               TestingRpcService rpc = new TestingRpcService();
+               CompletableFuture<RegistrationResponse> result = new 
CompletableFuture<>();
 
-               try {
-                       CompletableFuture<RegistrationResponse> result = new 
CompletableFuture<>();
-
-                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
-                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result);
+               TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+               when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result);
 
-                       rpc.registerGateway(testEndpointAddress, testGateway);
+               rpcService.registerGateway(testEndpointAddress, testGateway);
 
-                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
-                       registration.startRegistration();
+               TestRetryingRegistration registration = new 
TestRetryingRegistration(rpcService, testEndpointAddress, leaderId);
+               registration.startRegistration();
 
-                       // cancel and fail the current registration attempt
-                       registration.cancel();
-                       result.completeExceptionally(new TimeoutException());
+               // cancel and fail the current registration attempt
+               registration.cancel();
+               result.completeExceptionally(new TimeoutException());
 
-                       // there should not be a second registration attempt
-                       verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());
-               }
-               finally {
-                       rpc.stopService();
-               }
+               // there should not be a second registration attempt
+               verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 57ce349..a3d17f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -126,7 +126,7 @@ public class ResourceManagerHATest extends TestLogger {
                                testingFatalErrorHandler.rethrowError();
                        }
                } finally {
-                       rpcService.stopService();
+                       rpcService.stopService().get();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 4440e38..acd8774 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
@@ -73,7 +74,7 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 
        @After
        public void teardown() throws Exception {
-               rpcService.stopService();
+               RpcUtils.terminateRpcService(rpcService, timeout);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 2af6632..a0c4b43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
@@ -105,7 +106,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
 
        @After
        public void teardown() throws Exception {
-               rpcService.stopService();
+               RpcUtils.terminateRpcService(rpcService, timeout);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 8f35b13..f5fa899 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -46,6 +46,7 @@ import org.junit.experimental.categories.Category;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 @Category(Flip6.class)
 public class ResourceManagerTest extends TestLogger {
@@ -54,18 +55,13 @@ public class ResourceManagerTest extends TestLogger {
 
        @Before
        public void setUp() {
-               if (rpcService != null) {
-                       rpcService.stopService();
-                       rpcService = null;
-               }
-
                rpcService = new TestingRpcService();
        }
 
        @After
-       public void tearDown() {
+       public void tearDown() throws ExecutionException, InterruptedException {
                if (rpcService != null) {
-                       rpcService.stopService();
+                       rpcService.stopService().get();
                        rpcService = null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 9b72cb4..8ba0ccd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
@@ -30,10 +31,12 @@ import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.Terminated;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -60,9 +63,13 @@ public class AsyncCallsTest extends TestLogger {
                        new AkkaRpcService(actorSystem, 
Time.milliseconds(10000L));
 
        @AfterClass
-       public static void shutdown() {
-               akkaRpcService.stopService();
-               actorSystem.shutdown();
+       public static void shutdown() throws InterruptedException, 
ExecutionException, TimeoutException {
+               final CompletableFuture<Void> rpcTerminationFuture = 
akkaRpcService.stopService();
+               final CompletableFuture<Terminated> 
actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+               FutureUtils
+                       .waitForAll(Arrays.asList(rpcTerminationFuture, 
actorSystemTerminationFuture))
+                       .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index b804051..3d99c3f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -60,8 +60,7 @@ public class FencedRpcEndpointTest extends TestLogger {
        @AfterClass
        public static void teardown() throws ExecutionException, 
InterruptedException, TimeoutException {
                if (rpcService != null) {
-                       rpcService.stopService();
-                       
rpcService.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 26630c8..017c1f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -23,17 +23,21 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
+import akka.actor.Terminated;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import scala.Option;
 import scala.Tuple2;
 
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -46,10 +50,10 @@ import static org.junit.Assert.*;
  * connect to an RpcEndpoint.
  */
 @Category(Flip6.class)
-public class RpcConnectionTest {
+public class RpcConnectionTest extends TestLogger {
 
        @Test
-       public void testConnectFailure() {
+       public void testConnectFailure() throws Exception {
                ActorSystem actorSystem = null;
                RpcService rpcService = null;
                try {
@@ -77,12 +81,25 @@ public class RpcConnectionTest {
                        fail("wrong exception: " + t);
                }
                finally {
+                       final CompletableFuture<Void> rpcTerminationFuture;
+
                        if (rpcService != null) {
-                               rpcService.stopService();
+                               rpcTerminationFuture = rpcService.stopService();
+                       } else {
+                               rpcTerminationFuture = 
CompletableFuture.completedFuture(null);
                        }
+
+                       final CompletableFuture<Terminated> 
actorSystemTerminationFuture;
+
                        if (actorSystem != null) {
-                               actorSystem.shutdown();
+                               actorSystemTerminationFuture = 
FutureUtils.toJava(actorSystem.terminate());
+                       } else {
+                               actorSystemTerminationFuture = 
CompletableFuture.completedFuture(null);
                        }
+
+                       FutureUtils
+                               .waitForAll(Arrays.asList(rpcTerminationFuture, 
actorSystemTerminationFuture))
+                               .get();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 6d60de9..d52aadb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -20,21 +20,22 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
+import akka.actor.Terminated;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -56,21 +57,13 @@ public class RpcEndpointTest extends TestLogger {
 
        @AfterClass
        public static void teardown() throws Exception {
-               if (rpcService != null) {
-                       rpcService.stopService();
-               }
-
-               if (actorSystem != null) {
-                       actorSystem.shutdown();
-               }
 
-               if (rpcService != null) {
-                       
rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-               }
+               final CompletableFuture<Void> rpcTerminationFuture = 
rpcService.stopService();
+               final CompletableFuture<Terminated> 
actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
 
-               if (actorSystem != null) {
-                       actorSystem.awaitTermination(new 
FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS));
-               }
+               FutureUtils
+                       .waitForAll(Arrays.asList(rpcTerminationFuture, 
actorSystemTerminationFuture))
+                       .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 4b9f397..db70a0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -74,9 +74,15 @@ public class TestingRpcService extends AkkaRpcService {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void stopService() {
-               super.stopService();
-               registeredConnections.clear();
+       public CompletableFuture<Void> stopService() {
+               final CompletableFuture<Void> terminationFuture = 
super.stopService();
+
+               terminationFuture.whenComplete(
+                       (Void ignored, Throwable throwable) -> {
+                               registeredConnections.clear();
+                       });
+
+               return terminationFuture;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 3ff1b80..1b45006 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -31,14 +31,17 @@ import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import akka.actor.Terminated;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -61,10 +64,13 @@ public class AkkaRpcActorTest extends TestLogger {
                new AkkaRpcService(actorSystem, timeout);
 
        @AfterClass
-       public static void shutdown() {
-               akkaRpcService.stopService();
-               actorSystem.shutdown();
-               actorSystem.awaitTermination();
+       public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
+               final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService();
+               final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+               FutureUtils
+                       .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
+                       .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index c4259f4..d92e496 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -27,10 +27,12 @@ import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
+import akka.actor.Terminated;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -59,10 +61,13 @@ public class AkkaRpcServiceTest extends TestLogger {
                        new AkkaRpcService(actorSystem, timeout);
 
        @AfterClass
-       public static void shutdown() {
-               akkaRpcService.stopService();
-               actorSystem.shutdown();
-               actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout));
+       public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
+               final CompletableFuture<Void> rpcTerminationFuture = akkaRpcService.stopService();
+               final CompletableFuture<Terminated> actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
+
+               FutureUtils
+                       .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture))
+                       .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
 
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 8f35c0f..a69bd84 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -70,7 +70,7 @@ public class MainThreadValidationTest extends TestLogger {
                        testEndpoint.shutDown();
                }
                finally {
-                       akkaRpcService.stopService();
+                       akkaRpcService.stopService().get();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index bb46bec..061145c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -35,8 +37,13 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -67,15 +74,17 @@ public class MessageSerializationTest extends TestLogger {
        }
 
        @AfterClass
-       public static void teardown() {
-               akkaRpcService1.stopService();
-               akkaRpcService2.stopService();
+       public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
+               final Collection<CompletableFuture<?>> terminationFutures = new ArrayList<>(4);
 
-               actorSystem1.shutdown();
-               actorSystem2.shutdown();
+               terminationFutures.add(akkaRpcService1.stopService());
+               terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate()));
+               terminationFutures.add(akkaRpcService2.stopService());
+               terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate()));
 
-               actorSystem1.awaitTermination();
-               actorSystem2.awaitTermination();
+               FutureUtils
+                       .waitForAll(terminationFutures)
+                       .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index dc1d09f..8f4ec5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -220,7 +221,7 @@ public class TaskExecutorITCase extends TestLogger {
                                testingFatalErrorHandler.rethrowError();
                        }
 
-                       rpcService.stopService();
+                       RpcUtils.terminateRpcService(rpcService, timeout);
                }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e894e48..d7a1860 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -189,7 +189,7 @@ public class TaskExecutorTest extends TestLogger {
        @After
        public void teardown() throws Exception {
                if (rpc != null) {
-                       rpc.stopService();
+                       RpcUtils.terminateRpcService(rpc, timeout);
                        rpc = null;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
index 847250c..ae530f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
 
@@ -58,9 +59,7 @@ public class RpcGatewayRetrieverTest extends TestLogger {
        @AfterClass
        public static void teardown() throws InterruptedException, ExecutionException, TimeoutException {
                if (rpcService != null) {
-                       rpcService.stopService();
-                       rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
-
+                       RpcUtils.terminateRpcService(rpcService, TIMEOUT);
                        rpcService = null;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c27e2a77/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 8743380..455abc9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -308,7 +308,7 @@ public class YarnResourceManagerTest extends TestLogger {
                 * Stop the Akka actor system.
                 */
                public void stopResourceManager() throws Exception {
-                       rpcService.stopService();
+                       rpcService.stopService().get();
                }
        }
 

Reply via email to