XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1289991654
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -277,18 +277,11 @@ private void startJobMasterServiceProcessAsync(UUID
leaderSessionId) {
@GuardedBy("lock")
private void
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
- throws FlinkException {
- try {
- if (jobResultStore.hasJobResultEntry(getJobID())) {
- jobAlreadyDone(leaderSessionId);
- } else {
- createNewJobMasterServiceProcess(leaderSessionId);
- }
- } catch (IOException e) {
- throw new FlinkException(
- String.format(
- "Could not retrieve the job scheduling status for
job %s.", getJobID()),
- e);
+ throws FlinkException, ExecutionException, InterruptedException {
+ if (jobResultStore.hasJobResultEntryAsync(getJobID()).get()) {
Review Comment:
Ok, I looked into that one: It's actually not that tricky. We have to
refactor the `thenRun` call context of this method (see [line
265](https://github.com/apache/flink/pull/22341/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bR265))
into a `thenCompose(..).handle(..)` chain. The async call of
`hasJobResultEntryAsync` would be executed in the `thenCompose` step. The rest
will be then handled in the subsequent `handle` call. WDYT? Do you need more
guidance with that one or is this proposal helpful?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]