Thesharing commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r848274651
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -421,6 +421,8 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
// wait for job to finish
dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+ dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Review Comment:
Thank you for the analysis here. I just realized that the test cases here
actually use `VoidHistoryServerArchivist`, which simply returns a
`CompletableFuture.completedFuture` in the main thread.
I have removed these `dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();`
calls.
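A minimal sketch of the general `CompletableFuture` behavior behind this, using only the JDK and no Flink classes. The class and method names are hypothetical, and the sketch assumes a non-async dependent stage; it does not reproduce the executor-based stages in `Dispatcher`. When the source future is already completed, a non-async dependent stage runs inline on the calling thread, so its result is available immediately.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Flink.
public class CompletedFutureDemo {

    /** Returns true if the dependent stage ran inline on the calling thread. */
    static boolean dependentRunsInline() {
        String caller = Thread.currentThread().getName();

        // Stand-in for an archivist that returns an already completed future.
        CompletableFuture<String> archived = CompletableFuture.completedFuture("ack");

        // Non-async stage attached to a completed future: executed right here.
        CompletableFuture<String> stage =
                archived.thenApply(ack -> Thread.currentThread().getName());

        return stage.isDone() && caller.equals(stage.join());
    }

    public static void main(String[] args) {
        System.out.println(dependentRunsInline());
    }
}
```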
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1065,27 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
terminalJobStatus);
}
- archiveExecutionGraph(executionGraphInfo);
+ writeToExecutionGraphInfoStore(executionGraphInfo);
if (terminalJobStatus.isGloballyTerminalState()) {
- final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
- executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
- }
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ // do not create an archive for suspended jobs, as this would eventually lead to
+ // multiple archive attempts which we currently do not support
+ CompletableFuture<Acknowledge> archiveFuture =
+ archiveExecutionGraphToHistoryServer(executionGraphInfo);
+
+ return archiveFuture.handleAsync(
+ (ignored, throwable) -> {
Review Comment:
Yes, this makes the code here clearer. Thank you for pointing this out.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
executionGraphInfo.getArchivedExecutionGraph().getJobID(),
e);
}
+ }
+
+ private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+ ExecutionGraphInfo executionGraphInfo) {
+
+ return historyServerArchivist
+ .archiveExecutionGraph(executionGraphInfo)
+ .handleAsync(
+ (Acknowledge ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ log.info(
+ "Could not archive completed job {}({}) to the history server.",
+ executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+ executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+ throwable);
+ }
+ return Acknowledge.get();
+ },
+ getMainThreadExecutor());
Review Comment:
`archiveExecutionGraphToHistoryServerWithErrorHandling` is also called by
`Dispatcher#submitFailedJob`. If we don't call `handleAsync` with the main
thread executor here, `submitFailedJob` will return a `CompletableFuture` that
completes in the IO executor. I'm not sure whether that is okay for
`ApplicationDispatcherBootstrap#runApplicationEntryPoint`.
Should we make sure `archiveExecutionGraphToHistoryServerWithErrorHandling`
returns a `CompletableFuture` that completes in the main thread?
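To illustrate the concern, here is a minimal sketch using only JDK classes. The two single-threaded executors named `io-executor` and `dispatcher-main` are hypothetical stand-ins for Flink's IO executor and main thread executor, and `archiveFuture` stands in for the future returned by the archivist. It shows on which thread a stage chained by the caller runs, depending on whether `handleAsync` with the main executor sits in between.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Flink.
public class HandleAsyncThreadDemo {

    /** Returns the name of the thread that ran the caller's downstream stage. */
    static String downstreamThread(boolean hopToMainThread) throws Exception {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-executor"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatcher-main"));
        try {
            // Stand-in for the archivist's future; completed later on the IO thread.
            CompletableFuture<String> archiveFuture = new CompletableFuture<>();

            // Error handling either stays on the completing thread or hops to main.
            CompletableFuture<String> handled =
                    hopToMainThread
                            ? archiveFuture.handleAsync((ack, error) -> "ack", main)
                            : archiveFuture.handle((ack, error) -> "ack");

            // What a caller such as submitFailedJob might chain afterwards.
            CompletableFuture<String> downstream =
                    handled.thenApply(ignored -> Thread.currentThread().getName());

            // Complete the source on the IO thread, after all stages are attached.
            io.execute(() -> archiveFuture.complete("archived"));

            return downstream.get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdownNow();
            main.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(downstreamThread(false));
        System.out.println(downstreamThread(true));
    }
}
```

Without the hop, the caller's stage runs on the thread that completed the archive future; with `handleAsync(..., main)`, it runs on the main executor's thread.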
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -615,15 +616,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
return jobManagerRunnerFailed(jobId, throwable);
}
},
- getMainThreadExecutor());
+ getMainThreadExecutor())
+ .thenCompose(Function.identity());
final CompletableFuture<Void> jobTerminationFuture =
- cleanupJobStateFuture.thenCompose(
+ cleanupJobStateFuture.thenComposeAsync(
Review Comment:
I added `thenComposeAsync` here because, in my opinion, we cannot guarantee
that the `CompletableFuture` returned by `handleJobManagerRunnerResult`
completes in the main thread. I considered `thenComposeAsync` a safeguard to
make sure `removeJob` is executed in the main thread. However, since it
introduces many otherwise unnecessary changes, I'd like to remove it.
Thank you for your analysis and detailed explanation.
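For reference, a minimal JDK-only sketch of the scenario I had in mind. The executors `io-executor` and `dispatcher-main` and the `removeJob` function are hypothetical stand-ins, not the actual `Dispatcher` code. A nested future is flattened with `thenCompose(Function.identity())`, and the inner future completes on another thread, so a plain `thenCompose` would run the next step on that thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo class, not part of Flink.
public class ComposeAsyncDemo {

    /** Returns the name of the thread on which the removeJob stand-in ran. */
    static String removeJobThread(boolean useAsync) throws Exception {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-executor"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatcher-main"));
        try {
            CompletableFuture<CompletableFuture<String>> nested = new CompletableFuture<>();
            CompletableFuture<String> inner = new CompletableFuture<>();

            // Flatten Future<Future<T>> into Future<T>.
            CompletableFuture<String> cleanupJobState =
                    nested.thenCompose(Function.identity());

            // Stand-in for removeJob: records the thread it was invoked on.
            Function<String, CompletableFuture<String>> removeJob =
                    state -> CompletableFuture.completedFuture(
                            Thread.currentThread().getName());

            CompletableFuture<String> termination =
                    useAsync
                            ? cleanupJobState.thenComposeAsync(removeJob, main)
                            : cleanupJobState.thenCompose(removeJob);

            nested.complete(inner);                      // outer result is available
            io.execute(() -> inner.complete("GLOBAL"));  // inner completes elsewhere

            return termination.get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdownNow();
            main.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(removeJobThread(false));
        System.out.println(removeJobThread(true));
    }
}
```

If the flattened future is already guaranteed to complete in the main thread, the extra hop is redundant, which is why it can be dropped here.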
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1244,23 @@ private CompletableFuture<Void> waitForTerminatingJob(
getMainThreadExecutor());
}
- CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+ private CompletableFuture<Void> getJobTerminationFutureOrFailedFutureForRunningJob(
Review Comment:
Yes, I removed the usages of `getJobTerminationFuture` in `DispatcherTest`
and `DispatcherResourceCleanupTest`, so this change is reverted.