[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method now returns a CompletableFuture<Void> which is
completed once all post-stop actions have completed. The termination future
of the respective RpcEndpoint is completed only after that future completes.

This closes #5498.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b28e81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b28e81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b28e81

Branch: refs/heads/master
Commit: d9b28e817351eb2eb6b4cdd9597061713d9160e8
Parents: bb306b9
Author: Till Rohrmann <[email protected]>
Authored: Thu Feb 15 19:19:48 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 23 18:22:08 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  | 22 +----
 .../flink/runtime/dispatcher/Dispatcher.java    | 24 ++---
 .../flink/runtime/jobmaster/JobMaster.java      | 28 +-----
 .../runtime/jobmaster/slotpool/SlotPool.java    |  4 +-
 .../resourcemanager/ResourceManager.java        | 13 +--
 .../flink/runtime/rpc/FencedRpcEndpoint.java    |  2 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  5 +-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 21 +++--
 .../runtime/taskexecutor/TaskExecutor.java      | 14 +--
 .../flink/runtime/rpc/AsyncCallsTest.java       | 10 ++
 .../runtime/rpc/FencedRpcEndpointTest.java      |  5 +
 .../flink/runtime/rpc/RpcEndpointTest.java      |  5 +
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 98 ++++++++++++++++----
 .../rpc/akka/MainThreadValidationTest.java      |  7 ++
 .../rpc/akka/MessageSerializationTest.java      |  5 +
 .../retriever/impl/RpcGatewayRetrieverTest.java |  5 +
 .../apache/flink/yarn/YarnResourceManager.java  | 15 +--
 17 files changed, 170 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index d42d8d1..1f58b11 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -333,8 +333,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
        }
 
        @Override
