tillrohrmann commented on a change in pull request #7565: [FLINK-11400] 
Linearize leadership operations in JobManagerRunner
URL: https://github.com/apache/flink/pull/7565#discussion_r251455684
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 ##########
 @@ -277,38 +285,72 @@ public void grantLeadership(final UUID leaderSessionID) {
                                return;
                        }
 
-                       try {
-                               
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
-                       } catch (Exception e) {
-                               handleJobManagerRunnerError(e);
-                       }
+                       leadershipOperation = leadershipOperation.thenCompose(
+                               (ignored) -> {
+                                       synchronized (lock) {
+                                               return 
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
+                                       }
+                               });
+
+                       handleException(leadershipOperation, "Could not start 
the job manager.");
                }
        }
 
-       private void verifyJobSchedulingStatusAndStartJobManager(UUID 
leaderSessionId) throws Exception {
-               final JobSchedulingStatus jobSchedulingStatus = 
runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+       private CompletableFuture<Void> 
verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
+               final CompletableFuture<JobSchedulingStatus> 
jobSchedulingStatusFuture = getJobSchedulingStatus();
+
+               return jobSchedulingStatusFuture.thenCompose(
+                       jobSchedulingStatus -> {
+                               if (jobSchedulingStatus == 
JobSchedulingStatus.DONE) {
+                                       return jobAlreadyDone();
+                               } else {
+                                       return startJobManager(leaderSessionId);
+                               }
+                       });
+       }
 
-               if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
-                       log.info("Granted leader ship but job {} has been 
finished. ", jobGraph.getJobID());
-                       jobFinishedByOther();
-               } else {
-                       log.info("JobManager runner for job {} ({}) was granted 
leadership with session id {} at {}.",
-                               jobGraph.getName(), jobGraph.getJobID(), 
leaderSessionId, getAddress());
+       private CompletionStage<Void> startJobManager(UUID leaderSessionId) {
 
 Review comment:
   Good idea. Will change it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to