WencongLiu commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1295337443


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                leaderSessionId)),
-                                        "verify job scheduling status and create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            jobResultStore.hasJobResultEntryAsync(
+                                                                    getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(
+                                                    (hasJobResult, throwable) -> {
+                                                        if (hasJobResult == null) {
+                                                            return null;
+                                                        }
+                                                        if (hasJobResult) {
+                                                            runIfValidLeader(
+                                                                    leaderSessionId,
+                                                                    () -> {
+                                                                        jobAlreadyDone(
+                                                                                leaderSessionId);
+                                                                        return CompletableFuture
+                                                                                .completedFuture(
+                                                                                        null);
+                                                                    },
+                                                                    "check completed job");
+                                                        } else {
+                                                            runIfValidLeader(
+                                                                    leaderSessionId,
+                                                                    () -> {
+                                                                        ThrowingRunnable.unchecked(
+                                                                                        () ->
+                                                                                                createNewJobMasterServiceProcess(
+                                                                                                        leaderSessionId))
+                                                                                .run();
+                                                                        return CompletableFuture
+                                                                                .completedFuture(
+                                                                                        null);
+                                                                    },
+                                                                    "create new job master service process");
+                                                        }
+                                                        return null;
+                                                    }));
+        } else {
+            LOG.trace(
+                    "Ignore leader action '{}' because the leadership runner is no longer the valid leader for {}.",
+                    "verify job scheduling status and create JobMasterServiceProcess",
+                    leaderSessionId);
+        }
         handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
     }
 
     @GuardedBy("lock")
     private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to