[FLINK-8775] [flip6] Non-blocking MiniCluster shutdown

This closes #5576.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e7f03e4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e7f03e4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e7f03e4

Branch: refs/heads/master
Commit: 4e7f03e418ced740a731c9323dabef7a594db43c
Parents: c9787b6
Author: Till Rohrmann <[email protected]>
Authored: Sat Feb 24 16:20:58 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 23:44:53 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  25 +-
 .../flink/runtime/minicluster/MiniCluster.java  | 334 +++++++++++--------
 .../runtime/minicluster/MiniClusterITCase.java  |   4 +-
 .../Flip6LocalStreamEnvironment.java            |   2 +-
 4 files changed, 215 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index b31d04d..bb60277 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -254,16 +254,22 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                                
jobManagerRunner.getResultFuture().whenCompleteAsync(
                                        (ArchivedExecutionGraph 
archivedExecutionGraph, Throwable throwable) -> {
-                                               if (archivedExecutionGraph != 
null) {
-                                                       
jobReachedGloballyTerminalState(archivedExecutionGraph);
-                                               } else {
-                                                       final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
-                                                       if (strippedThrowable 
instanceof JobNotFinishedException) {
-                                                               
jobNotFinished(jobId);
+                                               // check if we are still the 
active JobManagerRunner by checking the identity
+                                               //noinspection ObjectEquality
+                                               if (jobManagerRunner == 
jobManagerRunners.get(jobId)) {
+                                                       if 
(archivedExecutionGraph != null) {
+                                                               
jobReachedGloballyTerminalState(archivedExecutionGraph);
                                                        } else {
-                                                               
onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " 
failed.", strippedThrowable));
+                                                               final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+
+                                                               if 
(strippedThrowable instanceof JobNotFinishedException) {
+                                                                       
jobNotFinished(jobId);
+                                                               } else {
+                                                                       
onFatalError(new FlinkException("JobManagerRunner for job " + jobId + " 
failed.", strippedThrowable));
+                                                               }
                                                        }
+                                               } else {
+                                                       log.debug("There is a 
newer JobManagerRunner for the job {}.", jobId);
                                                }
                                        }, getMainThreadExecutor());
 
@@ -294,6 +300,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
        @Override
        public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
+               if (jobManagerRunners.isEmpty()) {
+                       System.out.println("empty");
+               }
                return CompletableFuture.completedFuture(
                        Collections.unmodifiableSet(new 
HashSet<>(jobManagerRunners.keySet())));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 5b1fddb..3bd17d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -69,14 +69,18 @@ import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -97,6 +101,8 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
 
        private final Time rpcTimeout;
 
+       private CompletableFuture<Void> terminationFuture;
+
        @GuardedBy("lock")
        private MetricRegistryImpl metricRegistry;
 
@@ -161,6 +167,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                this.miniClusterConfiguration = 
checkNotNull(miniClusterConfiguration, "config may not be null");
 
                this.rpcTimeout = Time.seconds(10L);
+               this.terminationFuture = 
CompletableFuture.completedFuture(null);
                running = false;
        }
 
@@ -195,6 +202,9 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                        LOG.info("Starting Flink Mini Cluster");
                        LOG.debug("Using configuration {}", 
miniClusterConfiguration);
 
+                       // create a new termination future
+                       terminationFuture = new CompletableFuture<>();
+
                        final Configuration configuration = 
miniClusterConfiguration.getConfiguration();
                        final Time rpcTimeout = 
miniClusterConfiguration.getRpcTimeout();
                        final int numTaskManagers = 
miniClusterConfiguration.getNumTaskManagers();
@@ -349,7 +359,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                        catch (Exception e) {
                                // cleanup everything
                                try {
-                                       shutdownInternally();
+                                       close();
                                } catch (Exception ee) {
                                        e.addSuppressed(ee);
                                }
@@ -370,141 +380,70 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
         * <p>This method shuts down all started services and components,
         * even if an exception occurs in the process of shutting down some 
component.
         *
-        * @throws Exception Thrown, if the shutdown did not complete cleanly.
+        * @return Future which is completed once the MiniCluster has been 
completely shut down
         */
-       public void shutdown() throws Exception {
+       @Override
+       public CompletableFuture<Void> closeAsync() {
                synchronized (lock) {
                        if (running) {
                                LOG.info("Shutting down Flink Mini Cluster");
                                try {
-                                       shutdownInternally();
-                               } finally {
-                                       running = false;
-                               }
-                               LOG.info("Flink Mini Cluster is shut down");
-                       }
-               }
-       }
-
-       @GuardedBy("lock")
-       private void shutdownInternally() throws Exception {
-               // this should always be called under the lock
-               assert Thread.holdsLock(lock);
-
-               // collect the first exception, but continue and add all 
successive
-               // exceptions as suppressed
-               Throwable exception = null;
+                                       final int numComponents = 2 + 
miniClusterConfiguration.getNumTaskManagers();
+                                       final 
Collection<CompletableFuture<Void>> componentTerminationFutures = new 
ArrayList<>(numComponents);
 
-               // cancel all jobs and shut down the job dispatcher
-               if (dispatcher != null) {
-                       try {
-                               RpcUtils.terminateRpcEndpoint(dispatcher, 
rpcTimeout);
-                       } catch (Exception e) {
-                               exception = e;
-                       }
-                       dispatcher = null;
-               }
-
-               if (dispatcherRestEndpoint != null) {
-                       try {
-                               dispatcherRestEndpoint.shutDownAsync().get();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       dispatcherRestEndpoint = null;
-               }
-
-               if (resourceManagerLeaderRetriever != null) {
-                       try {
-                               resourceManagerLeaderRetriever.stop();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       resourceManagerLeaderRetriever = null;
-               }
+                                       
componentTerminationFutures.add(shutDownDispatcher());
 
-               if (dispatcherLeaderRetriever != null) {
-                       try {
-                               dispatcherLeaderRetriever.stop();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-
-                       dispatcherLeaderRetriever = null;
-               }
-
-               if (resourceManagerRunner != null) {
-                       try {
-                               resourceManagerRunner.close();
-                       } catch (Throwable t) {
-                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
-                       }
-               }
-
-               if (taskManagers != null) {
-                       for (TaskExecutor tm : taskManagers) {
-                               if (tm != null) {
-                                       try {
-                                               tm.shutDown();
-                                               // wait for the TaskManager to 
properly terminate
-                                               tm.getTerminationFuture().get();
-                                       } catch (Throwable t) {
-                                               exception = 
ExceptionUtils.firstOrSuppressed(t, exception);
+                                       if (resourceManagerRunner != null) {
+                                               
componentTerminationFutures.add(resourceManagerRunner.closeAsync());
+                                               resourceManagerRunner = null;
                                        }
-                               }
-                       }
-                       taskManagers = null;
-               }
-               // metrics shutdown
-               if (metricRegistry != null) {
-                       metricRegistry.shutdown();
-                       metricRegistry = null;
-               }
-
-               // shut down the RpcServices
-               exception = shutDownRpc(commonRpcService, exception);
-               exception = shutDownRpc(jobManagerRpcService, exception);
-               exception = shutDownRpcs(taskManagerRpcServices, exception);
-               exception = shutDownRpc(resourceManagerRpcService, exception);
-               commonRpcService = null;
-               jobManagerRpcService = null;
-               taskManagerRpcServices = null;
-               resourceManagerRpcService = null;
-
-               if (blobCacheService != null) {
-                       try {
-                               blobCacheService.close();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-                       blobCacheService = null;
-               }
 
-               // shut down the blob server
-               if (blobServer != null) {
-                       try {
-                               blobServer.close();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-                       blobServer = null;
-               }
+                                       if (taskManagers != null) {
+                                               for (TaskExecutor tm : 
taskManagers) {
+                                                       if (tm != null) {
+                                                               tm.shutDown();
+                                                               
componentTerminationFutures.add(tm.getTerminationFuture());
+                                                       }
+                                               }
+                                               taskManagers = null;
+                                       }
 
-               // shut down high-availability services
-               if (haServices != null) {
-                       try {
-                               haServices.closeAndCleanupAllData();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                                       final FutureUtils.ConjunctFuture<Void> 
componentsTerminationFuture = 
FutureUtils.completeAll(componentTerminationFutures);
+
+                                       final CompletableFuture<Void> 
metricRegistryTerminationFuture = FutureUtils.runAfterwards(
+                                               componentsTerminationFuture,
+                                               () -> {
+                                                       synchronized (lock) {
+                                                               // metrics 
shutdown
+                                                               if 
(metricRegistry != null) {
+                                                                       
metricRegistry.shutdown();
+                                                                       
metricRegistry = null;
+                                                               }
+                                                       }
+                                               });
+
+                                       // shut down the RpcServices
+                                       final CompletableFuture<Void> 
rpcServicesTerminationFuture = metricRegistryTerminationFuture
+                                               .thenCompose((Void ignored) -> 
terminateRpcServices());
+
+                                       final CompletableFuture<Void> 
remainingServicesTerminationFuture = FutureUtils.runAfterwards(
+                                               rpcServicesTerminationFuture,
+                                               
this::terminateMiniClusterServices);
+
+                                               
remainingServicesTerminationFuture.whenComplete(
+                                                       (Void ignored, 
Throwable throwable) -> {
+                                                               if (throwable 
!= null) {
+                                                                       
terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(throwable));
+                                                               } else {
+                                                                       
terminationFuture.complete(null);
+                                                               }
+                                                       });
+                               } finally {
+                                       running = false;
+                               }
                        }
-                       haServices = null;
-               }
 
-               // if anything went wrong, throw the first error with all the 
additional suppressed exceptions
-               if (exception != null) {
-                       ExceptionUtils.rethrowException(exception, "Error while 
shutting down mini cluster");
+                       return terminationFuture;
                }
        }
 
@@ -704,6 +643,137 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        }
 
        // 
------------------------------------------------------------------------
+       //  Internal methods
+       // 
------------------------------------------------------------------------
+
+       @GuardedBy("lock")
+       private CompletableFuture<Void> shutDownDispatcher() {
+
+               final Collection<CompletableFuture<Void>> terminationFutures = 
new ArrayList<>(2);
+
+               // cancel all jobs and shut down the job dispatcher
+               if (dispatcher != null) {
+                       dispatcher.shutDown();
+                       
terminationFutures.add(dispatcher.getTerminationFuture());
+
+                       dispatcher = null;
+               }
+
+               if (dispatcherRestEndpoint != null) {
+                       
terminationFutures.add(dispatcherRestEndpoint.shutDownAsync());
+
+                       dispatcherRestEndpoint = null;
+               }
+
+               final FutureUtils.ConjunctFuture<Void> 
dispatcherTerminationFuture = FutureUtils.completeAll(terminationFutures);
+
+               return FutureUtils.runAfterwards(
+                       dispatcherTerminationFuture,
+                       () -> {
+                               Exception exception = null;
+
+                               synchronized (lock) {
+                                       if (resourceManagerLeaderRetriever != 
null) {
+                                               try {
+                                                       
resourceManagerLeaderRetriever.stop();
+                                               } catch (Exception e) {
+                                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                               }
+
+                                               resourceManagerLeaderRetriever 
= null;
+                                       }
+
+                                       if (dispatcherLeaderRetriever != null) {
+                                               try {
+                                                       
dispatcherLeaderRetriever.stop();
+                                               } catch (Exception e) {
+                                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                                               }
+
+                                               dispatcherLeaderRetriever = 
null;
+                                       }
+                               }
+
+                               if (exception != null) {
+                                       throw exception;
+                               }
+                       });
+       }
+
+       private void terminateMiniClusterServices() throws Exception {
+               // collect the first exception, but continue and add all 
successive
+               // exceptions as suppressed
+               Exception exception = null;
+
+               synchronized (lock) {
+                       if (blobCacheService != null) {
+                               try {
+                                       blobCacheService.close();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+                               blobCacheService = null;
+                       }
+
+                       // shut down the blob server
+                       if (blobServer != null) {
+                               try {
+                                       blobServer.close();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+                               blobServer = null;
+                       }
+
+                       // shut down high-availability services
+                       if (haServices != null) {
+                               try {
+                                       haServices.closeAndCleanupAllData();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+                               haServices = null;
+                       }
+
+                       if (exception != null) {
+                               throw exception;
+                       }
+               }
+       }
+
+       @Nonnull
+       private CompletionStage<Void> terminateRpcServices() {
+               final int numRpcServices;
+               if (miniClusterConfiguration.getRpcServiceSharing() == 
MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+                       numRpcServices = 1;
+               } else {
+                       numRpcServices = 1 + 2 + 
miniClusterConfiguration.getNumTaskManagers(); // common, JM, RM, TMs
+               }
+
+               final Collection<CompletableFuture<?>> rpcTerminationFutures = 
new ArrayList<>(numRpcServices);
+
+               synchronized (lock) {
+                       
rpcTerminationFutures.add(commonRpcService.stopService());
+
+                       if (miniClusterConfiguration.getRpcServiceSharing() != 
MiniClusterConfiguration.RpcServiceSharing.SHARED) {
+                               
rpcTerminationFutures.add(jobManagerRpcService.stopService());
+                               
rpcTerminationFutures.add(resourceManagerRpcService.stopService());
+
+                               for (RpcService taskManagerRpcService : 
taskManagerRpcServices) {
+                                       
rpcTerminationFutures.add(taskManagerRpcService.stopService());
+                               }
+                       }
+
+                       commonRpcService = null;
+                       jobManagerRpcService = null;
+                       taskManagerRpcServices = null;
+                       resourceManagerRpcService = null;
+               }
+
+               return FutureUtils.completeAll(rpcTerminationFutures);
+       }
+
+       // 
------------------------------------------------------------------------
        //  miscellaneous utilities
        // 
------------------------------------------------------------------------
 
@@ -743,16 +813,6 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                return priorException;
        }
 
-       @Override
-       public CompletableFuture<Void> closeAsync() {
-               try {
-                       shutdown();
-                       return CompletableFuture.completedFuture(null);
-               } catch (Exception e) {
-                       return FutureUtils.completedExceptionally(e);
-               }
-       }
-
        private class TerminatingFatalErrorHandler implements FatalErrorHandler 
{
 
                private final int index;
@@ -784,11 +844,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                @Override
                public void onFatalError(Throwable exception) {
                        LOG.warn("Error in MiniCluster. Shutting the 
MiniCluster down.", exception);
-                       try {
-                               shutdown();
-                       } catch (Exception e) {
-                               LOG.warn("Could not shut down the 
MiniCluster.", e);
-                       }
+                       closeAsync();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index d974998..646827a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -67,7 +67,7 @@ public class MiniClusterITCase extends TestLogger {
                        executeJob(miniCluster);
                }
                finally {
-                       miniCluster.shutdown();
+                       miniCluster.close();
                }
        }
 
@@ -84,7 +84,7 @@ public class MiniClusterITCase extends TestLogger {
                        executeJob(miniCluster);
                }
                finally {
-                       miniCluster.shutdown();
+                       miniCluster.close();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f03e4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 5941ee5..4cc23fc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -105,7 +105,7 @@ public class Flip6LocalStreamEnvironment extends 
LocalStreamEnvironment {
                }
                finally {
                        transformations.clear();
-                       miniCluster.shutdown();
+                       miniCluster.close();
                }
        }
 }

Reply via email to