Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r845157578
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1066,24 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
terminalJobStatus);
}
- archiveExecutionGraph(executionGraphInfo);
+ storeExecutionGraphInfo(executionGraphInfo);
if (terminalJobStatus.isGloballyTerminalState()) {
- final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
- executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
- }
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ // do not create an archive for suspended jobs, as this would eventually lead to
+ // multiple archive attempts which we currently do not support
+ CompletableFuture<Acknowledge> archiveFuture =
+ archiveExecutionGraph(executionGraphInfo);
+
+ registerCleanupInJobResultStore(executionGraphInfo);
Review Comment:
> No, thenApply only works in the successful case. We want to register the
> job as completed even if the archiving failed. Therefore, using whenComplete is
> the better option.

The exceptions in `archiveExecutionGraphToHistoryServer` are already handled
inside that method:
```java
return executionGraphFuture.handle(
        (Acknowledge ignored, Throwable throwable) -> {
            if (throwable != null) {
                log.info(
                        "Could not archive completed job {}({}) to the history server.",
                        executionGraphInfo.getArchivedExecutionGraph().getJobName(),
                        executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                        throwable);
            }
            return Acknowledge.get();
        });
```
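Since `handle` always returns `Acknowledge.get()`, the resulting future completes
normally even when archiving fails, so `submitFailedJob` doesn't need to handle
the exception again. That's why I think putting the registration into
`thenApplyAsync` works. WDYT?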
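
To make the point concrete, here is a minimal, self-contained sketch using plain `CompletableFuture` (it is not Flink code). `ChainingSketch`, `archive`, `run`, `registeredWithoutHandle`, and the string events are hypothetical stand-ins; the only thing taken from the PR is the pattern of a `handle` stage that logs the failure and returns a normal value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainingSketch {

    // Hypothetical stand-in for the archiving call: the raw future may fail,
    // but the handle(...) stage records the failure and recovers with a normal value.
    static CompletableFuture<String> archive(boolean fail, List<String> events) {
        CompletableFuture<String> raw = new CompletableFuture<>();
        if (fail) {
            raw.completeExceptionally(new RuntimeException("history server unavailable"));
        } else {
            raw.complete("ack");
        }
        return raw.handle(
                (ack, throwable) -> {
                    if (throwable != null) {
                        events.add("archive-failed-logged");
                    }
                    return "ack";
                });
    }

    // Registration is chained with thenApply; it runs in both cases because
    // the future returned by archive(...) never completes exceptionally.
    static List<String> run(boolean fail) {
        List<String> events = new ArrayList<>();
        archive(fail, events)
                .thenApply(
                        ack -> {
                            events.add("registered");
                            return ack;
                        })
                .join();
        return events;
    }

    // Contrast: without the recovering handle(...), thenApply is skipped on failure.
    static boolean registeredWithoutHandle() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> raw = new CompletableFuture<>();
        raw.completeExceptionally(new RuntimeException("history server unavailable"));
        raw.thenApply(
                ack -> {
                    events.add("registered");
                    return ack;
                });
        return events.contains("registered");
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [registered]
        System.out.println(run(true)); // [archive-failed-logged, registered]
        System.out.println(registeredWithoutHandle()); // false
    }
}
```

With the recovering `handle` in place, `run(true)` still records the registration; without it, the `thenApply` stage is skipped. `whenComplete` would also run in both cases, but it keeps propagating the original exception downstream, so the choice mostly depends on whether later stages should see the failure.
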
> The reasoning behind chaining them together was, that the HistoryServer is
> opt-in functionality, i.e. if the user configured it, he/she would expect it to
> work even if it takes time. The default works with a no-op history server. WDYT?

Yes, you are right. Sorry for making you rephrase it over and over again for
my information.