[FLINK-8775] [flip6] Non-blocking MiniCluster shutdown. This closes #5576.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e7f03e4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e7f03e4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e7f03e4 Branch: refs/heads/master Commit: 4e7f03e418ced740a731c9323dabef7a594db43c Parents: c9787b6 Author: Till Rohrmann <[email protected]> Authored: Sat Feb 24 16:20:58 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 23:44:53 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 25 +- .../flink/runtime/minicluster/MiniCluster.java | 334 +++++++++++-------- .../runtime/minicluster/MiniClusterITCase.java | 4 +- .../Flip6LocalStreamEnvironment.java | 2 +- 4 files changed, 215 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index b31d04d..bb60277 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -254,16 +254,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme jobManagerRunner.getResultFuture().whenCompleteAsync( (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> { - if (archivedExecutionGraph != null) { - jobReachedGloballyTerminalState(archivedExecutionGraph); - } else { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - - if (strippedThrowable instanceof 
JobNotFinishedException) { - jobNotFinished(jobId); + // check if we are still the active JobManagerRunner by checking the identity + //noinspection ObjectEquality + if (jobManagerRunner == jobManagerRunners.get(jobId)) { + if (archivedExecutionGraph != null) { + jobReachedGloballyTerminalState(archivedExecutionGraph); } else { - onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable)); + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + + if (strippedThrowable instanceof JobNotFinishedException) { + jobNotFinished(jobId); + } else { + onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " failed.", strippedThrowable)); + } } + } else { + log.debug("There is a newer JobManagerRunner for the job {}.", jobId); } }, getMainThreadExecutor()); @@ -294,6 +300,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Override public CompletableFuture<Collection<JobID>> listJobs(Time timeout) { + if (jobManagerRunners.isEmpty()) { + System.out.println("empty"); + } return CompletableFuture.completedFuture( Collections.unmodifiableSet(new HashSet<>(jobManagerRunners.keySet()))); } http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 5b1fddb..3bd17d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -69,14 +69,18 @@ import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import 
javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -97,6 +101,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private final Time rpcTimeout; + private CompletableFuture<Void> terminationFuture; + @GuardedBy("lock") private MetricRegistryImpl metricRegistry; @@ -161,6 +167,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { this.miniClusterConfiguration = checkNotNull(miniClusterConfiguration, "config may not be null"); this.rpcTimeout = Time.seconds(10L); + this.terminationFuture = CompletableFuture.completedFuture(null); running = false; } @@ -195,6 +202,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { LOG.info("Starting Flink Mini Cluster"); LOG.debug("Using configuration {}", miniClusterConfiguration); + // create a new termination future + terminationFuture = new CompletableFuture<>(); + final Configuration configuration = miniClusterConfiguration.getConfiguration(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); @@ -349,7 +359,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { catch (Exception e) { // cleanup everything try { - shutdownInternally(); + close(); } catch (Exception ee) { e.addSuppressed(ee); } @@ -370,141 +380,70 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { * <p>This method shuts down all started services and components, * even if an exception occurs in the process of shutting down some component. 
* - * @throws Exception Thrown, if the shutdown did not complete cleanly. + * @return Future which is completed once the MiniCluster has been completely shut down */ - public void shutdown() throws Exception { + @Override + public CompletableFuture<Void> closeAsync() { synchronized (lock) { if (running) { LOG.info("Shutting down Flink Mini Cluster"); try { - shutdownInternally(); - } finally { - running = false; - } - LOG.info("Flink Mini Cluster is shut down"); - } - } - } - - @GuardedBy("lock") - private void shutdownInternally() throws Exception { - // this should always be called under the lock - assert Thread.holdsLock(lock); - - // collect the first exception, but continue and add all successive - // exceptions as suppressed - Throwable exception = null; + final int numComponents = 2 + miniClusterConfiguration.getNumTaskManagers(); + final Collection<CompletableFuture<Void>> componentTerminationFutures = new ArrayList<>(numComponents); - // cancel all jobs and shut down the job dispatcher - if (dispatcher != null) { - try { - RpcUtils.terminateRpcEndpoint(dispatcher, rpcTimeout); - } catch (Exception e) { - exception = e; - } - dispatcher = null; - } - - if (dispatcherRestEndpoint != null) { - try { - dispatcherRestEndpoint.shutDownAsync().get(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - dispatcherRestEndpoint = null; - } - - if (resourceManagerLeaderRetriever != null) { - try { - resourceManagerLeaderRetriever.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - resourceManagerLeaderRetriever = null; - } + componentTerminationFutures.add(shutDownDispatcher()); - if (dispatcherLeaderRetriever != null) { - try { - dispatcherLeaderRetriever.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - dispatcherLeaderRetriever = null; - } - - if (resourceManagerRunner != null) { - try { - 
resourceManagerRunner.close(); - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); - } - } - - if (taskManagers != null) { - for (TaskExecutor tm : taskManagers) { - if (tm != null) { - try { - tm.shutDown(); - // wait for the TaskManager to properly terminate - tm.getTerminationFuture().get(); - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); + if (resourceManagerRunner != null) { + componentTerminationFutures.add(resourceManagerRunner.closeAsync()); + resourceManagerRunner = null; } - } - } - taskManagers = null; - } - // metrics shutdown - if (metricRegistry != null) { - metricRegistry.shutdown(); - metricRegistry = null; - } - - // shut down the RpcServices - exception = shutDownRpc(commonRpcService, exception); - exception = shutDownRpc(jobManagerRpcService, exception); - exception = shutDownRpcs(taskManagerRpcServices, exception); - exception = shutDownRpc(resourceManagerRpcService, exception); - commonRpcService = null; - jobManagerRpcService = null; - taskManagerRpcServices = null; - resourceManagerRpcService = null; - - if (blobCacheService != null) { - try { - blobCacheService.close(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - blobCacheService = null; - } - // shut down the blob server - if (blobServer != null) { - try { - blobServer.close(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - blobServer = null; - } + if (taskManagers != null) { + for (TaskExecutor tm : taskManagers) { + if (tm != null) { + tm.shutDown(); + componentTerminationFutures.add(tm.getTerminationFuture()); + } + } + taskManagers = null; + } - // shut down high-availability services - if (haServices != null) { - try { - haServices.closeAndCleanupAllData(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); + final FutureUtils.ConjunctFuture<Void> componentsTerminationFuture = 
FutureUtils.completeAll(componentTerminationFutures); + + final CompletableFuture<Void> metricRegistryTerminationFuture = FutureUtils.runAfterwards( + componentsTerminationFuture, + () -> { + synchronized (lock) { + // metrics shutdown + if (metricRegistry != null) { + metricRegistry.shutdown(); + metricRegistry = null; + } + } + }); + + // shut down the RpcServices + final CompletableFuture<Void> rpcServicesTerminationFuture = metricRegistryTerminationFuture + .thenCompose((Void ignored) -> terminateRpcServices()); + + final CompletableFuture<Void> remainingServicesTerminationFuture = FutureUtils.runAfterwards( + rpcServicesTerminationFuture, + this::terminateMiniClusterServices); + + remainingServicesTerminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(throwable)); + } else { + terminationFuture.complete(null); + } + }); + } finally { + running = false; + } } - haServices = null; - } - // if anything went wrong, throw the first error with all the additional suppressed exceptions - if (exception != null) { - ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster"); + return terminationFuture; } } @@ -704,6 +643,137 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } // ------------------------------------------------------------------------ + // Internal methods + // ------------------------------------------------------------------------ + + @GuardedBy("lock") + private CompletableFuture<Void> shutDownDispatcher() { + + final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(2); + + // cancel all jobs and shut down the job dispatcher + if (dispatcher != null) { + dispatcher.shutDown(); + terminationFutures.add(dispatcher.getTerminationFuture()); + + dispatcher = null; + } + + if (dispatcherRestEndpoint != null) { + 
terminationFutures.add(dispatcherRestEndpoint.shutDownAsync()); + + dispatcherRestEndpoint = null; + } + + final FutureUtils.ConjunctFuture<Void> dispatcherTerminationFuture = FutureUtils.completeAll(terminationFutures); + + return FutureUtils.runAfterwards( + dispatcherTerminationFuture, + () -> { + Exception exception = null; + + synchronized (lock) { + if (resourceManagerLeaderRetriever != null) { + try { + resourceManagerLeaderRetriever.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + resourceManagerLeaderRetriever = null; + } + + if (dispatcherLeaderRetriever != null) { + try { + dispatcherLeaderRetriever.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + dispatcherLeaderRetriever = null; + } + } + + if (exception != null) { + throw exception; + } + }); + } + + private void terminateMiniClusterServices() throws Exception { + // collect the first exception, but continue and add all successive + // exceptions as suppressed + Exception exception = null; + + synchronized (lock) { + if (blobCacheService != null) { + try { + blobCacheService.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + blobCacheService = null; + } + + // shut down the blob server + if (blobServer != null) { + try { + blobServer.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + blobServer = null; + } + + // shut down high-availability services + if (haServices != null) { + try { + haServices.closeAndCleanupAllData(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + haServices = null; + } + + if (exception != null) { + throw exception; + } + } + } + + @Nonnull + private CompletionStage<Void> terminateRpcServices() { + final int numRpcServices; + if (miniClusterConfiguration.getRpcServiceSharing() == 
MiniClusterConfiguration.RpcServiceSharing.SHARED) { + numRpcServices = 1; + } else { + numRpcServices = 1 + 2 + miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs + } + + final Collection<CompletableFuture<?>> rpcTerminationFutures = new ArrayList<>(numRpcServices); + + synchronized (lock) { + rpcTerminationFutures.add(commonRpcService.stopService()); + + if (miniClusterConfiguration.getRpcServiceSharing() != MiniClusterConfiguration.RpcServiceSharing.SHARED) { + rpcTerminationFutures.add(jobManagerRpcService.stopService()); + rpcTerminationFutures.add(resourceManagerRpcService.stopService()); + + for (RpcService taskManagerRpcService : taskManagerRpcServices) { + rpcTerminationFutures.add(taskManagerRpcService.stopService()); + } + } + + commonRpcService = null; + jobManagerRpcService = null; + taskManagerRpcServices = null; + resourceManagerRpcService = null; + } + + return FutureUtils.completeAll(rpcTerminationFutures); + } + + // ------------------------------------------------------------------------ // miscellaneous utilities // ------------------------------------------------------------------------ @@ -743,16 +813,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { return priorException; } - @Override - public CompletableFuture<Void> closeAsync() { - try { - shutdown(); - return CompletableFuture.completedFuture(null); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); - } - } - private class TerminatingFatalErrorHandler implements FatalErrorHandler { private final int index; @@ -784,11 +844,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { @Override public void onFatalError(Throwable exception) { LOG.warn("Error in MiniCluster. 
Shutting the MiniCluster down.", exception); - try { - shutdown(); - } catch (Exception e) { - LOG.warn("Could not shut down the MiniCluster.", e); - } + closeAsync(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index d974998..646827a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -67,7 +67,7 @@ public class MiniClusterITCase extends TestLogger { executeJob(miniCluster); } finally { - miniCluster.shutdown(); + miniCluster.close(); } } @@ -84,7 +84,7 @@ public class MiniClusterITCase extends TestLogger { executeJob(miniCluster); } finally { - miniCluster.shutdown(); + miniCluster.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index 5941ee5..4cc23fc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -105,7 +105,7 @@ public class Flip6LocalStreamEnvironment extends LocalStreamEnvironment { } finally { transformations.clear(); - 
miniCluster.shutdown(); + miniCluster.close(); } } }
