XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1293129190


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {

Review Comment:
   ```suggestion
   ```
   This part is obsolete and can be removed. You can see that by checking the call 
hierarchy of this method: it's only used by `grantLeadership` and is only called 
in the running state (i.e., after the leadership has been granted).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(
+                                                    (hasJobResult, throwable) 
-> {
+                                                        if (hasJobResult == 
null) {

Review Comment:
   This would swallow a potential IOException from the `hasJobResultEntryAsync` 
call. The old implementation handled these kinds of errors. PTAL (please take a look).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(
+                                                    (hasJobResult, throwable) 
-> {
+                                                        if (hasJobResult == 
null) {
+                                                            return null;
+                                                        }
+                                                        if (hasJobResult) {
+                                                            runIfValidLeader(
+                                                                    
leaderSessionId,
+                                                                    () -> {
+                                                                        
jobAlreadyDone(
+                                                                               
 leaderSessionId);
+                                                                        return 
CompletableFuture
+                                                                               
 .completedFuture(
+                                                                               
         null);
+                                                                    },
+                                                                    "check 
completed job");
+                                                        } else {
+                                                            runIfValidLeader(
+                                                                    
leaderSessionId,
+                                                                    () -> {
+                                                                        
ThrowingRunnable.unchecked(
+                                                                               
         () ->
+                                                                               
                 createNewJobMasterServiceProcess(
+                                                                               
                         leaderSessionId))
+                                                                               
 .run();
+                                                                        return 
CompletableFuture
+                                                                               
 .completedFuture(
+                                                                               
         null);
+                                                                    },
+                                                                    "create 
new job master service process");
+                                                        }
+                                                        return null;
+                                                    }));
+        } else {
+            LOG.trace(
+                    "Ignore leader action '{}' because the leadership runner 
is no longer the valid leader for {}.",
+                    "verify job scheduling status and create 
JobMasterServiceProcess",
+                    leaderSessionId);
+        }
         handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
     }
 
     @GuardedBy("lock")
     private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)

Review Comment:
   That method is not used anymore.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(

Review Comment:
   That works. But you could also call `handle` on the result of the 
`thenCompose` call. I hope that this would improve readability because we would 
not have so many levels of indentation. Another way to fix these indentation/readability 
issues is to move the async code into its own private method with a meaningful 
name.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")

Review Comment:
   ```suggestion
                                                       "verify job result 
entry")
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -488,18 +523,21 @@ private boolean isRunning() {
         return state == State.RUNNING;
     }
 
-    private void runIfValidLeader(
-            UUID expectedLeaderId, Runnable action, String actionDescription) {
+    private <T> CompletableFuture<T> runIfValidLeader(

Review Comment:
   nit: This method should be called `supplyIfValidLeader`. But with your 
current implementation, a dedicated `runIfValidLeader` would still be helpful. 
Hint: Supplier-based (async) and Runnable-based helper methods can be translated 
into each other in the following way:
   ```
   private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
               UUID expectedLeaderId,
               Supplier<CompletableFuture<T>> supplier,
               Supplier<CompletableFuture<T>> noLeaderFallback) {
           final CompletableFuture<T> resultFuture = new CompletableFuture<>();
           runIfValidLeader(
                   expectedLeaderId,
                   () -> FutureUtils.forward(supplier.get(), resultFuture),
                   () -> FutureUtils.forward(noLeaderFallback.get(), 
resultFuture));
   
           return resultFuture;
       }
   
       private void runIfValidLeader(
               UUID expectedLeaderId, Runnable action, Runnable 
noLeaderFallback) {
           synchronized (lock) {
               if (isValidLeader(expectedLeaderId)) {
                   action.run();
               } else {
                   noLeaderFallback.run();
               }
           }
       }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(
+                                                    (hasJobResult, throwable) 
-> {
+                                                        if (hasJobResult == 
null) {
+                                                            return null;
+                                                        }
+                                                        if (hasJobResult) {
+                                                            runIfValidLeader(

Review Comment:
   I guess we could remove code redundancy here by moving the 
`runIfValidLeader` out of the if clause. Alternatively, you could move the 
content of the two if/else blocks into their own private method with meaningful 
names (e.g. `handleJobAlreadyDoneIfValidLeader(leaderSessionID)` and 
`createNewJobMasterServiceProcessIfValidLeader(leaderSessionID)`) to improve 
this part of the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to