XComp commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r847412599


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(
+                        (Acknowledge ignored, Throwable throwable) -> {
+                            if (throwable != null) {
+                                log.info(
+                                        "Could not archive completed job {}({}) to the history server.",
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                        throwable);
+                            }
+                            return Acknowledge.get();
+                        },
+                        getMainThreadExecutor());

Review Comment:
   Is there a reason why this needs to be executed on the main thread? It's 
just the error handling, right? 🤔  Or am I missing something?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1065,27 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        writeToExecutionGraphInfoStore(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            
executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraphToHistoryServer(executionGraphInfo);
+
+            return archiveFuture.handleAsync(
+                    (ignored, throwable) -> {

Review Comment:
   I know I said something different first, but thinking about it again: 
maybe we could just rename `archiveExecutionGraphToHistoryServer` to 
`archiveExecutionGraphToHistoryServerWithErrorHandling` to make it explicit 
that any error is already handled, which would allow us to keep the 
`thenApplyAsync` method you initially favored.
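
   To illustrate the point, here is a minimal standalone sketch (not Flink 
code). `CleanupState`, the `String` acknowledgement, and both method names are 
hypothetical stand-ins. It only shows that once the helper has already turned 
failures into a normal result, a plain `thenApplyAsync` in the caller is 
never skipped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class ThenApplyAfterHandlingSketch {
    // Hypothetical stand-in for the cleanup state returned by the caller.
    enum CleanupState { GLOBAL }

    // Hypothetical helper: any failure is logged and replaced by a normal result,
    // so the returned future never completes exceptionally.
    static CompletableFuture<String> archiveWithErrorHandling(CompletableFuture<String> archive) {
        return archive.exceptionally(
                throwable -> {
                    System.out.println("Could not archive: " + throwable.getMessage());
                    return "ack";
                });
    }

    // Because errors are handled above, thenApplyAsync is sufficient here;
    // no (result, throwable) handler is needed in the caller.
    static CompletableFuture<CleanupState> cleanup(
            CompletableFuture<String> archive, Executor mainThread) {
        return archiveWithErrorHandling(archive)
                .thenApplyAsync(ignored -> CleanupState.GLOBAL, mainThread);
    }

    public static void main(String[] args) {
        CompletableFuture<String> failedArchive = new CompletableFuture<>();
        failedArchive.completeExceptionally(new RuntimeException("history server down"));
        // Runnable::run executes the stage directly on the calling thread.
        System.out.println(cleanup(failedArchive, Runnable::run).join());
    }
}
```

   Without the error handling in the helper, the `thenApplyAsync` stage would 
be skipped for a failed archive future and the failure would propagate to the 
caller.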



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(

Review Comment:
   nit: Wouldn't an `.exceptionally` call be good enough here?
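
   A rough standalone sketch of the two variants, assuming a hypothetical 
`Ack` enum in place of Flink's `Acknowledge`. One difference to keep in mind: 
`exceptionally` has no executor parameter, so the handler runs on whichever 
thread completes the future (an `exceptionallyAsync` variant only exists from 
Java 12 on).

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallySketch {
    // Hypothetical stand-in for Flink's Acknowledge.
    enum Ack { INSTANCE }

    // Variant from the diff: inspect both result and throwable, always return an Ack.
    static CompletableFuture<Ack> withHandle(CompletableFuture<Ack> archiveFuture) {
        return archiveFuture.handle(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        System.out.println("Could not archive: " + throwable.getMessage());
                    }
                    return Ack.INSTANCE;
                });
    }

    // Suggested variant: the callback is only invoked in the failure case.
    static CompletableFuture<Ack> withExceptionally(CompletableFuture<Ack> archiveFuture) {
        return archiveFuture.exceptionally(
                throwable -> {
                    System.out.println("Could not archive: " + throwable.getMessage());
                    return Ack.INSTANCE;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Ack> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("history server down"));
        // Both variants recover from the failure and complete normally.
        System.out.println(withHandle(failed).join());
        System.out.println(withExceptionally(failed).join());
    }
}
```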



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -421,6 +421,8 @@ public void 
testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
 
         // wait for job to finish
         dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

Review Comment:
   It took me a bit to figure out why we need this change. It's related to the 
`thenComposeAsync` which you added in `Dispatcher.runJob` to run the 
`Dispatcher#removeJob` method in the main thread. I commented on that one 
separately. But we could revert this change if we agree that it's not necessary.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1244,23 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
 
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getJobTerminationFutureOrFailedFutureForRunningJob(

Review Comment:
   We could get rid of that change as well if we revert the `thenComposeAsync` 
change in `Dispatcher#runJob`, I guess?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -615,15 +616,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
                                         return jobManagerRunnerFailed(jobId, throwable);
                                     }
                                 },
-                                getMainThreadExecutor());
+                                getMainThreadExecutor())
+                        .thenCompose(Function.identity());
 
         final CompletableFuture<Void> jobTerminationFuture =
-                cleanupJobStateFuture.thenCompose(
+                cleanupJobStateFuture.thenComposeAsync(

Review Comment:
   What was your motivation behind using `thenComposeAsync` here? Isn't the 
previous `handleAsync` already executed on the main thread, so that the 
subsequent `thenCompose` runs on the main thread as well? The same should hold 
for the `thenCompose` that runs the `removeJob` method here.
   
   The `Async` call causes the `removeJob` logic to be queued in the main 
thread executor again. Other queued work may be picked up first, so `removeJob` 
doesn't necessarily run right after the JobManagerRunnerResult handling.
   
   That's the reason why you had to add waiting for the job termination future 
in the `DispatcherTest` class.
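
   The ordering effect can be reproduced outside of Flink. The following is 
only a sketch under stated assumptions: the "main thread" is modelled as a 
manually drained task queue, and the event names (`handle result`, 
`other task`, `removeJob`) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

class ComposeOrderingSketch {
    static List<String> run(boolean useAsync) {
        // Hypothetical single-threaded "main thread": tasks are queued and drained in order.
        Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
        Executor mainThread = mainThreadQueue::add;
        List<String> events = new ArrayList<>();

        CompletableFuture<String> runnerResult = new CompletableFuture<>();
        CompletableFuture<String> handled =
                runnerResult.handleAsync(
                        (result, throwable) -> {
                            events.add("handle result");
                            // Unrelated work arrives while the result is being handled.
                            mainThread.execute(() -> events.add("other task"));
                            return result;
                        },
                        mainThread);

        Function<String, CompletableFuture<Void>> removeJob =
                result -> {
                    events.add("removeJob");
                    return CompletableFuture.completedFuture(null);
                };
        if (useAsync) {
            handled.thenComposeAsync(removeJob, mainThread); // re-queued behind "other task"
        } else {
            handled.thenCompose(removeJob); // runs inline, right after the handler
        }

        runnerResult.complete("done");
        Runnable task;
        while ((task = mainThreadQueue.poll()) != null) {
            task.run();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [handle result, removeJob, other task]
        System.out.println(run(true)); // [handle result, other task, removeJob]
    }
}
```

   With `thenCompose`, `removeJob` runs in the same task as the result 
handling; with `thenComposeAsync`, it is queued behind whatever was submitted 
in the meantime, which is why the test then has to wait for the termination 
future explicitly.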


