Repository: flink Updated Branches: refs/heads/master e461208a9 -> 4e7f03e41
[FLINK-8773] [flip6] Make JobManagerRunner shut down non blocking The Dispatcher no longer shuts down the JobManagerRunner in a blocking fashion. Instead it registers the termination futures and calls the shut down of the JobManagerSharedServices once all JobManagerRunners have terminated. This closes #5575. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c40697e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c40697e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c40697e Branch: refs/heads/master Commit: 2c40697e46c0e4e9b7c5b86a3ef94b31c3330cb6 Parents: e461208 Author: Till Rohrmann <[email protected]> Authored: Sat Feb 24 15:39:31 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sat Feb 24 23:19:09 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/dispatcher/Dispatcher.java | 111 +++++++++---------- .../runtime/jobmaster/JobManagerRunner.java | 13 +-- .../runtime/dispatcher/MiniDispatcherTest.java | 1 + .../runtime/jobmaster/JobManagerRunnerTest.java | 8 +- 4 files changed, 64 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index e212752..b31d04d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -65,13 +65,16 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * Base class for the Dispatcher component. The Dispatcher component is responsible @@ -109,6 +112,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Nullable protected final String restAddress; + private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null); + public Dispatcher( RpcService rpcService, String endpointId, @@ -158,38 +163,39 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme @Override public CompletableFuture<Void> postStop() { log.info("Stopping dispatcher {}.", getAddress()); - Exception exception = null; - try { - clearState(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); - try { - jobManagerSharedServices.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList( + jobManagerRunnersTerminationFuture, + orphanedJobManagerRunnersTerminationFuture)); - try { - submittedJobGraphStore.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + return FutureUtils.runAfterwards( + allJobManagerRunnersTerminationFuture, + () -> { + Exception exception = null; + try { + jobManagerSharedServices.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } - try { - leaderElectionService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + try { + submittedJobGraphStore.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } - if (exception != null) { - return FutureUtils.completedExceptionally( - new FlinkException("Could not properly terminate the Dispatcher.", exception)); - } else { - return CompletableFuture.completedFuture(null); - } + try { + leaderElectionService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw exception; + } + }); } @Override @@ -491,7 +497,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme JobManagerRunner jobManagerRunner = jobManagerRunners.remove(jobId); if (jobManagerRunner != null) { - jobManagerRunner.shutdown(); + final CompletableFuture<Void> jobManagerRunnerTerminationFuture = jobManagerRunner.closeAsync(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnerTerminationFuture); } if (cleanupHA) { @@ -502,28 +509,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } /** - * Clears the state of the dispatcher. + * Terminate all currently running {@link JobManagerRunner}. * - * <p>The state are all currently running jobs. + * @return Future which is completed once all {@link JobManagerRunner} have terminated */ - private void clearState() throws Exception { - Exception exception = null; - + private CompletableFuture<Void> terminateJobManagerRunners() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); - // stop all currently running JobManager since they run in the same process - for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) { - try { - jobManagerRunner.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } + final List<CompletableFuture<Void>> terminationFutures = jobManagerRunners.values().stream() + .map(JobManagerRunner::closeAsync) + .collect(Collectors.toList()); - jobManagerRunners.clear(); - - if (exception != null) { - throw exception; - } + return FutureUtils.completeAll(terminationFutures); } /** @@ -600,6 +596,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } } + private void registerOrphanedJobManagerTerminationFuture(CompletableFuture<Void> jobManagerRunnerTerminationFuture) { + orphanedJobManagerRunnersTerminationFuture = FutureUtils.completeAll(Arrays.asList( + orphanedJobManagerRunnersTerminationFuture, + jobManagerRunnerTerminationFuture)); + } + //------------------------------------------------------ // Leader contender //------------------------------------------------------ @@ -619,11 +621,9 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme // clear the state if we've been the leader before if (getFencingToken() != null) { - try { - clearState(); - } catch (Exception e) { - log.warn("Could not properly clear the Dispatcher state while granting leadership.", e); - } + final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); + jobManagerRunners.clear(); } setFencingToken(dispatcherId); @@ -644,11 +644,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme runAsyncWithoutFencing( () -> { log.info("Dispatcher {} was revoked leadership.", getAddress()); - try { - clearState(); - } catch (Exception e) { - log.warn("Could not properly clear the Dispatcher state while revoking leadership.", e); - } + + final CompletableFuture<Void> jobManagerRunnersTerminationFuture = terminateJobManagerRunners(); + registerOrphanedJobManagerTerminationFuture(jobManagerRunnersTerminationFuture); + jobManagerRunners.clear(); // clear the fencing token indicating that we don't have the leadership right now setFencingToken(null); http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 4269243..80aa673 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.util.MetricUtils; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -59,7 +60,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The runner for the job manager. It deals with job level leader election and make underlying job manager * properly reacted. */ -public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler { +public class JobManagerRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler, AutoCloseableAsync { private static final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); @@ -98,9 +99,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F * Exceptions that occur while creating the JobManager or JobManagerRunner are directly * thrown and not reported to the given {@code FatalErrorHandler}. * - * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerSharedServices}. - * It will shut them down on error and on calls to {@link #shutdown()}. - * * @throws Exception Thrown if the runner cannot be set up, because either one of the * required services could not be started, ot the Job could not be initialized. */ @@ -214,11 +212,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F } } - public void shutdown() throws Exception { - shutdownInternally().get(); - } - - private CompletableFuture<Void> shutdownInternally() { + @Override + public CompletableFuture<Void> closeAsync() { synchronized (lock) { if (!shutdown) { shutdown = true; http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 2b98939..c6eda2e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -289,6 +289,7 @@ public class MiniDispatcherTest extends TestLogger { final JobManagerRunner mock = mock(JobManagerRunner.class); when(mock.getResultFuture()).thenReturn(resultFuture); + when(mock.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); return mock; } http://git-wip-us.apache.org/repos/asf/flink/blob/2c40697e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java index ba72293..0c238ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerTest.java @@ -152,7 +152,7 @@ public class JobManagerRunnerTest extends TestLogger { assertThat(resultFuture.get(), is(archivedExecutionGraph)); } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } } @@ -176,7 +176,7 @@ public class JobManagerRunnerTest extends TestLogger { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } } @@ -191,7 +191,7 @@ public class JobManagerRunnerTest extends TestLogger { assertThat(resultFuture.isDone(), is(false)); - jobManagerRunner.shutdown(); + jobManagerRunner.closeAsync(); try { resultFuture.get(); @@ -200,7 +200,7 @@ public class JobManagerRunnerTest extends TestLogger { assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(JobNotFinishedException.class)); } } finally { - jobManagerRunner.shutdown(); + jobManagerRunner.close(); } }
