Repository: flink
Updated Branches:
  refs/heads/master e461208a9 -> 4e7f03e41


[FLINK-8773] [flip6] Make JobManagerRunner shut down non blocking

The Dispatcher no longer shuts down the JobManagerRunners in a blocking fashion.
Instead, it registers their termination futures and shuts down the
JobManagerSharedServices once all JobManagerRunners have terminated.

This closes #5575.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c40697e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c40697e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c40697e

Branch: refs/heads/master
Commit: 2c40697e46c0e4e9b7c5b86a3ef94b31c3330cb6
Parents: e461208
Author: Till Rohrmann <[email protected]>
Authored: Sat Feb 24 15:39:31 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sat Feb 24 23:19:09 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 111 +++++++++----------
 .../runtime/jobmaster/JobManagerRunner.java     |  13 +--
 .../runtime/dispatcher/MiniDispatcherTest.java  |   1 +
 .../runtime/jobmaster/JobManagerRunnerTest.java |   8 +-
 4 files changed, 64 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e212752..b31d04d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -65,13 +65,16 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * Base class for the Dispatcher component. The Dispatcher component is 
responsible
@@ -109,6 +112,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        @Nullable
        protected final String restAddress;
 
+       private CompletableFuture<Void> 
orphanedJobManagerRunnersTerminationFuture = 
CompletableFuture.completedFuture(null);
+
        public Dispatcher(
                        RpcService rpcService,
                        String endpointId,
@@ -158,38 +163,39 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        @Override
        public CompletableFuture<Void> postStop() {
                log.info("Stopping dispatcher {}.", getAddress());
-               Exception exception = null;
 
-               try {
-                       clearState();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               final CompletableFuture<Void> 
jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
 
-               try {
-                       jobManagerSharedServices.shutdown();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               final CompletableFuture<Void> 
allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList(
+                       jobManagerRunnersTerminationFuture,
+                       orphanedJobManagerRunnersTerminationFuture));
 
-               try {
-                       submittedJobGraphStore.stop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+               return FutureUtils.runAfterwards(
+                       allJobManagerRunnersTerminationFuture,
+                       () -> {
+                               Exception exception = null;
+                               try {
+                                       jobManagerSharedServices.shutdown();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
 
-               try {
-                       leaderElectionService.stop();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
+                               try {
+                                       submittedJobGraphStore.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
 
-               if (exception != null) {
-                       return FutureUtils.completedExceptionally(
-                               new FlinkException("Could not properly 
terminate the Dispatcher.", exception));
-               } else {
-                       return CompletableFuture.completedFuture(null);
-               }
+                               try {
+                                       leaderElectionService.stop();
+                               } catch (Exception e) {
+                                       exception = 
ExceptionUtils.firstOrSuppressed(e, exception);
+                               }
+
+                               if (exception != null) {
+                                       throw exception;
+                               }
+                       });
        }
 
        @Override
@@ -491,7 +497,8 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                JobManagerRunner jobManagerRunner = 
jobManagerRunners.remove(jobId);
 
                if (jobManagerRunner != null) {
-                       jobManagerRunner.shutdown();
+                       final CompletableFuture<Void> 
jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync();
+                       
registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture);
                }
 
                if (cleanupHA) {
@@ -502,28 +509,17 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        /**
-        * Clears the state of the dispatcher.
+        * Terminate all currently running {@link JobManagerRunner}.
         *
-        * <p>The state are all currently running jobs.
+        * @return Future which is completed once all {@link JobManagerRunner} 
have terminated
         */
-       private void clearState() throws Exception {
-               Exception exception = null;
-
+       private CompletableFuture<Void> terminateJobManagerRunners() {
                log.info("Stopping all currently running jobs of dispatcher 
{}.", getAddress());
-               // stop all currently running JobManager since they run in the 
same process
-               for (JobManagerRunner jobManagerRunner : 
jobManagerRunners.values()) {
-                       try {
-                               jobManagerRunner.shutdown();
-                       } catch (Exception e) {
-                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-                       }
-               }
+               final List<CompletableFuture<Void>> terminationFutures = 
jobManagerRunners.values().stream()
+                       .map(JobManagerRunner::closeAsync)
+                       .collect(Collectors.toList());
 
-               jobManagerRunners.clear();
-
-               if (exception != null) {
-                       throw exception;
-               }
+               return FutureUtils.completeAll(terminationFutures);
        }
 
        /**
@@ -600,6 +596,12 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                }
        }
 
+       private void 
registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> 
jobManagerRunnerTerminationFuture) {
+               orphanedJobManagerRunnersTerminationFuture = 
FutureUtils.completeAll(Arrays.asList(
+                       orphanedJobManagerRunnersTerminationFuture,
+                       jobManagerRunnerTerminationFuture));
+       }
+
        //------------------------------------------------------
        // Leader contender
        //------------------------------------------------------
@@ -619,11 +621,9 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
 
                                // clear the state if we've been the leader 
before
                                if (getFencingToken() != null) {
-                                       try {
-                                               clearState();
-                                       } catch (Exception e) {
-                                               log.warn("Could not properly 
clear the Dispatcher state while granting leadership.", e);
-                                       }
+                                       final CompletableFuture<Void> 
jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
+                                       
registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
+                                       jobManagerRunners.clear();
                                }
 
                                setFencingToken(dispatcherId);
@@ -644,11 +644,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                runAsyncWithoutFencing(
                        () -> {
                                log.info("Dispatcher {} was revoked 
leadership.", getAddress());
-                               try {
-                                       clearState();
-                               } catch (Exception e) {
-                                       log.warn("Could not properly clear the 
Dispatcher state while revoking leadership.", e);
-                               }
+
+                               final CompletableFuture<Void> 
jobManagerRunnersTerminationFuture = terminateJobManagerRunners();
+                               
registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture);
+                               jobManagerRunners.clear();
 
                                // clear the fencing token indicating that we 
don't have the leadership right now
                                setFencingToken(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 4269243..80aa673 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
@@ -59,7 +60,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * The runner for the job manager. It deals with job level leader election and 
make underlying job manager
  * properly reacted.
  */
-public class JobManagerRunner implements LeaderContender, OnCompletionActions, 
FatalErrorHandler {
+public class JobManagerRunner implements LeaderContender, OnCompletionActions, 
FatalErrorHandler, AutoCloseableAsync {
 
        private static final Logger log = 
LoggerFactory.getLogger(JobManagerRunner.class);
 
@@ -98,9 +99,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
         * Exceptions that occur while creating the JobManager or 
JobManagerRunner are directly
         * thrown and not reported to the given {@code FatalErrorHandler}.
         *
-        * <p>This JobManagerRunner assumes that it owns the given {@code 
JobManagerSharedServices}.
-        * It will shut them down on error and on calls to {@link #shutdown()}.
-        *
         * @throws Exception Thrown if the runner cannot be set up, because 
either one of the
         *                   required services could not be started, ot the Job 
could not be initialized.
         */
@@ -214,11 +212,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                }
        }
 
-       public void shutdown() throws Exception {
-               shutdownInternally().get();
-       }
-
-       private CompletableFuture<Void> shutdownInternally() {
+       @Override
+       public CompletableFuture<Void> closeAsync() {
                synchronized (lock) {
                        if (!shutdown) {
                                shutdown = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 2b98939..c6eda2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -289,6 +289,7 @@ public class MiniDispatcherTest extends TestLogger {
 
                        final JobManagerRunner mock = 
mock(JobManagerRunner.class);
                        when(mock.getResultFuture()).thenReturn(resultFuture);
+                       
when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
 
                        return mock;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
index ba72293..0c238ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java
@@ -152,7 +152,7 @@ public class JobManagerRunnerTest extends TestLogger {
 
                        assertThat(resultFuture.get(), 
is(archivedExecutionGraph));
                } finally {
-                       jobManagerRunner.shutdown();
+                       jobManagerRunner.close();
                }
        }
 
@@ -176,7 +176,7 @@ public class JobManagerRunnerTest extends TestLogger {
                                
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(JobNotFinishedException.class));
                        }
                } finally {
-                       jobManagerRunner.shutdown();
+                       jobManagerRunner.close();
                }
        }
 
@@ -191,7 +191,7 @@ public class JobManagerRunnerTest extends TestLogger {
 
                        assertThat(resultFuture.isDone(), is(false));
 
-                       jobManagerRunner.shutdown();
+                       jobManagerRunner.closeAsync();
 
                        try {
                                resultFuture.get();
@@ -200,7 +200,7 @@ public class JobManagerRunnerTest extends TestLogger {
                                
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(JobNotFinishedException.class));
                        }
                } finally {
-                       jobManagerRunner.shutdown();
+                       jobManagerRunner.close();
                }
        }
 

Reply via email to