This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 579f5e082422e8eb3a1c62cb2ea1a2c1fb472851
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Feb 22 12:22:36 2019 +0100

    [FLINK-11718] Add onStart to Dispatcher
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 86 ++++++++++++++--------
 .../flink/runtime/dispatcher/DispatcherTest.java   |  2 +
 .../runtime/dispatcher/TestingDispatcher.java      | 20 +++++
 3 files changed, 77 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 47796b0..a9f1a68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -183,6 +183,38 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
        //------------------------------------------------------
 
        @Override
+       public void onStart() throws Exception {
+               try {
+                       startDispatcherServices();
+               } catch (Exception e) {
+                       final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
+                       onFatalError(exception);
+                       throw exception;
+               }
+       }
+
+       private void startDispatcherServices() throws Exception {
+               try {
+                       submittedJobGraphStore.start(this);
+                       leaderElectionService.start(this);
+
+                       registerDispatcherMetrics(jobManagerMetricGroup);
+               } catch (Exception e) {
+                       handleStartDispatcherServicesException(e);
+               }
+       }
+
+       private void handleStartDispatcherServicesException(Exception e) throws Exception {
+               try {
+                       stopDispatcherServices();
+               } catch (Exception exception) {
+                       e.addSuppressed(exception);
+               }
+
+               throw e;
+       }
+
+       @Override
        public CompletableFuture<Void> onStop() {
                log.info("Stopping dispatcher {}.", getAddress());
 
@@ -191,43 +223,35 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
                return FutureUtils.runAfterwards(
                        allJobManagerRunnersTerminationFuture,
                        () -> {
-                               Exception exception = null;
-                               try {
-                                       jobManagerSharedServices.shutdown();
-                               } catch (Exception e) {
-                                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-
-                               try {
-                                       submittedJobGraphStore.stop();
-                               } catch (Exception e) {
-                                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
-
-                               try {
-                                       leaderElectionService.stop();
-                               } catch (Exception e) {
-                                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
-                               }
+                               stopDispatcherServices();
 
-                               jobManagerMetricGroup.close();
-
-                               if (exception != null) {
-                                       throw exception;
-                               } else {
-                                       log.info("Stopped dispatcher {}.", getAddress());
-                               }
+                               log.info("Stopped dispatcher {}.", getAddress());
                        });
        }
 
-       @Override
-       public void start() throws Exception {
-               super.start();
+       private void stopDispatcherServices() throws Exception {
+               Exception exception = null;
+               try {
+                       jobManagerSharedServices.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
+
+               try {
+                       submittedJobGraphStore.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
+
+               try {
+                       leaderElectionService.stop();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
 
-               submittedJobGraphStore.start(this);
-               leaderElectionService.start(this);
+               jobManagerMetricGroup.close();
 
-               registerDispatcherMetrics(jobManagerMetricGroup);
+               ExceptionUtils.tryRethrowException(exception);
        }
 
        //------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f0c2772..61c91d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -545,6 +545,8 @@ public class DispatcherTest extends TestLogger {
 
                dispatcher = createAndStartDispatcher(heartbeatServices, haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, createdJobManagerRunnerLatch));
 
+               dispatcher.waitUntilStarted();
+
                final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(jobGraph);
                submittedJobGraphStore.putJobGraph(submittedJobGraph);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index 347f0fc..9b002b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -42,6 +42,8 @@ import java.util.function.Function;
  */
 class TestingDispatcher extends Dispatcher {
 
+       private final CompletableFuture<Void> startFuture;
+
        TestingDispatcher(
                RpcService rpcService,
                String endpointId,
@@ -70,6 +72,20 @@ class TestingDispatcher extends Dispatcher {
                        jobManagerRunnerFactory,
                        fatalErrorHandler,
                        VoidHistoryServerArchivist.INSTANCE);
+
+               this.startFuture = new CompletableFuture<>();
+       }
+
+       @Override
+       public void onStart() throws Exception {
+               try {
+                       super.onStart();
+               } catch (Exception e) {
+                       startFuture.completeExceptionally(e);
+                       throw e;
+               }
+
+               startFuture.complete(null);
        }
 
        void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
@@ -94,4 +110,8 @@ class TestingDispatcher extends Dispatcher {
                        () -> listJobs(timeout).get().size(),
                        timeout);
        }
+
+       void waitUntilStarted() {
+               startFuture.join();
+       }
 }

Reply via email to