This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6cbd8b05a81c3f8330c6c2a162103775f159149 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Feb 22 12:22:36 2019 +0100 [FLINK-11718] Add onStart to Dispatcher --- .../flink/runtime/dispatcher/Dispatcher.java | 86 ++++++++++++++-------- .../flink/runtime/dispatcher/DispatcherTest.java | 2 + .../runtime/dispatcher/TestingDispatcher.java | 20 +++++ 3 files changed, 77 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 47796b0..a9f1a68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -183,6 +183,38 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme //------------------------------------------------------ @Override + public void onStart() throws Exception { + try { + startDispatcherServices(); + } catch (Exception e) { + final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e); + onFatalError(exception); + throw exception; + } + } + + private void startDispatcherServices() throws Exception { + try { + submittedJobGraphStore.start(this); + leaderElectionService.start(this); + + registerDispatcherMetrics(jobManagerMetricGroup); + } catch (Exception e) { + handleStartDispatcherServicesException(e); + } + } + + private void handleStartDispatcherServicesException(Exception e) throws Exception { + try { + stopDispatcherServices(); + } catch (Exception exception) { + e.addSuppressed(exception); + } + + throw e; + } + + @Override public CompletableFuture<Void> onStop() { log.info("Stopping dispatcher {}.", getAddress()); @@ -191,43 +223,35 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme return FutureUtils.runAfterwards( 
allJobManagerRunnersTerminationFuture, () -> { - Exception exception = null; - try { - jobManagerSharedServices.shutdown(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { - submittedJobGraphStore.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - try { - leaderElectionService.stop(); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } + stopDispatcherServices(); - jobManagerMetricGroup.close(); - - if (exception != null) { - throw exception; - } else { - log.info("Stopped dispatcher {}.", getAddress()); - } + log.info("Stopped dispatcher {}.", getAddress()); }); } - @Override - public void start() throws Exception { - super.start(); + private void stopDispatcherServices() throws Exception { + Exception exception = null; + try { + jobManagerSharedServices.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + submittedJobGraphStore.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + leaderElectionService.stop(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } - submittedJobGraphStore.start(this); - leaderElectionService.start(this); + jobManagerMetricGroup.close(); - registerDispatcherMetrics(jobManagerMetricGroup); + ExceptionUtils.tryRethrowException(exception); } //------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index f0c2772..61c91d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -545,6 +545,8 @@ public class DispatcherTest 
extends TestLogger { dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch)); + dispatcher.waitUntilStarted(); + final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph); submittedJobGraphStore.putJobGraph(submittedJobGraph); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index 347f0fc..9b002b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -42,6 +42,8 @@ import java.util.function.Function; */ class TestingDispatcher extends Dispatcher { + private final CompletableFuture<Void> startFuture; + TestingDispatcher( RpcService rpcService, String endpointId, @@ -70,6 +72,20 @@ class TestingDispatcher extends Dispatcher { jobManagerRunnerFactory, fatalErrorHandler, VoidHistoryServerArchivist.INSTANCE); + + this.startFuture = new CompletableFuture<>(); + } + + @Override + public void onStart() throws Exception { + try { + super.onStart(); + } catch (Exception e) { + startFuture.completeExceptionally(e); + throw e; + } + + startFuture.complete(null); } void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) { @@ -94,4 +110,8 @@ class TestingDispatcher extends Dispatcher { () -> listJobs(timeout).get().size(), timeout); } + + void waitUntilStarted() { + startFuture.join(); + } }
