zentol commented on code in PR #21849:
URL: https://github.com/apache/flink/pull/21849#discussion_r1212786448


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -609,13 +609,21 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) {
     private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
         applyParallelismOverrides(jobGraph);
         log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+        // track as an outstanding job
+        submittedAndWaitingTerminationJobIDs.add(jobGraph.getJobID());
+
         return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                 .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable))
                 .thenCompose(Function.identity());
     }
 
     private CompletableFuture<Acknowledge> handleTermination(
             JobID jobId, @Nullable Throwable terminationThrowable) {
+
+        // job is done processing, whether failed or finished
+        submittedAndWaitingTerminationJobIDs.remove(jobId);

Review Comment:
   I'll have to take a closer look, but I think we need to take the cleanup below into account.
   Otherwise, a new job with the same ID may be submitted, and we would end up cleaning up state that it is already using.
   
   Maybe remove the job ID as a final step in `internalSubmitJob`, using `whenComplete`.
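   A minimal sketch of the ordering concern, assuming a heavily simplified model: `OutstandingJobTracker`, `submitReleasingEarly`, `submitReleasingAfterCleanup`, and the String job IDs are hypothetical and not part of Flink's API, and the duplicate-ID check is an assumption added only to make the race observable. The first variant mirrors the diff (the ID is removed inside the termination handler, before the cleanup future completes); the second follows the suggestion and removes it with `whenComplete` on the final future.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified model of the Dispatcher's submit path; NOT Flink's actual API.
// Job IDs are Strings; "termination" and "cleanup" are futures supplied by the caller.
public class OutstandingJobTracker {

    // IDs of jobs whose processing, including cleanup, has not finished yet.
    private final Set<String> outstandingJobIds = ConcurrentHashMap.newKeySet();

    public boolean isOutstanding(String jobId) {
        return outstandingJobIds.contains(jobId);
    }

    // Mirrors the diff: the ID is released inside the termination handler,
    // i.e. BEFORE the cleanup future returned by the handler has completed.
    public CompletableFuture<Void> submitReleasingEarly(
            String jobId,
            CompletableFuture<Void> termination,
            Function<Throwable, CompletableFuture<Void>> cleanup) {
        if (!outstandingJobIds.add(jobId)) { // assumed duplicate check
            return failed(new IllegalStateException("Job " + jobId + " is still outstanding"));
        }
        return termination
                .handle((ignored, throwable) -> {
                    outstandingJobIds.remove(jobId); // too early: cleanup may still be running
                    return cleanup.apply(throwable);
                })
                .thenCompose(Function.identity());
    }

    // Follows the review suggestion: release the ID as the final step via whenComplete,
    // so it stays tracked until cleanup has finished, successfully or exceptionally.
    public CompletableFuture<Void> submitReleasingAfterCleanup(
            String jobId,
            CompletableFuture<Void> termination,
            Function<Throwable, CompletableFuture<Void>> cleanup) {
        if (!outstandingJobIds.add(jobId)) { // assumed duplicate check
            return failed(new IllegalStateException("Job " + jobId + " is still outstanding"));
        }
        return termination
                .handle((ignored, throwable) -> cleanup.apply(throwable))
                .thenCompose(Function.identity())
                .whenComplete((ignored, throwable) -> outstandingJobIds.remove(jobId));
    }

    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    public static void main(String[] args) {
        for (boolean releaseAfterCleanup : new boolean[] {false, true}) {
            OutstandingJobTracker tracker = new OutstandingJobTracker();
            CompletableFuture<Void> termination = new CompletableFuture<>();
            CompletableFuture<Void> cleanupDone = new CompletableFuture<>(); // cleanup in progress
            CompletableFuture<Void> result = releaseAfterCleanup
                    ? tracker.submitReleasingAfterCleanup("job-1", termination, t -> cleanupDone)
                    : tracker.submitReleasingEarly("job-1", termination, t -> cleanupDone);

            termination.complete(null); // job finished; cleanup started but not yet done
            System.out.println((releaseAfterCleanup ? "whenComplete" : "early remove")
                    + ": outstanding during cleanup = " + tracker.isOutstanding("job-1"));

            cleanupDone.complete(null); // cleanup finished
            System.out.println("  outstanding after cleanup = " + tracker.isOutstanding("job-1")
                    + ", result done = " + result.isDone());
        }
    }
}
```

   In this sketch, the early variant reports the ID as no longer outstanding while cleanup is still pending, so a resubmission with the same ID would be accepted; the `whenComplete` variant keeps it tracked until the cleanup future completes.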



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.