XComp commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845239142


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState 
jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up 
was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            
executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the 
JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as 
pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would 
eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);

Review Comment:
   Fair enough, but do you see any problems with using `whenComplete`? My 
concern is just, that using `thenApply` explicitly states that the registration 
in the JobResultStore should only happen in the case of success. Assuming 
knowledge about how `archiveExecutionGraphToHistoryServer` is valid because 
it's a private method of `Dispatcher`. But it makes reading the code harder 
because the reader really has to know that 
`archiveExecutionGraphToHistoryServer` always returns a successfully completed 
future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to