tillrohrmann commented on a change in pull request #6678: [FLINK-10255] Only 
react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217291912
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##########
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
        @Override
        public void onAddedJobGraph(final JobID jobId) {
-               final CompletableFuture<SubmittedJobGraph> recoveredJob = 
getRpcService().execute(
-                       () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-               final CompletableFuture<Acknowledge> submissionFuture = 
recoveredJob.thenComposeAsync(
-                       (SubmittedJobGraph submittedJobGraph) -> 
submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-                       getMainThreadExecutor());
-
-               submissionFuture.whenComplete(
-                       (Acknowledge acknowledge, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       onFatalError(
-                                               new DispatcherException(
-                                                       String.format("Could 
not start the added job %s", jobId),
-                                                       
ExceptionUtils.stripCompletionException(throwable)));
+               runAsync(
+                       () -> {
+                               if (!jobManagerRunners.containsKey(jobId)) {
+                                       final CompletableFuture<JobGraph> 
recoveredJob = recoveryOperation.thenApplyAsync(
+                                               ignored -> {
+                                                       try {
+                                                               return 
recoverJob(jobId);
+                                                       } catch (Exception e) {
+                                                               
ExceptionUtils.rethrow(e);
+                                                       }
+                                                       return null;
+                                               },
+                                               getRpcService().getExecutor());
+
+                                       final DispatcherId dispatcherId = 
getFencingToken();
+                                       final CompletableFuture<Void> 
submissionFuture = recoveredJob.thenComposeAsync(
+                                               
(FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph 
jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                                                       .thenAcceptAsync(
+                                                               
(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Good point. I like this approach better. Will adapt the existing interfaces.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to