XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1301628816


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -513,36 +514,39 @@ private void stopDispatcherServices() throws Exception {
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
         final JobID jobID = jobGraph.getJobID();
         log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
-
-        try {
-            if (isInGloballyTerminalState(jobID)) {
-                log.warn(
-                        "Ignoring JobGraph submission '{}' ({}) because the job already reached a globally-terminal state (i.e. {}) in a previous execution.",
-                        jobGraph.getName(),
-                        jobID,
-                        Arrays.stream(JobStatus.values())
-                                .filter(JobStatus::isGloballyTerminalState)
-                                .map(JobStatus::name)
-                                .collect(Collectors.joining(", ")));
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.ofGloballyTerminated(jobID));
-            } else if (jobManagerRunnerRegistry.isRegistered(jobID)
-                    || submittedAndWaitingTerminationJobIDs.contains(jobID)) {
-                // job with the given jobID is not terminated, yet
-                return FutureUtils.completedExceptionally(
-                        DuplicateJobSubmissionException.of(jobID));
-            } else if (isPartialResourceConfigured(jobGraph)) {
-                return FutureUtils.completedExceptionally(
-                        new JobSubmissionException(
-                                jobID,
-                                "Currently jobs is not supported if parts of the vertices have "
-                                        + "resources configured. The limitation will be removed in future versions."));
-            } else {
-                return internalSubmitJob(jobGraph);
-            }
-        } catch (FlinkException e) {
-            return FutureUtils.completedExceptionally(e);
-        }
+        return isInGloballyTerminalState(jobID)
+                .thenCompose(
+                        isTerminated -> {
+                            if (isTerminated) {
+                                log.warn(
+                                        "Ignoring JobGraph submission '{}' ({}) because the job already "
+                                                + "reached a globally-terminal state (i.e. {}) in a "
+                                                + "previous execution.",
+                                        jobGraph.getName(),
+                                        jobID,
+                                        Arrays.stream(JobStatus.values())
+                                                .filter(JobStatus::isGloballyTerminalState)
+                                                .map(JobStatus::name)
+                                                .collect(Collectors.joining(", ")));
+                                return FutureUtils.completedExceptionally(
+                                        DuplicateJobSubmissionException.ofGloballyTerminated(
+                                                jobID));
+                            } else if (jobManagerRunnerRegistry.isRegistered(jobID)

Review Comment:
   ```suggestion
                               } else if (jobManagerRunnerRegistry.isRegistered(jobID)
   ```
   Ok, that's a tricky one which I missed: The Dispatcher has one requirement: any state access needs to happen in the Dispatcher's main thread. There's a special implementation of `JobManagerRunnerRegistry` that ensures this invariant (see [OnMainThreadJobManagerRunnerRegistry](https://github.com/apache/flink/blob/3efd4c2cf1be670f499e6637445e283e48deee60/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java)). Our change (chaining `thenCompose` onto the future that's returned by `isInGloballyTerminalState`) violates this requirement. Why?
   
   `isInGloballyTerminalState` calls `jobResultStore.hasJobResultEntryAsync`, which internally executes its logic on the `ioExecutor` (i.e. a thread for IO operations, which is not the Dispatcher's main thread). The returned future is completed by this executor, so any non-async stage chained onto it runs in the same IO thread (unless the future has already completed when the stage is attached, in which case the stage runs in the calling thread). The `thenCompose` logic is, therefore, also executed in the `ioExecutor` instead of the main thread.
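
The effect described above can be reproduced outside of Flink. The following is a minimal sketch, not Dispatcher code: the class name, the `io-thread` executor, and the `gate` future are hypothetical stand-ins, and the gate only exists to guarantee that the lookup is still pending when `thenCompose` is attached.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChainedStageThreadDemo {

    /** Returns the name of the thread that ran the thenCompose callback. */
    static String threadRunningThenCompose() {
        // Hypothetical stand-in for the ioExecutor.
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        // Keeps the IO task pending until the callback has been attached.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        try {
            // Stand-in for the asynchronous lookup that runs on the IO thread.
            CompletableFuture<Boolean> lookup =
                    CompletableFuture.supplyAsync(
                            () -> {
                                gate.join();
                                return false;
                            },
                            ioExecutor);
            // Non-async stage: runs on whichever thread completes `lookup`.
            CompletableFuture<String> result =
                    lookup.thenCompose(
                            isTerminated ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()));
            gate.complete(null);
            return result.join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadRunningThenCompose()); // prints "io-thread"
    }
}
```

In this sketch the callback never sees the thread that called `thenCompose`, which is the same situation the Dispatcher ends up in.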
   
   To work around this, we have to change the executor for the chained execution. This can be achieved by using `thenComposeAsync` instead and passing the main thread executor, which we get by calling `getMainThreadExecutor`. One example where it's done like that is [Dispatcher:619](https://github.com/apache/flink/blob/b6992b9d80e8ca9c6a81d198d9ed628821e4ab49/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L619): the `cleanupAsync` method is executed on the `ioExecutor`, but the error handling has to happen in the main thread again. That's where we use `handleAsync` with `getMainThreadExecutor`.
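
As a rough illustration of the executor switch (again plain `CompletableFuture` code, not the actual Dispatcher change): both executors and the class name below are hypothetical, with `mainThreadExecutor` standing in for whatever `getMainThreadExecutor` returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MainThreadHopDemo {

    /** Returns the name of the thread that ran the thenComposeAsync callback. */
    static String threadRunningThenComposeAsync() {
        // Hypothetical stand-ins for the ioExecutor and the main thread executor.
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(
                        r -> new Thread(r, "main-thread-executor"));
        try {
            return CompletableFuture.supplyAsync(() -> false, ioExecutor)
                    // The async variant always hands the callback to the given
                    // executor, no matter which thread completed the lookup.
                    .thenComposeAsync(
                            isTerminated ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()),
                            mainThreadExecutor)
                    .join();
        } finally {
            ioExecutor.shutdown();
            mainThreadExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadRunningThenComposeAsync()); // prints "main-thread-executor"
    }
}
```

The lookup still runs on the IO thread; only the chained stage is moved to the second executor.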



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
