Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845265730
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
terminalJobStatus);
}
- archiveExecutionGraph(executionGraphInfo);
+ storeExecutionGraphInfo(executionGraphInfo);
if (terminalJobStatus.isGloballyTerminalState()) {
- final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
- executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
- }
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ // do not create an archive for suspended jobs, as this would eventually lead to
+ // multiple archive attempts which we currently do not support
+ CompletableFuture<Acknowledge> archiveFuture =
+ archiveExecutionGraph(executionGraphInfo);
+
+ registerCleanupInJobResultStore(executionGraphInfo);
Review Comment:
Yes, you are right. We cannot assume that
`archiveExecutionGraphToHistoryServer` always returns a successfully completed
future. How about using `handleAsync`? That way, the code makes it explicit
that `registerGloballyTerminatedJobInJobResultStore` happens before `return
CleanupJobState.GLOBAL`, whether or not archiving succeeds.
```java
CompletableFuture<Acknowledge> archiveFuture =
        archiveExecutionGraphToHistoryServer(executionGraphInfo);
return archiveFuture.handleAsync(
        (ignored, throwable) -> {
            // Whether or not archiving succeeds, the registration is carried out
            registerGloballyTerminatedJobInJobResultStore(executionGraphInfo);
            return CleanupJobState.GLOBAL;
        },
        getMainThreadExecutor());
```
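To illustrate the point outside of Flink, here is a minimal, self-contained sketch of the same `handleAsync` pattern. The class `HandleAsyncSketch`, the enum `CleanupState`, the `registered` list, and the method names are hypothetical stand-ins and not Flink's actual types. A direct executor (`Runnable::run`) is assumed in place of the main thread executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

final class HandleAsyncSketch {

    // Hypothetical stand-in for CleanupJobState; not Flink's actual enum.
    enum CleanupState { LOCAL, GLOBAL }

    // Records registrations so the effect of each call can be observed.
    static final List<String> registered = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for registerGloballyTerminatedJobInJobResultStore.
    static void registerInResultStore(String jobId) {
        registered.add(jobId);
    }

    static CompletableFuture<CleanupState> archiveThenRegister(
            CompletableFuture<Void> archiveFuture, String jobId, Executor executor) {
        return archiveFuture.handleAsync(
                (ignored, throwable) -> {
                    // handleAsync invokes this callback on normal and on
                    // exceptional completion, so the registration always runs.
                    registerInResultStore(jobId);
                    return CleanupState.GLOBAL;
                },
                executor);
    }

    public static void main(String[] args) {
        // Assumption: a direct executor replaces the main thread executor.
        Executor direct = Runnable::run;

        CompletableFuture<Void> succeeded = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("archiving failed"));

        System.out.println(archiveThenRegister(succeeded, "job-1", direct).join());
        System.out.println(archiveThenRegister(failed, "job-2", direct).join());
        System.out.println(registered);
    }
}
```

With `thenApplyAsync` in place of `handleAsync`, the callback would be skipped for the failed future and the returned future would complete exceptionally, which is the case the comment above wants to avoid.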
--
This is an automated message from the Apache Git Service.