XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1295642610


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -262,36 +261,92 @@ public void grantLeadership(UUID leaderSessionID) {
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
         sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
+                sequentialOperation
+                        .thenCompose(
+                                unused ->
+                                        supplyAsyncIfValidLeader(
+                                                leaderSessionId,
                                                 () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+                                                        
jobResultStore.hasJobResultEntryAsync(
+                                                                getJobID()),
+                                                () ->
+                                                        
printLogIfNotValidLeader(
+                                                                "verify job 
result entry",
+                                                                
leaderSessionId)))
+                        .handle(
+                                (hasJobResult, throwable) -> {
+                                    if (throwable != null) {
+                                        ExceptionUtils.rethrow(throwable);
+                                    }
+                                    if (hasJobResult == null) {
+                                        return null;
+                                    }
+                                    if (hasJobResult) {
+                                        
handleJobAlreadyDoneIfValidLeader(leaderSessionId);
+                                    } else {
+                                        
createNewJobMasterServiceProcessIfValidLeader(
+                                                leaderSessionId);
+                                    }
+                                    return null;
+                                });
         handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
     }
 
-    @GuardedBy("lock")
-    private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-            throws FlinkException {
-        try {
-            if (jobResultStore.hasJobResultEntry(getJobID())) {
-                jobAlreadyDone(leaderSessionId);
+    private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
+            UUID expectedLeaderId,
+            Supplier<CompletableFuture<T>> supplier,
+            Supplier<CompletableFuture<T>> noLeaderFallback) {
+        final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        runIfValidLeader(
+                expectedLeaderId,
+                () -> FutureUtils.forward(supplier.get(), resultFuture),
+                () -> FutureUtils.forward(noLeaderFallback.get(), 
resultFuture));
+
+        return resultFuture;
+    }
+
+    private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
+        runIfValidLeader(
+                leaderSessionId,
+                () -> {
+                    jobAlreadyDone(leaderSessionId);
+                },
+                () -> printLogIfNotValidLeader("check completed job", 
leaderSessionId));
+    }
+
+    private void createNewJobMasterServiceProcessIfValidLeader(UUID 
leaderSessionId) {
+        runIfValidLeader(
+                leaderSessionId,
+                () -> {
+                    ThrowingRunnable.unchecked(
+                                    () -> 
createNewJobMasterServiceProcess(leaderSessionId))
+                            .run();
+                },
+                () ->
+                        printLogIfNotValidLeader(
+                                "create new job master service process", 
leaderSessionId));
+    }
+
+    private void runIfValidLeader(

Review Comment:
   Moving the `runIfValidLeader` method up destroys the git history to some 
degree (imagine someone wants to see the history of this specific method in 
IntelliJ by selecting the method and using the "Git > Show history for 
Selection..." feature). Instead, why not refactor the method in its original 
location and add `supplyAsyncIfValidLeader` next to it?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -262,36 +261,92 @@ public void grantLeadership(UUID leaderSessionID) {
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
         sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
+                sequentialOperation
+                        .thenCompose(
+                                unused ->
+                                        supplyAsyncIfValidLeader(
+                                                leaderSessionId,
                                                 () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+                                                        
jobResultStore.hasJobResultEntryAsync(
+                                                                getJobID()),
+                                                () ->
+                                                        
printLogIfNotValidLeader(
+                                                                "verify job 
result entry",
+                                                                
leaderSessionId)))
+                        .handle(
+                                (hasJobResult, throwable) -> {
+                                    if (throwable != null) {
+                                        ExceptionUtils.rethrow(throwable);
+                                    }
+                                    if (hasJobResult == null) {
+                                        return null;
+                                    }
+                                    if (hasJobResult) {
+                                        
handleJobAlreadyDoneIfValidLeader(leaderSessionId);
+                                    } else {
+                                        
createNewJobMasterServiceProcessIfValidLeader(
+                                                leaderSessionId);
+                                    }
+                                    return null;
+                                });
         handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
     }
 
-    @GuardedBy("lock")
-    private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-            throws FlinkException {
-        try {
-            if (jobResultStore.hasJobResultEntry(getJobID())) {
-                jobAlreadyDone(leaderSessionId);
+    private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
+            UUID expectedLeaderId,
+            Supplier<CompletableFuture<T>> supplier,
+            Supplier<CompletableFuture<T>> noLeaderFallback) {
+        final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+        runIfValidLeader(
+                expectedLeaderId,
+                () -> FutureUtils.forward(supplier.get(), resultFuture),
+                () -> FutureUtils.forward(noLeaderFallback.get(), 
resultFuture));
+
+        return resultFuture;
+    }
+
+    private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
+        runIfValidLeader(
+                leaderSessionId,
+                () -> {
+                    jobAlreadyDone(leaderSessionId);
+                },
+                () -> printLogIfNotValidLeader("check completed job", 
leaderSessionId));
+    }
+
+    private void createNewJobMasterServiceProcessIfValidLeader(UUID 
leaderSessionId) {
+        runIfValidLeader(
+                leaderSessionId,
+                () -> {
+                    ThrowingRunnable.unchecked(
+                                    () -> 
createNewJobMasterServiceProcess(leaderSessionId))
+                            .run();
+                },
+                () ->
+                        printLogIfNotValidLeader(
+                                "create new job master service process", 
leaderSessionId));
+    }
+
+    private void runIfValidLeader(
+            UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) 
{
+        synchronized (lock) {
+            if (isValidLeader(expectedLeaderId)) {
+                action.run();
             } else {
-                createNewJobMasterServiceProcess(leaderSessionId);
+                noLeaderFallback.run();
             }
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format(
-                            "Could not retrieve the job scheduling status for 
job %s.", getJobID()),
-                    e);
         }
     }
 
+    private CompletableFuture<Boolean> printLogIfNotValidLeader(
+            String actionDescription, UUID leaderSessionId) {
+        LOG.trace(

Review Comment:
   A bit out of scope, but I feel like these log statements should be at debug 
level. :thinking: WDYT? If you agree, we should fix it in a separate hotfix 
commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
 
     @GuardedBy("lock")
     private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
-        sequentialOperation =
-                sequentialOperation.thenRun(
-                        () ->
-                                runIfValidLeader(
-                                        leaderSessionId,
-                                        ThrowingRunnable.unchecked(
-                                                () ->
-                                                        
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-                                                                
leaderSessionId)),
-                                        "verify job scheduling status and 
create JobMasterServiceProcess"));
-
+        boolean isValid;
+        synchronized (lock) {
+            isValid = isValidLeader(leaderSessionId);
+        }
+        if (isValid) {
+            sequentialOperation =
+                    sequentialOperation.thenCompose(
+                            unused ->
+                                    runIfValidLeader(
+                                                    leaderSessionId,
+                                                    () ->
+                                                            
jobResultStore.hasJobResultEntryAsync(
+                                                                    
getJobID()),
+                                                    "verify jbb result entry")
+                                            .handle(
+                                                    (hasJobResult, throwable) 
-> {
+                                                        if (hasJobResult == 
null) {

Review Comment:
   ~The if (hasJobResult == null) { block becomes obsolete here (because there 
shouldn't be a situation where no result is passed if no error happened in the 
upstream async calls for handle(BiFunction)). You could add a 
Preconditions.checkNotNull if you like. But the subsequent if call would cause a 
NullPointerException anyway, which makes the precondition obsolete again.~
   
   ~(... and a side remark here: we should at least add logs rather than only 
returning null in the if (hasJobResult == null) { case.)~
   
   Ok, I misread the code here. The `null` check actually has a meaning here 
because the upstream `supplyAsyncIfValidLeader` call returns a 
`CompletableFuture` that completes successfully with `null` if leadership is no 
longer granted (the `noLeaderFallback`). This functional dependency is a bit 
hidden (IMHO). I see two options to improve the code:
   1. Add a comment to the `if (hasJobResult == null) {` explaining why this is 
needed. I would also like to add a `@Nullable` annotation to 
`printLogIfNotValidLeader` to acknowledge this. But I don't know how to handle 
such a case with async calls. The `@Nullable` annotation becomes kind of 
ambiguous.
   2. Treat not having leadership in `supplyAsyncIfValidLeader` as an error 
scenario that is handled in the downstream `handle` call. We can achieve this 
by returning an exceptionally completed future. You might want to introduce a 
new exception `LeadershipLostException` that derives from 
`LeaderElectionException` to cover that error scenario. You could add the log 
message in the `handle` method when handling `throwable != null`, something 
like:
   ```
   if (throwable instanceof LeadershipLostException) {
       // print your log statement
   } else if (throwable != null) {
       ExceptionUtils.rethrow(throwable);
   }
   ```
   
   I'm in favor of the 2nd solution because it makes the code more explicit 
and, in this regard, easier to understand.
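
To make the second option more concrete, here is a minimal, self-contained sketch of what it could look like. It is not the PR's code: `LeadershipSketch`, `decide`, the boolean `isValidLeader` parameter (standing in for the lock-guarded `isValidLeader(leaderSessionId)` check) and the local `stripCompletionException` helper are assumptions made for illustration, and `LeadershipLostException` extends plain `Exception` here only so the example compiles without Flink on the classpath. One detail worth noting: after `thenCompose`, the throwable that reaches `handle` is typically wrapped in a `CompletionException`, so the sketch unwraps it before the `instanceof` check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class LeadershipSketch {

    /** Hypothetical exception; in Flink it would derive from LeaderElectionException. */
    static class LeadershipLostException extends Exception {
        LeadershipLostException(String message) {
            super(message);
        }
    }

    /** Runs the supplier while leadership is valid; otherwise fails the future explicitly. */
    static <T> CompletableFuture<T> supplyAsyncIfValidLeader(
            boolean isValidLeader, // assumption: stands in for the real leadership check
            Supplier<CompletableFuture<T>> supplier,
            String actionDescription) {
        if (isValidLeader) {
            return supplier.get();
        }
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(
                new LeadershipLostException("Leadership lost before: " + actionDescription));
        return failed;
    }

    /** Unwraps CompletionException layers added by the CompletableFuture machinery. */
    static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Mirrors the thenCompose/handle chain and returns a label for the branch taken. */
    static String decide(boolean isValidLeader, boolean hasJobResultEntry) {
        CompletableFuture<Void> sequentialOperation = CompletableFuture.completedFuture(null);
        Supplier<CompletableFuture<Boolean>> lookup =
                () -> CompletableFuture.completedFuture(hasJobResultEntry);

        return sequentialOperation
                .thenCompose(
                        unused ->
                                supplyAsyncIfValidLeader(
                                        isValidLeader, lookup, "verify job result entry"))
                .handle(
                        (hasJobResult, throwable) -> {
                            Throwable cause = stripCompletionException(throwable);
                            if (cause instanceof LeadershipLostException) {
                                // this is where the log statement would go
                                return "leadership lost";
                            } else if (cause != null) {
                                throw new CompletionException(cause);
                            }
                            // no null check needed: a missing result is now always an error
                            return hasJobResult ? "job already done" : "create new process";
                        })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true));
        System.out.println(decide(true, true));
        System.out.println(decide(true, false));
    }
}
```

With this shape, the `if (hasJobResult == null)` branch disappears, because losing leadership travels through the exceptional path instead of a successfully completed `null`.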
   


