[FLINK-8774] [flip6] Make shut down of ResourceManagerRunner non blocking

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9787b69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9787b69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9787b69

Branch: refs/heads/master
Commit: c9787b6992f05b429edb0678050a1273900d1a63
Parents: 2c40697
Author: Till Rohrmann <[email protected]>
Authored: Sat Feb 24 16:24:25 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 23:19:33 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  2 +-
 .../resourcemanager/ResourceManagerRunner.java  | 32 ++++++++------------
 2 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9787b69/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 01be01d..5b1fddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -437,7 +437,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
                if (resourceManagerRunner != null) {
                        try {
-                               resourceManagerRunner.shutDown();
+                               resourceManagerRunner.close();
                        } catch (Throwable t) {
                                exception = ExceptionUtils.firstOrSuppressed(t, exception);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c9787b69/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 26316ec..9daf96e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -20,26 +20,26 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 
 /**
  * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
  * and handles fatal errors by shutting the resource manager down.
  */
-public class ResourceManagerRunner implements FatalErrorHandler {
+public class ResourceManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
 
        private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);
 
@@ -101,24 +101,18 @@ public class ResourceManagerRunner implements FatalErrorHandler {
                resourceManager.start();
        }
 
-       public void shutDown() throws Exception {
-               // wait for the completion
-               shutDownInternally().get();
-       }
-
-       private CompletableFuture<Void> shutDownInternally() {
+       @Override
+       public CompletableFuture<Void> closeAsync() {
                synchronized (lock) {
                        resourceManager.shutDown();
 
-                       return resourceManager.getTerminationFuture()
-                               .thenAccept(
-                                       ignored -> {
-                                               try {
-                                                       resourceManagerRuntimeServices.shutDown();
-                                               } catch (Exception e) {
-                                                       throw new CompletionException(new FlinkException("Could not properly shut down the resource manager runtime services.", e));
-                                               }
-                                       });
+                       return FutureUtils.runAfterwards(
+                               resourceManager.getTerminationFuture(),
+                               () -> {
+                                       synchronized (lock) {
+                                               resourceManagerRuntimeServices.shutDown();
+                                       }
+                               });
                }
        }
 
@@ -130,7 +124,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
        public void onFatalError(Throwable exception) {
                LOG.error("Encountered fatal error.", exception);
 
-               CompletableFuture<Void> shutdownFuture = shutDownInternally();
+               CompletableFuture<Void> shutdownFuture = closeAsync();
 
                shutdownFuture.whenComplete(
                        (Void ignored, Throwable throwable) -> {

Reply via email to