Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6279#discussion_r201869163
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -536,7 +540,25 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except
        private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) {
                final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA);
     
    -           registerOrphanedJobManagerTerminationFuture(cleanupFuture);
    +           registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
    +   }
    +
    +   private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
    +           Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
    +
    +           jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture);
    +
    +           // clean up the pending termination future
    +           jobManagerRunnerTerminationFuture.thenRunAsync(
    +                   () -> {
    +                           final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId);
    +
    +                           //noinspection ObjectEquality
    +                           if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) {
    +                                   jobManagerTerminationFutures.put(jobId, terminationFuture);
    --- End diff --
    
    It can happen because we also clear the termination future in the callback 
of the `Dispatcher#waitForTerminatingJobManager` method.
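
To illustrate the interleaving, here is a minimal standalone sketch, not the actual `Dispatcher` code. `TerminationFutureRegistry`, `clear`, and `isRegistered` are hypothetical names, `clear` stands in for whatever the `waitForTerminatingJobManager` callback does to the map, and the sketch uses `thenRun` instead of `thenRunAsync` so it runs deterministically on one thread. The assumed sequence is: the entry for a job is cleared by another callback, a new future is registered under the same job ID, and only then does the first future's cleanup callback run.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the Dispatcher's termination-future bookkeeping. */
class TerminationFutureRegistry {
    private final Map<String, CompletableFuture<Void>> futures = new HashMap<>();

    void register(String jobId, CompletableFuture<Void> terminationFuture) {
        if (futures.containsKey(jobId)) {
            throw new IllegalStateException("Future already registered for " + jobId);
        }
        futures.put(jobId, terminationFuture);

        // Cleanup callback; thenRun (not thenRunAsync) keeps the sketch single-threaded.
        terminationFuture.thenRun(() -> {
            CompletableFuture<Void> current = futures.remove(jobId);
            // If a different future now owns this job ID, put it back untouched.
            if (current != null && current != terminationFuture) {
                futures.put(jobId, current);
            }
        });
    }

    /** Assumption: models another callback that also clears the entry. */
    void clear(String jobId) {
        futures.remove(jobId);
    }

    boolean isRegistered(String jobId, CompletableFuture<Void> future) {
        return futures.get(jobId) == future;
    }

    public static void main(String[] args) {
        TerminationFutureRegistry registry = new TerminationFutureRegistry();
        CompletableFuture<Void> first = new CompletableFuture<>();
        CompletableFuture<Void> second = new CompletableFuture<>();

        registry.register("job-1", first);
        registry.clear("job-1");            // entry removed by the other callback
        registry.register("job-1", second); // same job ID, new future

        first.complete(null);               // stale cleanup callback runs now
        System.out.println(registry.isRegistered("job-1", second)); // prints true

        second.complete(null);              // the owning future cleans up its own entry
        System.out.println(registry.isRegistered("job-1", second)); // prints false
    }
}
```

Without the identity check, completing `first` would drop the entry that belongs to `second`.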