-       public void postStop() throws Exception {
-               Exception exception = null;
+       public CompletableFuture<Void> postStop() {
                FiniteDuration stopTimeout = new FiniteDuration(5L, 
TimeUnit.SECONDS);
 
                CompletableFuture<Boolean> stopTaskMonitorFuture = 
stopActor(taskMonitor, stopTimeout);
@@ -355,22 +354,11 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        stopLaunchCoordinatorFuture,
                        stopReconciliationCoordinatorFuture);
 
-               // wait for the future to complete or to time out
-               try {
-                       stopFuture.get();
-               } catch (Exception e) {
-                       exception = e;
-               }
-
-               try {
-                       super.postStop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               final CompletableFuture<Void> terminationFuture = 
super.postStop();
 
-               if (exception != null) {
-                       throw new ResourceManagerException("Could not properly 
shut down the ResourceManager.", exception);
-               }
+               return stopFuture.thenCombine(
+                       terminationFuture,
+                       (Void voidA, Void voidB) -> null);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b2d2b6a..e212752 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -156,40 +156,40 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        //------------------------------------------------------
 
        @Override
-       public void postStop() throws Exception {
+       public CompletableFuture<Void> postStop() {
                log.info("Stopping dispatcher {}.", getAddress());
-               Throwable exception = null;
-
-               clearState();
+               Exception exception = null;
 
                try {
-                       jobManagerSharedServices.shutdown();
-               } catch (Throwable t) {
-                       exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       clearState();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                try {
-                       submittedJobGraphStore.stop();
+                       jobManagerSharedServices.shutdown();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                try {
-                       leaderElectionService.stop();
+                       submittedJobGraphStore.stop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                try {
-                       super.postStop();
+                       leaderElectionService.stop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                if (exception != null) {
-                       throw new FlinkException("Could not properly terminate 
the Dispatcher.", exception);
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException("Could not properly 
terminate the Dispatcher.", exception));
+               } else {
+                       return CompletableFuture.completedFuture(null);
                }
-               log.info("Stopped dispatcher {}.", getAddress());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 015751b..425f241 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -99,7 +99,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
@@ -123,7 +122,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -387,8 +385,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
         * Suspend the job and shutdown all other services including rpc.
         */
        @Override
-       public void postStop() throws Exception {
-               log.info("Stopping the JobMaster for job " + jobGraph.getName() 
+ '(' + jobGraph.getJobID() + ").");
+       public CompletableFuture<Void> postStop() {
+               log.info("Stopping the JobMaster for job {}({}).", 
jobGraph.getName(), jobGraph.getJobID());
 
                // disconnect from all registered TaskExecutors
                final Set<ResourceID> taskManagerResourceIds = new 
HashSet<>(registeredTaskManagers.keySet());
@@ -407,28 +405,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                // shut down will internally release all registered slots
                slotPool.shutDown();
-               CompletableFuture<Void> terminationFuture = 
slotPool.getTerminationFuture();
 
-               Exception exception = null;
-
-               // wait for the slot pool shut down
-               try {
-                       terminationFuture.get(rpcTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-               } catch (Exception e) {
-                       exception = e;
-               }
-
-               try {
-                       super.postStop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
-
-               if (exception != null) {
-                       throw exception;
-               }
-
-               log.info("Stopped the JobMaster for job " + jobGraph.getName() 
+ '(' + jobGraph.getJobID() + ").");
+               return slotPool.getTerminationFuture();
        }
 
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index a94a107..6ba9e8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -199,7 +199,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        }
 
        @Override
-       public void postStop() throws Exception {
+       public CompletableFuture<Void> postStop() {
                // cancel all pending allocations
                Set<AllocationID> allocationIds = pendingRequests.keySetB();
 
@@ -214,7 +214,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
                clear();
 
-               super.postStop();
+               return CompletableFuture.completedFuture(null);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 2d633f1..77e4362 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -213,7 +213,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        }
 
        @Override
-       public void postStop() throws Exception {
+       public CompletableFuture<Void> postStop() {
                Exception exception = null;
 
                taskManagerHeartbeatManager.stop();
@@ -240,14 +240,11 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
 
                clearState();
 
-               try {
-                       super.postStop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
-
                if (exception != null) {
-                       ExceptionUtils.rethrowException(exception, "Error while 
shutting the ResourceManager down.");
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException("Could not properly shut 
down the ResourceManager.", exception));
+               } else {
+                       return CompletableFuture.completedFuture(null);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index ff74f47..d078d58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -35,7 +35,7 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <F> type of the fencing token
  */
-public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {
+public abstract class FencedRpcEndpoint<F extends Serializable> extends 
RpcEndpoint {
 
        private volatile F fencingToken;
        private volatile MainThreadExecutor fencedMainThreadExecutor;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index d269e84..549e5c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -141,9 +141,10 @@ public abstract class RpcEndpoint implements RpcGateway {
         *
         * <p>IMPORTANT: This method should never be called directly by the 
user.
         *
-        * @throws Exception if an error occurs. The exception is returned as 
result of the termination future.
+        * @return Future which is completed once all post stop actions are 
completed. If an error
+        * occurs this future is completed exceptionally
         */
-       public void postStop() throws Exception {}
+       public abstract CompletableFuture<Void> postStop();
 
        /**
         * Triggers the shut down of the rpc endpoint. The shut down is 
executed asynchronously.

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index da7ce35..a7d15d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -90,12 +91,11 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                mainThreadValidator.enterMainThread();
 
                try {
-                       Throwable shutdownThrowable = null;
-
+                       CompletableFuture<Void> postStopFuture;
                        try {
-                               rpcEndpoint.postStop();
+                               postStopFuture = rpcEndpoint.postStop();
                        } catch (Throwable throwable) {
-                               shutdownThrowable = throwable;
+                               postStopFuture = 
FutureUtils.completedExceptionally(throwable);
                        }
 
                        super.postStop();
@@ -105,11 +105,14 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> 
extends UntypedActor {
                        // future.
                        // Complete the termination future so that others know 
that we've stopped.
 
-                       if (shutdownThrowable != null) {
-                               
terminationFuture.completeExceptionally(shutdownThrowable);
-                       } else {
-                               terminationFuture.complete(null);
-                       }
+                       postStopFuture.whenComplete(
+                               (Void value, Throwable throwable) -> {
+                                       if (throwable != null) {
+                                               
terminationFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
terminationFuture.complete(null);
+                                       }
+                               });
                } finally {
                        mainThreadValidator.exitMainThread();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d880407..f4c953d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -254,7 +254,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         * Called to shut down the TaskManager. The method closes all 
TaskManager services.
         */
        @Override
-       public void postStop() throws Exception {
+       public CompletableFuture<Void> postStop() {
                log.info("Stopping TaskManager {}.", getAddress());
 
                Throwable throwable = null;
@@ -281,17 +281,11 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
                }
 
-               try {
-                       super.postStop();
-               } catch (Throwable e) {
-                       throwable = ExceptionUtils.firstOrSuppressed(e, 
throwable);
-               }
-
                if (throwable != null) {
-                       ExceptionUtils.rethrowException(throwable, "Error while 
shutting the TaskExecutor down.");
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Error while shutting the TaskExecutor down.", throwable));
+               } else {
+                       return CompletableFuture.completedFuture(null);
                }
-
-               log.info("Stopped TaskManager {}.", getAddress());
        }
 
        // 
======================================================================

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 8ba0ccd..66f8d9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -340,6 +340,11 @@ public class AsyncCallsTest extends TestLogger {
                public boolean hasConcurrentAccess() {
                        return concurrentAccess;
                }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 
        public interface FencedTestGateway extends FencedRpcGateway<UUID> {
@@ -384,5 +389,10 @@ public class AsyncCallsTest extends TestLogger {
 
                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 3d99c3f..f488308 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -300,6 +300,11 @@ public class FencedRpcEndpointTest extends TestLogger {
                        this(rpcService, value, null);
                }
 
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
+
                protected FencedTestingEndpoint(RpcService rpcService, String 
value, UUID initialFencingToken) {
                        super(rpcService);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index d52aadb..b5add60 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -163,6 +163,11 @@ public class RpcEndpointTest extends TestLogger {
                public CompletableFuture<Integer> foobar() {
                        return CompletableFuture.completedFuture(foobarValue);
                }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 
        public static class ExtendedEndpoint extends BaseEndpoint implements 
ExtendedGateway, DifferentGateway {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 2a65cac..2530bce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -18,26 +18,28 @@
 
 package org.apache.flink.runtime.rpc.akka;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.Terminated;
+import akka.actor.ActorSystem;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -56,21 +58,19 @@ public class AkkaRpcActorTest extends TestLogger {
        //  shared test members
        // 
------------------------------------------------------------------------
 
-       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
-
        private static Time timeout = Time.milliseconds(10000L);
 
-       private static AkkaRpcService akkaRpcService =
-               new AkkaRpcService(actorSystem, timeout);
+       private static AkkaRpcService akkaRpcService;
+
+
+       @BeforeClass
+       public static void setup() {
+               akkaRpcService = new TestingRpcService();
+       }
 
        @AfterClass
        public static void shutdown() throws InterruptedException, 
ExecutionException, TimeoutException {
-               final CompletableFuture<Void> rpcTerminationFuture = 
akkaRpcService.stopService();
-               final CompletableFuture<Terminated> 
actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate());
-
-               FutureUtils
-                       .waitForAll(Arrays.asList(rpcTerminationFuture, 
actorSystemTerminationFuture))
-                       .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+               RpcUtils.terminateRpcService(akkaRpcService, timeout);
        }
 
        /**
@@ -191,7 +191,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
                CompletableFuture.runAsync(
                        () -> rpcEndpoint.shutDown(),
-                       actorSystem.dispatcher());
+                       akkaRpcService.getExecutor());
 
                // wait until the rpc endpoint has terminated
                terminationFuture.get();
@@ -296,6 +296,33 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that the {@link AkkaRpcActor} only completes after the 
asynchronous
+        * post stop action has completed.
+        */
+       @Test
+       public void testActorTerminationWithAsynchronousPostStopAction() throws 
Exception {
+               final CompletableFuture<Void> postStopFuture = new 
CompletableFuture<>();
+               final AsynchronousPostStopEndpoint endpoint = new 
AsynchronousPostStopEndpoint(akkaRpcService, postStopFuture);
+
+               try {
+                       endpoint.start();
+
+                       final CompletableFuture<Void> terminationFuture = 
endpoint.getTerminationFuture();
+
+                       endpoint.shutDown();
+
+                       assertFalse(terminationFuture.isDone());
+
+                       postStopFuture.complete(null);
+
+                       // the postStopFuture completion should allow the 
endpoint to terminate
+                       terminationFuture.get();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(endpoint, timeout);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Test Actors and Interfaces
        // 
------------------------------------------------------------------------
@@ -309,7 +336,19 @@ public class AkkaRpcActorTest extends TestLogger {
                void tell(String message);
        }
 
-       private static class DummyRpcEndpoint extends RpcEndpoint implements 
DummyRpcGateway {
+       private static class TestRpcEndpoint extends RpcEndpoint {
+
+               protected TestRpcEndpoint(RpcService rpcService) {
+                       super(rpcService);
+               }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
+       }
+
+       private static class DummyRpcEndpoint extends TestRpcEndpoint 
implements DummyRpcGateway {
 
                private volatile int _foobar = 42;
 
@@ -333,7 +372,7 @@ public class AkkaRpcActorTest extends TestLogger {
                CompletableFuture<Integer> doStuff();
        }
 
-       private static class ExceptionalEndpoint extends RpcEndpoint implements 
ExceptionalGateway {
+       private static class ExceptionalEndpoint extends TestRpcEndpoint 
implements ExceptionalGateway {
 
                protected ExceptionalEndpoint(RpcService rpcService) {
                        super(rpcService);
@@ -345,7 +384,7 @@ public class AkkaRpcActorTest extends TestLogger {
                }
        }
 
-       private static class ExceptionalFutureEndpoint extends RpcEndpoint 
implements ExceptionalGateway {
+       private static class ExceptionalFutureEndpoint extends TestRpcEndpoint 
implements ExceptionalGateway {
 
                protected ExceptionalFutureEndpoint(RpcService rpcService) {
                        super(rpcService);
@@ -379,8 +418,9 @@ public class AkkaRpcActorTest extends TestLogger {
                }
 
                @Override
-               public void postStop() {
+               public CompletableFuture<Void> postStop() {
                        validateRunsInMainThread();
+                       return CompletableFuture.completedFuture(null);
                }
        }
 
@@ -393,8 +433,8 @@ public class AkkaRpcActorTest extends TestLogger {
                }
 
                @Override
-               public void postStop() throws Exception {
-                       throw new PostStopException("Test exception.");
+               public CompletableFuture<Void> postStop() {
+                       return FutureUtils.completedExceptionally(new 
PostStopException("Test exception."));
                }
 
                private static class PostStopException extends FlinkException {
@@ -406,4 +446,22 @@ public class AkkaRpcActorTest extends TestLogger {
                        }
                }
        }
+
+       // ------------------------------------------------------------------------
+
+       private static class AsynchronousPostStopEndpoint extends RpcEndpoint {
+
+               private final CompletableFuture<Void> postStopFuture;
+
+               protected AsynchronousPostStopEndpoint(RpcService rpcService, CompletableFuture<Void> postStopFuture) {
+                       super(rpcService);
+
+                       this.postStopFuture = Preconditions.checkNotNull(postStopFuture);
+               }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return postStopFuture;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index a69bd84..6dacdfd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.junit.Assert.assertTrue;
 
 @Category(Flip6.class)
@@ -91,6 +93,11 @@ public class MainThreadValidationTest extends TestLogger {
                }
 
                @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
+
+               @Override
                public void someConcurrencyCriticalFunction() {
                        validateRunsInMainThread();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 061145c..6006850 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -193,6 +193,11 @@ public class MessageSerializationTest extends TestLogger {
                public void foobar(Object object) throws InterruptedException {
                        queue.put(object);
                }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 
        private static class NonSerializableObject {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
index ae530f7..5f59d59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java
@@ -138,5 +138,10 @@ public class RpcGatewayRetrieverTest extends TestLogger {
                public UUID getFencingToken() {
                        return HighAvailabilityServices.DEFAULT_LEADER_ID;
                }
+
+               @Override
+               public CompletableFuture<Void> postStop() {
+                       return CompletableFuture.completedFuture(null);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b28e81/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 87324cb..5380356 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -56,6 +58,7 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -218,7 +221,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
        }
 
        @Override
-       public void postStop() throws Exception {
+       public CompletableFuture<Void> postStop() {
                // shut down all components
                Throwable firstException = null;
 
@@ -238,14 +241,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
                        }
                }
 
-               try {
-                       super.postStop();
-               } catch (Throwable t) {
-                       firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
-               }
+               final CompletableFuture<Void> terminationFuture = super.postStop();
 
                if (firstException != null) {
-                       ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager");
+                       return FutureUtils.completedExceptionally(new FlinkException("Error while shutting down YARN resource manager", firstException));
+               } else {
+                       return terminationFuture;
                }
        }
 

Reply via email to