XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848244397


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
                                         .build())));
 
         // wait for job to finish
-        dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   By switching from `requestJobResult` to `getJobTerminationFuture`, we now 
always wait for the cleanup to finish before triggering the logic under test. 
That means the `JobManagerRunner` is always deregistered at that point. I'm 
wondering whether we should add a test to `DispatcherTest` that covers the code 
path of `Dispatcher#requestJobStatus` where the `JobManagerRunner` has not been 
unregistered yet. WDYT? 🤔
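
   To illustrate the gap, here is a rough sketch using a toy stand-in rather 
than the real `Dispatcher`. Every class and method name in it is hypothetical 
and only mirrors the shape of the two futures: the result future completes 
first, while the termination future completes only after cleanup has 
deregistered the runner. A latch holds cleanup back so the intermediate state 
can be observed deterministically.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Toy stand-in for a dispatcher; every name here is hypothetical, not Flink's API. */
class ToyDispatcher {
    private final Set<String> registeredRunners = ConcurrentHashMap.newKeySet();
    private final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    ToyDispatcher(String jobId, CountDownLatch allowCleanup) {
        registeredRunners.add(jobId);
        // The job result is available as soon as the job reaches a terminal state.
        resultFuture.complete("FINISHED");
        // Cleanup runs asynchronously; the latch lets the caller decide when it may proceed.
        CompletableFuture.runAsync(
                () -> {
                    try {
                        allowCleanup.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        terminationFuture.completeExceptionally(e);
                        return;
                    }
                    registeredRunners.remove(jobId);
                    terminationFuture.complete(null);
                });
    }

    CompletableFuture<String> requestJobResult() {
        return resultFuture;
    }

    CompletableFuture<Void> getJobTerminationFuture() {
        return terminationFuture;
    }

    boolean isRegistered(String jobId) {
        return registeredRunners.contains(jobId);
    }

    /** Returns {registered after result, registered after termination}. */
    static boolean[] demo() {
        CountDownLatch allowCleanup = new CountDownLatch(1);
        ToyDispatcher dispatcher = new ToyDispatcher("job-1", allowCleanup);

        // Waiting for the result only: cleanup is still blocked, runner still registered.
        dispatcher.requestJobResult().join();
        boolean afterResult = dispatcher.isRegistered("job-1");

        // Waiting for termination: cleanup has finished, runner is gone.
        allowCleanup.countDown();
        dispatcher.getJobTerminationFuture().join();
        boolean afterTermination = dispatcher.isRegistered("job-1");

        return new boolean[] {afterResult, afterTermination};
    }

    public static void main(String[] args) {
        boolean[] seen = demo();
        System.out.println("registered after result: " + seen[0]);
        System.out.println("registered after termination: " + seen[1]);
    }
}
```

   A test for the not-yet-unregistered path would need a similar hook to hold 
cleanup back while the status is requested.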



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1062,35 +1066,57 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
 
         archiveExecutionGraph(executionGraphInfo);
 
+        final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         if (terminalJobStatus.isGloballyTerminalState()) {
             final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
+
+            ioExecutor.execute(
+                    () -> {
+                        try {
+                            if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+                                log.warn(
+                                        "Job {} is already marked as clean but clean up was triggered again.",
+                                        jobId);
+                            } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+                                jobResultStore.createDirtyResult(
+                                        new JobResultEntry(
+                                                JobResult.createFrom(
+                                                        executionGraphInfo
+                                                                .getArchivedExecutionGraph())));
+                                log.info(
+                                        "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
+                                        jobId);
+                            }
+                        } catch (IOException e) {
+                            writeFuture.completeExceptionally(e);
+                            return;
+                        }
+                        writeFuture.complete(null);
+                    });
+        } else {
+            writeFuture.complete(null);
         }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+        return writeFuture
+                .handleAsync(
+                        (ignored, error) -> {
+                            if (error != null) {
+                                fatalErrorHandler.onFatalError(
+                                        new FlinkException(
+                                                String.format(
+                                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
+                                                        executionGraphInfo.getJobId()),
+                                                error));
+                            }
+                            return null;
+                        },
+                        getMainThreadExecutor())
+                .thenApply(
+                        (ignored) -> {
+                            return terminalJobStatus.isGloballyTerminalState()
+                                    ? CleanupJobState.GLOBAL
+                                    : CleanupJobState.LOCAL;
+                        });

Review Comment:
   ```
           if (!terminalJobStatus.isGloballyTerminalState()) {
               return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
           }
   
           final JobID jobId = executionGraphInfo.getJobId();
           // Keep the blocking JobResultStore access on the I/O executor, as in the PR.
           return CompletableFuture.runAsync(
                       () -> {
                           try {
                               if (jobResultStore.hasCleanJobResultEntry(jobId)) {
                                   log.warn(
                                           "Job {} is already marked as clean but clean up was triggered again.",
                                           jobId);
                               } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
                                   jobResultStore.createDirtyResult(
                                           new JobResultEntry(
                                                   JobResult.createFrom(
                                                           executionGraphInfo
                                                                   .getArchivedExecutionGraph())));
                                   log.info(
                                           "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
                                           jobId);
                               }
                           } catch (IOException e) {
                               throw new CompletionException(e);
                           }
                       },
                       ioExecutor)
                   .handleAsync(
                           (ignored, error) -> {
                               if (error != null) {
                                   fatalErrorHandler.onFatalError(
                                           new FlinkException(
                                                   String.format(
                                                           "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                           executionGraphInfo.getJobId()),
                                                   error));
                               }
                               return CleanupJobState.GLOBAL;
                           },
                           getMainThreadExecutor());
   ```
   This is more of a cosmetic change, but what about inverting the `if` 
condition and returning `CleanupJobState.LOCAL` early? That would give us a 
clearer separation between the local and the global terminal state handling in 
this method.
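
   One difference worth keeping in mind between the two shapes: with 
`completeExceptionally(e)` the handler attached to that future sees the raw 
`IOException`, whereas an exception thrown inside `runAsync` reaches the 
handler as a `CompletionException` with the `IOException` as its cause. The 
sketch below shows this with plain JDK futures; the class and the `unwrap` 
helper are hypothetical and only there for illustration.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical demo class: compares what a handle() callback receives. */
class HandleErrorShapes {

    /** The task throws inside runAsync; the handler sees a CompletionException. */
    static Throwable viaRunAsync() {
        CompletableFuture<Throwable> seen =
                CompletableFuture.runAsync(
                                () -> {
                                    throw new CompletionException(
                                            new IOException("write failed"));
                                })
                        .handle((ignored, error) -> error);
        return seen.join();
    }

    /** The future is failed explicitly; the handler sees the raw exception. */
    static Throwable viaCompleteExceptionally() {
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
        writeFuture.completeExceptionally(new IOException("write failed"));
        CompletableFuture<Throwable> seen = writeFuture.handle((ignored, error) -> error);
        return seen.join();
    }

    /** Strips one CompletionException layer, if present. */
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        System.out.println(viaRunAsync().getClass().getSimpleName());
        System.out.println(viaCompleteExceptionally().getClass().getSimpleName());
        System.out.println(unwrap(viaRunAsync()).getClass().getSimpleName());
    }
}
```

   If the cause passed to the fatal error handler should stay the plain 
`IOException`, the handler could unwrap the error first.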



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
+    @VisibleForTesting
     CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
-        if (jobManagerRunnerRegistry.isRegistered(jobId)) {

Review Comment:
   This method is only called through `Dispatcher#waitForTerminatingJob` > 
`Dispatcher#internalSubmitJob`, and `internalSubmitJob` is triggered within 
`Dispatcher#submitJob` only after verifying that no `JobManagerRunner` is 
registered for this `JobID` (see `Dispatcher#isDuplicateJob`, which is used in 
[Dispatcher#submitJob:435](https://github.com/apache/flink/blob/05707cf8955f190d65021d61c5afd8164e831315/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L435)).
Therefore, it looks like this `DispatcherException` more or less acted as a 
state invariant that was propagated to the user (the job submission would fail 
in that case).
   
   Removing this part is reasonable in my opinion. I'm just wondering whether 
we still want to add a Precondition here so that the invariant remains 
covered.
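
   Something along these lines is what I have in mind, shown as a 
self-contained sketch rather than actual `Dispatcher` code. The registry, the 
`String` job IDs, and the local `checkState` helper are all hypothetical 
stand-ins; in the real method the check would go against 
`jobManagerRunnerRegistry`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: guards the termination-future lookup with a state check. */
class TerminationFutureInvariant {
    private final Set<String> registeredJobIds = ConcurrentHashMap.newKeySet();
    private final Map<String, CompletableFuture<Void>> terminationFutures =
            new ConcurrentHashMap<>();

    /** Local stand-in for a precondition utility. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    void register(String jobId) {
        registeredJobIds.add(jobId);
    }

    CompletableFuture<Void> getJobTerminationFuture(String jobId) {
        // Callers are expected to have ruled out a registered runner already,
        // so a violation is a programming error rather than a user-facing failure.
        checkState(
                !registeredJobIds.contains(jobId),
                "A runner is still registered for job " + jobId + ".");
        return terminationFutures.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    public static void main(String[] args) {
        TerminationFutureInvariant sketch = new TerminationFutureInvariant();
        System.out.println(sketch.getJobTerminationFuture("job-1").isDone());
        sketch.register("job-2");
        try {
            sketch.getJobTerminationFuture("job-2");
        } catch (IllegalStateException e) {
            System.out.println("invariant violated: " + e.getMessage());
        }
    }
}
```

   Unlike the removed `DispatcherException`, a failed check here would signal 
a bug in the calling code instead of being reported back to the user.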



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
