Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845157578


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
 
-        archiveExecutionGraph(executionGraphInfo);
+        storeExecutionGraphInfo(executionGraphInfo);
 
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
 
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraph(executionGraphInfo);
+
+            registerCleanupInJobResultStore(executionGraphInfo);

Review Comment:
   > No, thenApply only works in the successful case. We want to register the 
job as completed even if the archiving failed. Therefore, using whenComplete is 
the better option.
   
   The exceptions in `archiveExecutionGraphToHistoryServer` have already been handled by the code inside it:
   ```java
   return executionGraphFuture.handle(
                   (Acknowledge ignored, Throwable throwable) -> {
                       if (throwable != null) {
                           log.info(
                                   "Could not archive completed job {}({}) to the history server.",
                                   executionGraphInfo.getArchivedExecutionGraph().getJobName(),
                                   executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                                   throwable);
                       }
                       return Acknowledge.get();
                   });
   ```
   In this way, `submitFailedJob` doesn't need to handle the exception again. 
   
   That's why I think putting the registration into `thenApplyAsync` works. 
WDYT?
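   
   To illustrate the point (a minimal self-contained sketch with placeholder names, not Flink's actual classes): because `handle` is invoked for both outcomes and can recover from an exceptional completion with a normal value, any stage chained after it still runs even when the original future failed.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   // Sketch: handle() swallows the failure, so the downstream thenApply runs anyway.
   public class HandleVsThenApply {
       static String run() {
           // A future that fails, standing in for a failed archive attempt.
           CompletableFuture<String> archiveFuture = new CompletableFuture<>();
           archiveFuture.completeExceptionally(new RuntimeException("archive failed"));
   
           return archiveFuture
                   // handle() sees the throwable and recovers with a normal value
                   .handle((value, throwable) -> throwable != null ? "recovered" : value)
                   // this stage runs because the previous stage completed normally
                   .thenApply(v -> v + " -> registered")
                   .join();
       }
   
       public static void main(String[] args) {
           System.out.println(run()); // prints "recovered -> registered"
       }
   }
   ```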
   
   > The reasoning behind chaining them together was, that the HistoryServer is 
opt-in functionality, i.e. if the user configured it, he/she would expect it to 
work even if it takes time. The default works with a no-op history server. WDYT?
   
   Yes, you are right. Sorry for making you rephrase it over and over again for my benefit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to