Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r848274651


##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -421,6 +421,8 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
 
         // wait for job to finish
         dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   Thank you for the analysis here. I just realized that these test cases
actually use `VoidHistoryServerArchivist`, which simply returns a
`CompletableFuture.completedFuture` in the main thread.
   
   I have removed these `dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();`
calls.
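
   A minimal sketch of the behavior described above, using only the JDK: a dependent stage attached to an already-completed future runs right away in the calling thread, so no extra waiting is needed. `CompletedFutureDemo` and the `"ack"` value are hypothetical stand-ins and are not Flink code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an archivist that has nothing to do
// (in the spirit of VoidHistoryServerArchivist, not its actual code).
final class CompletedFutureDemo {

    // Returns a future that is already complete when the caller receives it.
    static CompletableFuture<String> archive() {
        return CompletableFuture.completedFuture("ack");
    }

    // Returns the name of the thread that executed the dependent stage.
    static String callbackThreadName() {
        return archive()
                // Non-async handle on a completed future runs in the caller.
                .handle((ack, throwable) -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println("callback ran in caller thread: "
                + caller.equals(callbackThreadName()));
    }
}
```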



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1065,27 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        writeToExecutionGraphInfoStore(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraphToHistoryServer(executionGraphInfo);
+
+            return archiveFuture.handleAsync(
+                    (ignored, throwable) -> {

Review Comment:
   Yes, this makes the code here clearer. Thank you for pointing this out.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(
+                        (Acknowledge ignored, Throwable throwable) -> {
+                            if (throwable != null) {
+                                log.info(
+                                        "Could not archive completed job {}({}) to the history server.",
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                        throwable);
+                            }
+                            return Acknowledge.get();
+                        },
+                        getMainThreadExecutor());

Review Comment:
   `archiveExecutionGraphToHistoryServerWithErrorHandling` is also called by
`Dispatcher#submitFailedJob`. If we don't call `handleAsync` with the main
thread executor here, `submitFailedJob` will return a `CompletableFuture` that
completes in the IO executor. I'm not sure whether that is okay for
`ApplicationDispatcherBootstrap#runApplicationEntryPoint`.
   
   Should we make sure that `archiveExecutionGraphToHistoryServerWithErrorHandling`
returns a `CompletableFuture` that completes in the main thread?
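
   To illustrate the concern, here is a small JDK-only sketch of how the thread that completes a future decides where downstream non-async stages run. The executors named `io` and `main-thread` are hypothetical stand-ins for Flink's IO executor and `getMainThreadExecutor()`, and the latch exists only to make the demo deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HandleAsyncDemo {

    // Completes an "archive" step on the io executor, handles it either with
    // handle(...) or handleAsync(..., mainThread), and returns the name of
    // the thread that ran a downstream non-async stage.
    static String downstreamThread(boolean hopBackToMain) {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        // Holds the io task back until all stages are attached.
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletableFuture<String> archived =
                    CompletableFuture.supplyAsync(
                            () -> {
                                try {
                                    gate.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                                return "ack";
                            },
                            io);
            CompletableFuture<String> handled =
                    hopBackToMain
                            ? archived.handleAsync((ack, t) -> ack, mainThread)
                            : archived.handle((ack, t) -> ack);
            // What a caller such as submitFailedJob would observe downstream.
            CompletableFuture<String> downstream =
                    handled.thenApply(ignored -> Thread.currentThread().getName());
            gate.countDown();
            return downstream.join();
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("handle:      " + downstreamThread(false));
        System.out.println("handleAsync: " + downstreamThread(true));
    }
}
```

   In this sketch, the downstream stage stays on the `io` thread unless `handleAsync` is given the main thread executor.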



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -615,15 +616,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
                                         return jobManagerRunnerFailed(jobId, throwable);
                                     }
                                 },
-                                getMainThreadExecutor());
+                                getMainThreadExecutor())
+                        .thenCompose(Function.identity());
 
         final CompletableFuture<Void> jobTerminationFuture =
-                cleanupJobStateFuture.thenCompose(
+                cleanupJobStateFuture.thenComposeAsync(

Review Comment:
   I added a `thenComposeAsync` here because, in my opinion, we cannot
guarantee that the `CompletableFuture` returned by
`handleJobManagerRunnerResult` completes in the main thread. I considered the
`thenComposeAsync` insurance that `removeJob` is executed in the main thread.
However, since it introduces many redundant changes, I'd like to remove it.
   
   Thank you for your analysis and detailed explanation.
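
   For reference, a JDK-only sketch of the two operators in this hunk: `thenCompose(Function.identity())` flattens a nested future, and `thenComposeAsync` with an explicit executor pins the composing function to that executor. The `"GLOBAL"` string and the `main-thread` executor are hypothetical stand-ins for `CleanupJobState` and `getMainThreadExecutor()`. This is not the Dispatcher code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ComposeDemo {

    // Flattens CompletableFuture<CompletableFuture<T>> into CompletableFuture<T>.
    static CompletableFuture<String> flatten(
            CompletableFuture<CompletableFuture<String>> nested) {
        return nested.thenCompose(Function.identity());
    }

    // Returns the name of the thread that ran the cleanup function.
    static String cleanupThread() {
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            // A handler that itself returns a future yields a nested future.
            CompletableFuture<CompletableFuture<String>> nested =
                    CompletableFuture.completedFuture("result")
                            .handle((r, t) -> CompletableFuture.completedFuture("GLOBAL"));
            CompletableFuture<String> cleanupState = flatten(nested);
            // The explicit executor decides where the function runs, no matter
            // which thread completed cleanupState.
            return cleanupState
                    .thenComposeAsync(
                            state ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()),
                            mainThread)
                    .join();
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran on: " + cleanupThread());
    }
}
```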



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1244,23 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getJobTerminationFutureOrFailedFutureForRunningJob(

Review Comment:
   Yes, I removed the usages of `getJobTerminationFuture` in `DispatcherTest`
and `DispatcherResourceCleanupTest`, so this change is reverted.


