[FLINK-8774] [flip6] Make shut down of ResourceManagerRunner non blocking
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9787b69 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9787b69 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9787b69 Branch: refs/heads/master Commit: c9787b6992f05b429edb0678050a1273900d1a63 Parents: 2c40697 Author: Till Rohrmann <[email protected]> Authored: Sat Feb 24 16:24:25 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 23:19:33 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 2 +- .../resourcemanager/ResourceManagerRunner.java | 32 ++++++++------------ 2 files changed, 14 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9787b69/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 01be01d..5b1fddb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -437,7 +437,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { if (resourceManagerRunner != null) { try { - resourceManagerRunner.shutDown(); + resourceManagerRunner.close(); } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } http://git-wip-us.apache.org/repos/asf/flink/blob/c9787b69/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java index 26316ec..9daf96e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java @@ -20,26 +20,26 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.util.FlinkException; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; /** * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services * and handles fatal errors by shutting the resource manager down. */ -public class ResourceManagerRunner implements FatalErrorHandler { +public class ResourceManagerRunner implements FatalErrorHandler, AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class); @@ -101,24 +101,18 @@ public class ResourceManagerRunner implements FatalErrorHandler { resourceManager.start(); } - public void shutDown() throws Exception { - // wait for the completion - shutDownInternally().get(); - } - - private CompletableFuture<Void> shutDownInternally() { + @Override + public CompletableFuture<Void> closeAsync() { synchronized (lock) { resourceManager.shutDown(); - return resourceManager.getTerminationFuture() - .thenAccept( - ignored -> { - try { - resourceManagerRuntimeServices.shutDown(); - } catch (Exception e) { - throw new CompletionException(new FlinkException("Could not properly shut down the resource manager runtime services.", e)); - } - }); + return FutureUtils.runAfterwards( + resourceManager.getTerminationFuture(), + () -> { + synchronized (lock) { + resourceManagerRuntimeServices.shutDown(); + } + }); } } @@ -130,7 +124,7 @@ public class ResourceManagerRunner implements FatalErrorHandler { public void onFatalError(Throwable exception) { LOG.error("Encountered fatal error.", exception); - CompletableFuture<Void> shutdownFuture = shutDownInternally(); + CompletableFuture<Void> shutdownFuture = closeAsync(); shutdownFuture.whenComplete( (Void ignored, Throwable throwable) -> {
