XComp commented on code in PR #19275:
URL: https://github.com/apache/flink/pull/19275#discussion_r847412599
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(
+                        (Acknowledge ignored, Throwable throwable) -> {
+                            if (throwable != null) {
+                                log.info(
+                                        "Could not archive completed job {}({}) to the history server.",
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobName(),
+                                        executionGraphInfo.getArchivedExecutionGraph().getJobID(),
+                                        throwable);
+                            }
+                            return Acknowledge.get();
+                        },
+                        getMainThreadExecutor());
Review Comment:
Is there a reason why we want to have this executed on the main thread? It's
just the error handling, right? 🤔 Or am I missing something?
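To illustrate the point (a made-up sketch, not the actual Dispatcher code): a callback that only logs and swallows the error touches no shared state, so a plain `handle` without an explicit executor would behave the same — it runs on whichever thread completes the future:

```java
import java.util.concurrent.CompletableFuture;

public class ErrorLoggingDemo {

    // Hypothetical stand-in for the archiving future; names are made up.
    static String handleArchiveResult(CompletableFuture<String> archive) {
        // handle() without an executor runs the callback on whichever thread
        // completes the future (or the caller, if it is already complete).
        // Since we only "log" and swallow the error, no specific executor
        // such as the main-thread executor is needed.
        return archive
                .handle((ok, t) -> t == null ? ok : "logged: " + t.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(handleArchiveResult(
                CompletableFuture.completedFuture("archived")));
        System.out.println(handleArchiveResult(
                CompletableFuture.failedFuture(
                        new RuntimeException("history server down"))));
    }
}
```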
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1060,40 +1065,27 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
                     terminalJobStatus);
         }
-        archiveExecutionGraph(executionGraphInfo);
+        writeToExecutionGraphInfoStore(executionGraphInfo);
         if (terminalJobStatus.isGloballyTerminalState()) {
-            final JobID jobId = executionGraphInfo.getJobId();
-            try {
-                if (jobResultStore.hasCleanJobResultEntry(jobId)) {
-                    log.warn(
-                            "Job {} is already marked as clean but clean up was triggered again.",
-                            jobId);
-                } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
-                    jobResultStore.createDirtyResult(
-                            new JobResultEntry(
-                                    JobResult.createFrom(
-                                            executionGraphInfo.getArchivedExecutionGraph())));
-                    log.info(
-                            "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
-                            jobId);
-                }
-            } catch (IOException e) {
-                fatalErrorHandler.onFatalError(
-                        new FlinkException(
-                                String.format(
-                                        "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
-                                        jobId),
-                                e));
-            }
-        }
-        return terminalJobStatus.isGloballyTerminalState()
-                ? CleanupJobState.GLOBAL
-                : CleanupJobState.LOCAL;
+            // do not create an archive for suspended jobs, as this would eventually lead to
+            // multiple archive attempts which we currently do not support
+            CompletableFuture<Acknowledge> archiveFuture =
+                    archiveExecutionGraphToHistoryServer(executionGraphInfo);
+
+            return archiveFuture.handleAsync(
+                    (ignored, throwable) -> {
Review Comment:
I know I said something different at first, but thinking about it again:
maybe we could just rename `archiveExecutionGraphToHistoryServer` to
`archiveExecutionGraphToHistoryServerWithErrorHandling` to make it explicit
that any error is already handled, which would let us keep the
`thenApplyAsync` call you initially favored.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1103,23 +1095,60 @@ private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
                     executionGraphInfo.getArchivedExecutionGraph().getJobID(),
                     e);
         }
+    }
+
+    private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
+            ExecutionGraphInfo executionGraphInfo) {
+
+        return historyServerArchivist
+                .archiveExecutionGraph(executionGraphInfo)
+                .handleAsync(
Review Comment:
nit: Wouldn't a `.exceptionally` call be good enough here?
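A minimal sketch of what that would look like (made-up names, not the actual Dispatcher code): `.exceptionally` fires only on failure and must return the future's own value type, which fits a log-and-recover pattern exactly — the success value passes through untouched:

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {

    // Hypothetical archiving future of some result type; names are made up.
    static String archiveWithFallback(CompletableFuture<String> archive) {
        // exceptionally() is only invoked when the future fails; its return
        // value replaces the failed result, so "log and acknowledge anyway"
        // needs no success branch at all.
        return archive
                .exceptionally(t -> "recovered after: " + t.getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(archiveWithFallback(
                CompletableFuture.completedFuture("archived")));
        System.out.println(archiveWithFallback(
                CompletableFuture.failedFuture(new RuntimeException("disk full"))));
    }
}
```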
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -421,6 +421,8 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
         // wait for job to finish
         dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Review Comment:
It took me a bit to figure out why we need this change. It's related to the
`thenComposeAsync` which you added in `Dispatcher.runJob` to run the
`Dispatcher#removeJob` method in the main thread. I commented on that one
separately. But we could revert this change if we agree that it's not
necessary.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,17 +1244,23 @@ private CompletableFuture<Void> waitForTerminatingJob(
                 getMainThreadExecutor());
     }
-    CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+    private CompletableFuture<Void> getJobTerminationFutureOrFailedFutureForRunningJob(
Review Comment:
We could get rid of this change as well if we revert the `thenComposeAsync`
change in `Dispatcher#runJob`, I guess?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -615,15 +616,17 @@ private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionTy
                                     return jobManagerRunnerFailed(jobId, throwable);
                                 }
                             },
-                            getMainThreadExecutor());
+                            getMainThreadExecutor())
+                    .thenCompose(Function.identity());
         final CompletableFuture<Void> jobTerminationFuture =
-                cleanupJobStateFuture.thenCompose(
+                cleanupJobStateFuture.thenComposeAsync(
Review Comment:
What was your motivation for using `thenComposeAsync` here? The previous
`handleAsync` is already called on the main thread, so the subsequent
`thenCompose` runs on the main thread as well — and so would the
`thenCompose` that runs the `removeJob` method here.
The `Async` variant causes the `removeJob` logic to be queued in the main
thread executor again, so it doesn't necessarily run right after the
`JobManagerRunnerResult` handling; other queued tasks may run in between.
That's why you had to add the wait for the job termination future in the
`DispatcherTest` class.
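For what it's worth, the re-queuing difference can be reproduced outside of Flink. In this made-up sketch, `thenRun`/`thenRunAsync` stand in for the `thenCompose`/`thenComposeAsync` pair (they follow the same scheduling rules), and a plain single-threaded executor stands in for the main-thread executor:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncRequeueDemo {

    // Runs a stage plus a continuation (async or not) on a single-threaded
    // executor and records the order in which the work items execute.
    static List<String> run(boolean asyncContinuation) throws Exception {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Block the executor thread so everything below is queued
            // deterministically before any of it runs.
            mainThread.submit(() -> {
                try { gate.await(); } catch (InterruptedException ignored) { }
            });

            CompletableFuture<Void> stage =
                    CompletableFuture.runAsync(() -> order.add("cleanupJobState"), mainThread);
            CompletableFuture<Void> done = asyncContinuation
                    // Async variant: the continuation is re-submitted to the
                    // executor queue, behind anything queued in the meantime.
                    ? stage.thenRunAsync(() -> order.add("removeJob"), mainThread)
                    // Non-async variant: the continuation runs inline on the
                    // completing thread, right after the stage.
                    : stage.thenRun(() -> order.add("removeJob"));
            // Something else lands in the "main thread" queue in the meantime.
            mainThread.submit(() -> order.add("other main-thread task"));

            gate.countDown();
            done.get();
            mainThread.submit(() -> { }).get(); // drain the remaining queue
        } finally {
            mainThread.shutdown();
        }
        return order;
    }

    public static void main(String[] args) throws Exception {
        // thenRun:      [cleanupJobState, removeJob, other main-thread task]
        System.out.println("thenRun:      " + run(false));
        // thenRunAsync: [cleanupJobState, other main-thread task, removeJob]
        System.out.println("thenRunAsync: " + run(true));
    }
}
```

With the non-async continuation, `removeJob` runs immediately after the stage; with the `Async` variant, the other queued task slips in between — which is exactly why the test had to wait for the termination future.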
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]