XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1289991654


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
 
     @GuardedBy("lock")
     private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-            throws FlinkException {
-        try {
-            if (jobResultStore.hasJobResultEntry(getJobID())) {
-                jobAlreadyDone(leaderSessionId);
-            } else {
-                createNewJobMasterServiceProcess(leaderSessionId);
-            }
-        } catch (IOException e) {
-            throw new FlinkException(
-                    String.format(
-                            "Could not retrieve the job scheduling status for job %s.", getJobID()),
-                    e);
+            throws FlinkException, ExecutionException, InterruptedException {
+        if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) {

Review Comment:
   OK, I looked into this one, and it's actually not that tricky. We have to refactor the `thenRun` call context of this method (see [line 265](https://github.com/apache/flink/pull/22341/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bR265)) into a `thenCompose(..).handle(..)` chain. The async call to `hasJobResultEntryAsync` would be executed in the `thenCompose` step, and the rest would then be handled in the subsequent `handle` call, so the blocking `.get()` is no longer needed. WDYT? Do you need more guidance on this one, or is this proposal helpful?
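
   To make the proposal more concrete, here is a minimal, self-contained sketch of the `thenCompose(..).handle(..)` shape. It is not the actual `JobMasterServiceLeadershipRunner` code: `ResultStore`, `verifyAndCreate`, and the returned strings are hypothetical stand-ins for the `JobResultStore`, the refactored call chain, and the `jobAlreadyDone` / `createNewJobMasterServiceProcess` calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ThenComposeHandleSketch {

    /** Hypothetical stand-in for the job result store; not Flink's real interface. */
    interface ResultStore {
        CompletableFuture<Boolean> hasJobResultEntryAsync(String jobId);
    }

    /**
     * Chains the async lookup onto a previous step instead of blocking on get().
     * The returned strings only name the branch the real code would take.
     */
    static CompletableFuture<String> verifyAndCreate(
            CompletableFuture<Void> previousStep, ResultStore store, String jobId) {
        return previousStep
                // thenCompose flattens the nested future returned by the async lookup.
                .thenCompose(ignored -> store.hasJobResultEntryAsync(jobId))
                // handle sees either the lookup result or the failure, never both.
                .handle(
                        (hasEntry, throwable) -> {
                            if (throwable != null) {
                                // Failures may arrive wrapped in a CompletionException.
                                Throwable cause =
                                        throwable instanceof CompletionException
                                                        && throwable.getCause() != null
                                                ? throwable.getCause()
                                                : throwable;
                                return "failed: " + cause.getMessage();
                            }
                            return hasEntry
                                    ? "jobAlreadyDone"
                                    : "createNewJobMasterServiceProcess";
                        });
    }

    public static void main(String[] args) {
        ResultStore store = jobId -> CompletableFuture.completedFuture(true);
        System.out.println(
                verifyAndCreate(CompletableFuture.completedFuture(null), store, "job-1").join());
    }
}
```

   In the real class, the body of the `handle` callback would presumably still need to run under `lock`, since the original method is annotated `@GuardedBy("lock")`; the sketch leaves that out.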



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.