XComp commented on code in PR #19427:
URL: https://github.com/apache/flink/pull/19427#discussion_r848244397
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -420,7 +420,7 @@ public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exc
.build())));
// wait for job to finish
- dispatcherGateway.requestJobResult(jobId, TIMEOUT).get();
+ dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
Review Comment:
By switching from `requestJobResult` to `getJobTerminationFuture`, we now
always wait for the cleanup to finish before triggering whatever logic we
want to test afterwards. That means the `JobManagerRunner` is always
deregistered at that point. I'm wondering whether we should add a test to
`DispatcherTest` that covers the code path of `Dispatcher#requestJobStatus`
where the `JobManagerRunner` is not yet deregistered. WDYT? 🤔
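To illustrate the window I mean, here is a minimal sketch in plain Java. It is
not Flink code: the class, the fields and the `String` result are hypothetical
stand-ins, and it only assumes that the termination future completes after the
result future plus cleanup.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the dispatcher; none of these names are Flink API.
class ResultVersusTerminationSketch {
    // Completes as soon as the job reaches a terminal state.
    final CompletableFuture<String> jobResult = new CompletableFuture<>();
    // Completes once cleanup is done and the runner has been deregistered.
    final CompletableFuture<Void> cleanupDone = new CompletableFuture<>();
    // Assumption: termination = result available AND cleanup finished.
    final CompletableFuture<Void> termination =
            jobResult.thenCompose(ignored -> cleanupDone);

    volatile boolean runnerRegistered = true;

    void finishJob() {
        jobResult.complete("FINISHED");
    }

    void finishCleanup() {
        runnerRegistered = false;
        cleanupDone.complete(null);
    }

    public static void main(String[] args) {
        ResultVersusTerminationSketch sketch = new ResultVersusTerminationSketch();

        sketch.finishJob();
        // Window of interest: the result is there, the runner is still registered.
        System.out.println("result done: " + sketch.jobResult.isDone());
        System.out.println("runner registered: " + sketch.runnerRegistered);
        System.out.println("termination done: " + sketch.termination.isDone());

        sketch.finishCleanup();
        // Waiting on the termination future always lands here.
        System.out.println("runner registered: " + sketch.runnerRegistered);
        System.out.println("termination done: " + sketch.termination.isDone());
    }
}
```

A test that waits on the result future can observe the state between the two
calls; a test that waits on the termination future only ever sees the state
after `finishCleanup()`.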
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1062,35 +1066,57 @@ protected CleanupJobState jobReachedTerminalState(ExecutionGraphInfo executionGr
archiveExecutionGraph(executionGraphInfo);
+ final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
if (terminalJobStatus.isGloballyTerminalState()) {
final JobID jobId = executionGraphInfo.getJobId();
- try {
- if (jobResultStore.hasCleanJobResultEntry(jobId)) {
- log.warn(
- "Job {} is already marked as clean but clean up was triggered again.",
- jobId);
- } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
- jobResultStore.createDirtyResult(
- new JobResultEntry(
- JobResult.createFrom(
- executionGraphInfo.getArchivedExecutionGraph())));
- log.info(
- "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
- jobId);
- }
- } catch (IOException e) {
- fatalErrorHandler.onFatalError(
- new FlinkException(
- String.format(
- "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
- jobId),
- e));
- }
+
+ ioExecutor.execute(
+ () -> {
+ try {
+ if (jobResultStore.hasCleanJobResultEntry(jobId)) {
+ log.warn(
+ "Job {} is already marked as clean but clean up was triggered again.",
+ jobId);
+ } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
+ jobResultStore.createDirtyResult(
+ new JobResultEntry(
+ JobResult.createFrom(
+ executionGraphInfo
+ .getArchivedExecutionGraph())));
+ log.info(
+ "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
+ jobId);
+ }
+ } catch (IOException e) {
+ writeFuture.completeExceptionally(e);
+ return;
+ }
+ writeFuture.complete(null);
+ });
+ } else {
+ writeFuture.complete(null);
}
- return terminalJobStatus.isGloballyTerminalState()
- ? CleanupJobState.GLOBAL
- : CleanupJobState.LOCAL;
+ return writeFuture
+ .handleAsync(
+ (ignored, error) -> {
+ if (error != null) {
+ fatalErrorHandler.onFatalError(
+ new FlinkException(
+ String.format(
+ "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
+ executionGraphInfo.getJobId()),
+ error));
+ }
+ return null;
+ },
+ getMainThreadExecutor())
+ .thenApply(
+ (ignored) -> {
+ return terminalJobStatus.isGloballyTerminalState()
+ ? CleanupJobState.GLOBAL
+ : CleanupJobState.LOCAL;
+ });
Review Comment:
```
if (!terminalJobStatus.isGloballyTerminalState()) {
    return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
}

final JobID jobId = executionGraphInfo.getJobId();
// Run the blocking JobResultStore access on the ioExecutor, as the PR does,
// instead of the common pool that runAsync would use by default.
return CompletableFuture.runAsync(
                () -> {
                    try {
                        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
                            log.warn(
                                    "Job {} is already marked as clean but clean up was triggered again.",
                                    jobId);
                        } else if (!jobResultStore.hasDirtyJobResultEntry(jobId)) {
                            jobResultStore.createDirtyResult(
                                    new JobResultEntry(
                                            JobResult.createFrom(
                                                    executionGraphInfo
                                                            .getArchivedExecutionGraph())));
                            log.info(
                                    "Job {} has been registered for cleanup in the JobResultStore after reaching a terminal state.",
                                    jobId);
                        }
                    } catch (IOException e) {
                        // Propagate the checked exception through the future.
                        throw new CompletionException(e);
                    }
                },
                ioExecutor)
        .handleAsync(
                (ignored, error) -> {
                    if (error != null) {
                        fatalErrorHandler.onFatalError(
                                new FlinkException(
                                        String.format(
                                                "The job %s couldn't be marked as pre-cleanup finished in JobResultStore.",
                                                executionGraphInfo.getJobId()),
                                        error));
                    }
                    return CleanupJobState.GLOBAL;
                },
                getMainThreadExecutor());
```
This is more of a cosmetic change, but what about inverting the `if` condition
and returning `CleanupJobState.LOCAL` early? That would give us a clearer
separation between the local and the global terminal state handling in this
method.
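One nuance of throwing a `CompletionException` from `runAsync` instead of
calling `completeExceptionally` on a hand-made future: the `error` that
arrives in the handler differs. The following is a minimal, self-contained
sketch in plain Java (the class and method names are made up for
illustration; no Flink classes are involved) that shows what each variant
hands to `handle`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; it only uses java.util.concurrent.
class ErrorWrappingSketch {

    // Variant used in the PR: complete a manually created future exceptionally.
    static Throwable viaCompleteExceptionally() {
        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
        writeFuture.completeExceptionally(new IOException("write failed"));
        // The handler receives the IOException itself.
        return writeFuture.<Throwable>handle((ignored, error) -> error).join();
    }

    // Variant from the suggestion: throw a CompletionException inside runAsync.
    static Throwable viaRunAsync() {
        return CompletableFuture.runAsync(
                        () -> {
                            throw new CompletionException(new IOException("write failed"));
                        })
                // The handler receives the CompletionException; the
                // IOException is its cause.
                .<Throwable>handle((ignored, error) -> error)
                .join();
    }

    public static void main(String[] args) {
        Throwable direct = viaCompleteExceptionally();
        Throwable wrapped = viaRunAsync();
        System.out.println(direct.getClass().getSimpleName());
        System.out.println(wrapped.getClass().getSimpleName()
                + " caused by " + wrapped.getCause().getClass().getSimpleName());
    }
}
```

With the `runAsync` variant, the exception handed to the fatal error handler
would therefore carry one extra wrapper unless the handler unwraps it first.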
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1215,15 +1241,10 @@ private CompletableFuture<Void> waitForTerminatingJob(
getMainThreadExecutor());
}
+ @VisibleForTesting
CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
- if (jobManagerRunnerRegistry.isRegistered(jobId)) {
Review Comment:
This method is only called via `Dispatcher#waitForTerminatingJob`, which is
called from `Dispatcher#internalSubmitJob`. `internalSubmitJob` is in turn
triggered within `Dispatcher#submitJob`, but only after verifying that no
`JobManagerRunner` is registered for this `JobID` (see
`Dispatcher#isDuplicateJob`, which is used in
[Dispatcher#submitJob:435](https://github.com/apache/flink/blob/05707cf8955f190d65021d61c5afd8164e831315/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L435)).
The `DispatcherException` therefore effectively acted as a state invariant
that got propagated to the user (the job submission would fail in that case).
Removing this part is reasonable in my opinion. I'm just wondering whether
we still want to add a precondition check here so that the invariant remains
covered.
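A rough sketch of what I have in mind, in plain Java so that it runs on its
own. The registry, the map and the `String` job IDs are hypothetical
stand-ins; in the real method this would presumably be a single
`Preconditions.checkState` call against `jobManagerRunnerRegistry`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the relevant part of the dispatcher.
class TerminationFutureSketch {
    private final Set<String> registeredRunners = new HashSet<>();
    private final Map<String, CompletableFuture<Void>> terminationFutures = new HashMap<>();

    void registerRunner(String jobId) {
        registeredRunners.add(jobId);
    }

    void deregisterRunner(String jobId) {
        registeredRunners.remove(jobId);
    }

    CompletableFuture<Void> getJobTerminationFuture(String jobId) {
        // Invariant: callers must only ask once no runner is registered anymore.
        if (registeredRunners.contains(jobId)) {
            throw new IllegalStateException(
                    "A runner is still registered for job " + jobId + ".");
        }
        // Jobs without a pending termination count as already terminated.
        return terminationFutures.getOrDefault(
                jobId, CompletableFuture.completedFuture(null));
    }

    public static void main(String[] args) {
        TerminationFutureSketch sketch = new TerminationFutureSketch();
        sketch.registerRunner("job-1");
        try {
            sketch.getJobTerminationFuture("job-1");
        } catch (IllegalStateException e) {
            System.out.println("invariant violated: " + e.getMessage());
        }
        sketch.deregisterRunner("job-1");
        System.out.println(sketch.getJobTerminationFuture("job-1").isDone());
    }
}
```

The difference to the removed code is that a violation surfaces as a
programming error rather than as a `DispatcherException` returned to the user.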