XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1301628816
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception {
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
         final JobID jobID = jobGraph.getJobID();
         log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
-
-        try {
-            if (isInGloballyTerminalState(jobID)) {
-                log.warn(
-                        "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.",
-                        jobGraph.getName(),
-                        jobID,
-                        Arrays.stream(JobStatus.values())
-                                .filter(JobStatus::isGloballyTerminalState)
-                                .map(JobStatus::name)
-                                .collect(Collectors.joining(", ")));
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
-            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
-                    || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
-                // job with the given jobID is not terminated, yet
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.of(jobID));
-            } else if (isPartialResourceConfigured(jobGraph)) {
-                return FutureUtils.completedExceptionally(
-                        new JobSubmissionException(
-                                jobID,
-                                "Currently jobs is not supported if parts of the vertices have "
-                                        + "resources configured. The limitation will be removed in future versions."));
-            } else {
-                return internalSubmitJob(jobGraph);
-            }
-        } catch (FlinkException e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return isInGloballyTerminalState(jobID)
+                .thenCompose(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring JobGraph submission '{}' ({}) because the job already "
+                                                + "reached a globally-terminal state (i.e. {}) in a "
+                                                + "previous execution.",
+                                        jobGraph.getName(),
+                                        jobID,
+                                        Arrays.stream(JobStatus.values())
+                                                .filter(JobStatus::isGloballyTerminalState)
+                                                .map(JobStatus::name)
+                                                .collect(Collectors.joining(", ")));
+                                return FutureUtils.completedExceptionally(
+                                        DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                jobID));
+                            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
Review Comment:
```suggestion
} else if (jobManagerRunnerRegistry.isRegistered(jobID)
```
Ok, that's a tricky one which I missed. The Dispatcher has one requirement:
any state access needs to happen in the Dispatcher's main thread. There's a
special implementation of `JobManagerRunnerRegistry` that ensures this
invariant (see
[OnMainThreadJobManagerRunnerRegistry](https://github.com/apache/flink/blob/3efd4c2cf1be670f499e6637445e283e48deee60/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java)).
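
To make the invariant concrete, here is a minimal plain-JDK sketch (not Flink code) of a registry that refuses access from any thread other than one designated main thread. `MainThreadGuardedRegistry`, `GuardDemo`, the string job IDs and the thread names are all hypothetical; the sketch only mimics the idea behind `OnMainThreadJobManagerRunnerRegistry`, not its actual implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical, simplified registry that only allows access from one designated "main"
 * thread. Illustration only: this is not Flink's OnMainThreadJobManagerRunnerRegistry,
 * and job IDs are plain strings here.
 */
final class MainThreadGuardedRegistry {
    private final Set<String> registeredJobIds = new HashSet<>();
    private final Thread mainThread;

    MainThreadGuardedRegistry(Thread mainThread) {
        this.mainThread = mainThread;
    }

    private void assertRunningInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException(
                    "Registry accessed from '" + Thread.currentThread().getName()
                            + "' instead of the main thread");
        }
    }

    void register(String jobId) {
        assertRunningInMainThread();
        registeredJobIds.add(jobId);
    }

    boolean isRegistered(String jobId) {
        assertRunningInMainThread();
        return registeredJobIds.contains(jobId);
    }
}

final class GuardDemo {
    /** Returns true if access succeeds on the main thread and is rejected on an IO thread. */
    static boolean guardRejectsNonMainAccess() {
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatcher-main"));
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-executor"));
        try {
            // A single-thread executor keeps reusing its one worker, so this is "the" main thread.
            Thread mainThread =
                    CompletableFuture.supplyAsync(Thread::currentThread, mainThreadExecutor).join();
            MainThreadGuardedRegistry registry = new MainThreadGuardedRegistry(mainThread);

            boolean allowedOnMain =
                    CompletableFuture.supplyAsync(
                                    () -> {
                                        registry.register("job-1");
                                        return registry.isRegistered("job-1");
                                    },
                                    mainThreadExecutor)
                            .join();
            // The same lookup from the IO thread fails with an IllegalStateException.
            boolean rejectedOnIo =
                    CompletableFuture.supplyAsync(() -> registry.isRegistered("job-1"), ioExecutor)
                            .handle((value, error) -> error != null)
                            .join();
            return allowedOnMain && rejectedOnIo;
        } finally {
            mainThreadExecutor.shutdownNow();
            ioExecutor.shutdownNow();
        }
    }
}
```
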
Our change violates the main-thread requirement because the `thenCompose` is
chained onto the future that's returned by `isInGloballyTerminalState`. Why
does that matter? `isInGloballyTerminalState` calls
`jobResultStore.hasJobResultEntryAsync`, which internally executes its logic on
the `ioExecutor` (i.e. a thread for IO operations, not the Dispatcher's main
thread). A non-async stage like `thenCompose` runs in whichever thread completes
the future it is chained to, i.e. an `ioExecutor` thread here. Only if the
future happens to be complete already when the stage is attached does it run in
the calling thread instead, so the outcome depends on timing. The `thenCompose`
logic, including the `jobManagerRunnerRegistry.isRegistered` call, is therefore
typically executed in the `ioExecutor` instead of the main thread.
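
A minimal plain-JDK sketch of this effect, assuming a single-threaded executor named `io-executor` as a stand-in for the `ioExecutor` and a latch that keeps the lookup pending until the stage is attached (which makes the result deterministic). `ChainedStageThreadDemo` and `awaitQuietly` are hypothetical helpers, not Flink code. The function passed to plain `thenCompose` ends up running in the IO thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ChainedStageThreadDemo {
    /** Returns the name of the thread that ran a thenCompose stage chained onto an "IO" future. */
    static String threadOfNonAsyncStage() {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-executor"));
        CountDownLatch stageAttached = new CountDownLatch(1);
        try {
            // Hypothetical stand-in for hasJobResultEntryAsync: the lookup completes on the IO thread.
            CompletableFuture<Boolean> lookup =
                    CompletableFuture.supplyAsync(
                            () -> {
                                // Keep the future pending until the dependent stage is chained.
                                awaitQuietly(stageAttached);
                                return false;
                            },
                            ioExecutor);
            // Non-async stage: it is executed by the thread that completes `lookup`.
            CompletableFuture<String> chained =
                    lookup.thenCompose(
                            isTerminated ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()));
            stageAttached.countDown();
            return chained.join();
        } finally {
            ioExecutor.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Without the latch, the lookup could already be complete when `thenCompose` is called, and the stage would then run in the calling thread; in neither case can we rely on main-thread execution.
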
To work around this, we have to change the executor for the chained
execution. This can be achieved by using `thenComposeAsync` instead, passing the
main thread executor (obtained by calling `getMainThreadExecutor`) as the
executor argument. One example where it's done like that is
[Dispatcher:619](https://github.com/apache/flink/blob/b6992b9d80e8ca9c6a81d198d9ed628821e4ab49/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L619):
the `cleanupAsync` method is executed on the `ioExecutor`, but the error
handling has to happen in the main thread again. That's where we use
`handleAsync` with `getMainThreadExecutor`.
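
A minimal plain-JDK sketch of the executor hop, assuming a single-threaded `ExecutorService` named `dispatcher-main` as a stand-in for what `getMainThreadExecutor` returns in the Dispatcher. `MainThreadHopDemo` and the two stand-in futures are hypothetical. Both `thenComposeAsync` and `handleAsync` hand their function to the given executor, so both stages run in the "main" thread even though the source futures complete on the IO thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MainThreadHopDemo {
    /** Returns the thread names seen by a thenComposeAsync stage and a handleAsync stage. */
    static String[] stageThreads() {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-executor"));
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dispatcher-main"));
        try {
            // Hypothetical stand-in for isInGloballyTerminalState(jobID): runs on the IO thread.
            CompletableFuture<Boolean> terminalCheck =
                    CompletableFuture.supplyAsync(() -> false, ioExecutor);
            // Like thenCompose, but the function is always submitted to mainThreadExecutor,
            // so registry lookups inside it would be safe.
            String composeThread =
                    terminalCheck
                            .thenComposeAsync(
                                    isTerminated ->
                                            CompletableFuture.completedFuture(
                                                    Thread.currentThread().getName()),
                                    mainThreadExecutor)
                            .join();

            // Hypothetical stand-in for cleanupAsync: IO work, then error handling on the main thread.
            CompletableFuture<Void> cleanup = CompletableFuture.runAsync(() -> {}, ioExecutor);
            String handleThread =
                    cleanup.handleAsync(
                                    (ignored, error) -> Thread.currentThread().getName(),
                                    mainThreadExecutor)
                            .join();
            return new String[] {composeThread, handleThread};
        } finally {
            ioExecutor.shutdownNow();
            mainThreadExecutor.shutdownNow();
        }
    }
}
```

The design choice is to keep the blocking lookup on the IO thread and hop back to the main thread only for the part that touches Dispatcher state.
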
--
This is an automated message from the Apache Git Service.