zentol commented on code in PR #21849:
URL: https://github.com/apache/flink/pull/21849#discussion_r1188871040


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
                                                     throwable));
                                 });
 
-        return jobManagerTerminationFuture.thenAcceptAsync(
+        // keep track of the job as outstanding, if not done
+        if (!jobManagerTerminationFuture.isDone()) {
+            submittedAndWaitingTerminationJobIDs.add(jobId);
+        }

Review Comment:
   This doesn't seem safe; there's no guarantee the termination future doesn't 
complete a millisecond after the condition is checked, so any submission with 
the same job ID could still fail.
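
   To illustrate the window described above, here is a minimal sketch that
   replays the interleaving deterministically with a plain `CompletableFuture`.
   It is not Dispatcher code: the class name, the `waitingJobIds` set, and the
   job ID are hypothetical stand-ins, and the `complete(null)` call stands in
   for another thread finishing the termination right after the check.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, not Flink code: shows that an isDone() snapshot can be stale.
class StaleIsDoneSketch {

    /**
     * Replays the problematic interleaving in a single thread.
     * Returns true if the job ID ends up tracked even though the future is already done.
     */
    static boolean trackedDespiteDone(String jobId) {
        CompletableFuture<Void> termination = new CompletableFuture<>();
        Set<String> waitingJobIds = new HashSet<>();

        // The condition from the diff, evaluated while the future is still pending.
        boolean notDoneAtCheck = !termination.isDone();

        // Stands in for another thread completing the future right after the check.
        termination.complete(null);

        // The decision is acted upon using the stale snapshot.
        if (notDoneAtCheck) {
            waitingJobIds.add(jobId);
        }

        return termination.isDone() && waitingJobIds.contains(jobId);
    }

    public static void main(String[] args) {
        System.out.println(trackedDespiteDone("job-1")); // prints "true"
    }
}
```

   The point of the sketch is only that `isDone()` followed by a separate
   action is a check-then-act sequence; nothing ties the two steps together.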



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
                                                     throwable));
                                 });
 
-        return jobManagerTerminationFuture.thenAcceptAsync(
+        // keep track of the job as outstanding, if not done
+        if (!jobManagerTerminationFuture.isDone()) {
+            submittedAndWaitingTerminationJobIDs.add(jobId);
+        }
+
+        return FutureUtils.thenAcceptAsyncIfNotDone(

Review Comment:
   AFAICT this change alone, without any of the other stuff, would already 
solve the issue in question.
   If no JM exists for this job then this runs synchronously in the main thread.
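
   A minimal sketch of the behaviour this relies on, written against the JDK
   only. The helper below is an assumed approximation of what
   `FutureUtils.thenAcceptAsyncIfNotDone` does (run inline when the future is
   already done, otherwise hand off to the executor); it is not copied from
   Flink, and the queue-backed executor is a hypothetical stand-in for the
   main thread executor.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in, not Flink code.
class IfNotDoneSketch {

    /** Runs the consumer inline if the future is already done, else via the executor. */
    static <T> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
            CompletableFuture<T> future, Executor executor, Consumer<? super T> consumer) {
        return future.isDone()
                ? future.thenAccept(consumer)
                : future.thenAcceptAsync(consumer, executor);
    }

    /** Returns true if the action had already run by the time the helper returned. */
    static boolean ranInline(CompletableFuture<Void> termination, Queue<Runnable> mainThreadQueue) {
        boolean[] ran = {false};
        // The queue only records tasks; nothing runs until someone drains it.
        thenAcceptAsyncIfNotDone(termination, mainThreadQueue::add, ignored -> ran[0] = true);
        return ran[0];
    }

    public static void main(String[] args) {
        Queue<Runnable> queue = new ArrayDeque<>();

        // No pending termination: the action runs synchronously in the caller.
        System.out.println(ranInline(CompletableFuture.completedFuture(null), queue)); // prints "true"

        // Pending termination: the action is deferred to the executor on completion.
        CompletableFuture<Void> pending = new CompletableFuture<>();
        System.out.println(ranInline(pending, queue)); // prints "false"
        pending.complete(null);
        System.out.println(queue.size()); // prints "1"
    }
}
```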



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -232,6 +233,27 @@ public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() thro
         assertDuplicateJobSubmission();
     }
 
+    @Test
+    public void testDuplicateJobSubmissionIsDetected() throws Exception {

Review Comment:
   Isn't this test just duplicating existing tests and not really targeting the 
case we're interested in? You want a test that actually delays the 
persistAndRunJob process.
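
   As a rough sketch of the shape such a test could take: hold the persist
   step open with a future the test controls, submit the same job ID a second
   time while the first submission is still in flight, then release the gate.
   Everything below is a hypothetical toy (class, `inFlight` set, `persistGate`,
   return values); it only illustrates the test structure, not the Dispatcher
   or its test utilities.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy "dispatcher", not Flink code.
class DelayedPersistSketch {
    private final Set<String> inFlight = new HashSet<>();
    // Test-controlled stand-in for a slow persist step.
    private final CompletableFuture<Void> persistGate;

    DelayedPersistSketch(CompletableFuture<Void> persistGate) {
        this.persistGate = persistGate;
    }

    CompletableFuture<String> submit(String jobId) {
        if (!inFlight.add(jobId)) {
            // Same ID is still being processed: reject as a duplicate.
            CompletableFuture<String> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalStateException("duplicate job " + jobId));
            return rejected;
        }
        return persistGate
                // Clean up on success and on failure alike.
                .whenComplete((ignored, failure) -> inFlight.remove(jobId))
                .thenApply(ignored -> "ACCEPTED " + jobId);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate = new CompletableFuture<>();
        DelayedPersistSketch dispatcher = new DelayedPersistSketch(gate);

        CompletableFuture<String> first = dispatcher.submit("job-1");
        CompletableFuture<String> second = dispatcher.submit("job-1"); // arrives while persist is delayed

        System.out.println(first.isDone());                    // prints "false"
        System.out.println(second.isCompletedExceptionally()); // prints "true"

        gate.complete(null); // release the delayed persist step
        System.out.println(first.join());                      // prints "ACCEPTED job-1"
    }
}
```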



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1513,13 +1517,20 @@ private CompletableFuture<Void> waitForTerminatingJob(
                                                     throwable));
                                 });
 
-        return jobManagerTerminationFuture.thenAcceptAsync(
+        // keep track of the job as outstanding, if not done
+        if (!jobManagerTerminationFuture.isDone()) {
+            submittedAndWaitingTerminationJobIDs.add(jobId);
+        }
+
+        return FutureUtils.thenAcceptAsyncIfNotDone(
+                jobManagerTerminationFuture,
+                getMainThreadExecutor(),
                 FunctionUtils.uncheckedConsumer(
                         (ignored) -> {
+                            submittedAndWaitingTerminationJobIDs.remove(jobId);

Review Comment:
   This isn't run if the submission fails, leaking memory and breaking all 
subsequent job submission attempts with the same fixed job ID.
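
   A minimal sketch of the difference, using only JDK futures: a `thenAccept`
   callback is skipped when the upstream future completes exceptionally,
   whereas `whenComplete` runs on both paths. The class, the `waitingJobIds`
   set, and the job ID are hypothetical; moving the cleanup into
   `whenComplete` is shown as one possible option, not as the fix adopted in
   the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, not Flink code.
class CleanupOnFailureSketch {

    /**
     * Tracks a job ID, registers the cleanup, then fails the future.
     * Returns true if the ID is still tracked afterwards (i.e. it leaked).
     */
    static boolean leaksAfterFailure(boolean cleanUpInWhenComplete) {
        Set<String> waitingJobIds = new HashSet<>();
        CompletableFuture<Void> termination = new CompletableFuture<>();
        String jobId = "job-1";
        waitingJobIds.add(jobId);

        if (cleanUpInWhenComplete) {
            // Runs for normal and exceptional completion.
            termination.whenComplete((ignored, failure) -> waitingJobIds.remove(jobId));
        } else {
            // Runs only for normal completion.
            termination.thenAccept(ignored -> waitingJobIds.remove(jobId));
        }

        termination.completeExceptionally(new RuntimeException("termination failed"));
        return waitingJobIds.contains(jobId);
    }

    public static void main(String[] args) {
        System.out.println(leaksAfterFailure(false)); // prints "true": the ID is never removed
        System.out.println(leaksAfterFailure(true));  // prints "false": cleanup ran despite the failure
    }
}
```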



