This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a2925a0  [FLINK-19816] Make job state cleanup dependent on final job result
a2925a0 is described below

commit a2925a0d2e894bf28aaced2993ec453589d143de
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Nov 12 18:45:35 2020 +0100

    [FLINK-19816] Make job state cleanup dependent on final job result
    
    In order to avoid race conditions between stopping a Dispatcher and a finishing
    job, we now wait on the actual job result to decide whether to clean up the job's
    HA data or not. A stopping dispatcher will simply close the DispatcherJobs and
    continue the clean up operation once the job has been terminated.
    
    This closes #14055.
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 113 +++++++++++----------
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  12 ++-
 .../runtime/jobmaster/JobManagerRunnerImpl.java    |   7 +-
 .../dispatcher/DispatcherResourceCleanupTest.java  |  52 ++++++++++
 .../runtime/jobmaster/TestingJobManagerRunner.java |   8 ++
 5 files changed, 127 insertions(+), 65 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 55e2c3d..c78ffc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -397,6 +397,17 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                getMainThreadExecutor());
        }
 
+       enum CleanupJobState {
+               LOCAL(false),
+               GLOBAL(true);
+
+               final boolean cleanupHAData;
+
+               CleanupJobState(boolean cleanupHAData) {
+                       this.cleanupHAData = cleanupHAData;
+               }
+       }
+
        private CompletableFuture<JobManagerRunner> 
createJobManagerRunner(JobGraph jobGraph) {
                final RpcService rpcService = getRpcService();
 
@@ -422,32 +433,33 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
        private JobManagerRunner startJobManagerRunner(JobManagerRunner 
jobManagerRunner) throws Exception {
                final JobID jobId = jobManagerRunner.getJobID();
 
-               FutureUtils.assertNoException(
-                       jobManagerRunner.getResultFuture().handleAsync(
-                               (ArchivedExecutionGraph archivedExecutionGraph, 
Throwable throwable) -> {
-                                       // check if we are still the active 
JobManagerRunner by checking the identity
-                                       final JobManagerRunner 
currentJobManagerRunner = 
Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
-                                               .map(future -> 
future.getNow(null))
-                                               .orElse(null);
-                                       //noinspection ObjectEquality
-                                       if (jobManagerRunner == 
currentJobManagerRunner) {
-                                               if (archivedExecutionGraph != 
null) {
-                                                       
jobReachedGloballyTerminalState(archivedExecutionGraph);
-                                               } else {
-                                                       final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
-                                                       if (strippedThrowable 
instanceof JobNotFinishedException) {
-                                                               
jobNotFinished(jobId);
-                                                       } else {
-                                                               
jobMasterFailed(jobId, strippedThrowable);
-                                                       }
-                                               }
+               final CompletableFuture<CleanupJobState> cleanupJobStateFuture 
= jobManagerRunner.getResultFuture().handleAsync(
+                       (ArchivedExecutionGraph archivedExecutionGraph, 
Throwable throwable) -> {
+                               // check if we are still the active 
JobManagerRunner by checking the identity
+                               final JobManagerRunner currentJobManagerRunner 
= Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
+                                       .map(future -> future.getNow(null))
+                                       .orElse(null);
+
+                               Preconditions.checkState(jobManagerRunner == 
currentJobManagerRunner, "The runner entry in jobManagerRunnerFutures must be 
bound to the lifetime of the JobManagerRunner.");
+                               if (archivedExecutionGraph != null) {
+                                       return 
jobReachedGloballyTerminalState(archivedExecutionGraph);
+                               } else {
+                                       final Throwable strippedThrowable = 
ExceptionUtils.stripCompletionException(throwable);
+
+                                       if (strippedThrowable instanceof 
JobNotFinishedException) {
+                                               return jobNotFinished(jobId);
                                        } else {
-                                               log.debug("There is a newer 
JobManagerRunner for the job {}.", jobId);
+                                               return jobMasterFailed(jobId, 
strippedThrowable);
                                        }
+                               }
+                       }, getMainThreadExecutor());
 
-                                       return null;
-                               }, getMainThreadExecutor()));
+               final CompletableFuture<Void> jobTerminationFuture = 
cleanupJobStateFuture
+                       .thenApply(cleanupJobState -> removeJob(jobId, 
cleanupJobState))
+                       .thenCompose(Function.identity());
+
+               FutureUtils.assertNoException(jobTerminationFuture);
+               registerJobManagerRunnerTerminationFuture(jobId, 
jobTerminationFuture);
 
                jobManagerRunner.start();
 
@@ -666,22 +678,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                
jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId, 
serializedRequest, timeout));
        }
 
-       /**
-        * Cleans up the job related data from the dispatcher. If cleanupHA is 
true, then
-        * the data will also be removed from HA.
-        *
-        * @param jobId JobID identifying the job to clean up
-        * @param cleanupHA True iff HA data shall also be cleaned up
-        */
-       private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean 
cleanupHA) {
-               final CompletableFuture<Void> cleanupFuture = removeJob(jobId, 
cleanupHA);
-
-               registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
-       }
-
        private void registerJobManagerRunnerTerminationFuture(JobID jobId, 
CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
                
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
-
                jobManagerTerminationFutures.put(jobId, 
jobManagerRunnerTerminationFuture);
 
                // clean up the pending termination future
@@ -697,18 +695,13 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        getMainThreadExecutor());
        }
 
-       private CompletableFuture<Void> removeJob(JobID jobId, boolean 
cleanupHA) {
-               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = 
jobManagerRunnerFutures.remove(jobId);
+       private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState 
cleanupJobState) {
+               final CompletableFuture<JobManagerRunner> job = 
checkNotNull(jobManagerRunnerFutures.remove(jobId));
 
-               final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
-               if (jobManagerRunnerFuture != null) {
-                       jobManagerRunnerTerminationFuture = 
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
-               } else {
-                       jobManagerRunnerTerminationFuture = 
CompletableFuture.completedFuture(null);
-               }
+               final CompletableFuture<Void> jobTerminationFuture = 
job.thenCompose(JobManagerRunner::closeAsync);
 
-               return jobManagerRunnerTerminationFuture.thenRunAsync(
-                       () -> cleanUpJobData(jobId, cleanupHA),
+               return jobTerminationFuture.thenRunAsync(
+                       () -> cleanUpJobData(jobId, 
cleanupJobState.cleanupHAData),
                        getRpcService().getExecutor());
        }
 
@@ -751,7 +744,15 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                final HashSet<JobID> jobsToRemove = new 
HashSet<>(jobManagerRunnerFutures.keySet());
 
                for (JobID jobId : jobsToRemove) {
-                       removeJobAndRegisterTerminationFuture(jobId, false);
+                       terminateJob(jobId);
+               }
+       }
+
+       private void terminateJob(JobID jobId) {
+               final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+
+               if (jobManagerRunnerFuture != null) {
+                       
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
                }
        }
 
@@ -765,7 +766,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                fatalErrorHandler.onFatalError(throwable);
        }
 
-       protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
+       protected CleanupJobState 
jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
                Preconditions.checkArgument(
                        
archivedExecutionGraph.getState().isGloballyTerminalState(),
                        "Job %s is in state %s which is not globally terminal.",
@@ -776,9 +777,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
                archiveExecutionGraph(archivedExecutionGraph);
 
-               final JobID jobId = archivedExecutionGraph.getJobID();
-
-               removeJobAndRegisterTerminationFuture(jobId, true);
+               return CleanupJobState.GLOBAL;
        }
 
        private void archiveExecutionGraph(ArchivedExecutionGraph 
archivedExecutionGraph) {
@@ -806,16 +805,18 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                        });
        }
 
-       protected void jobNotFinished(JobID jobId) {
+       protected CleanupJobState jobNotFinished(JobID jobId) {
                log.info("Job {} was not finished by JobManager.", jobId);
 
-               removeJobAndRegisterTerminationFuture(jobId, false);
+               return CleanupJobState.LOCAL;
        }
 
-       private void jobMasterFailed(JobID jobId, Throwable cause) {
+       private CleanupJobState jobMasterFailed(JobID jobId, Throwable cause) {
                // we fail fatally in case of a JobMaster failure in order to 
restart the
                // dispatcher to recover the jobs again. This only works in HA 
mode, though
                onFatalError(new FlinkException(String.format("JobMaster for 
job %s failed.", jobId), cause));
+
+               return CleanupJobState.LOCAL;
        }
 
        private CompletableFuture<JobMasterGateway> 
getJobMasterGatewayFuture(JobID jobId) {
@@ -900,7 +901,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
 
        public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
                return CompletableFuture.runAsync(
-                       () -> removeJobAndRegisterTerminationFuture(jobId, 
false),
+                       () -> terminateJob(jobId),
                        getMainThreadExecutor());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index b06453d..65f71b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -112,20 +112,24 @@ public class MiniDispatcher extends Dispatcher {
        }
 
        @Override
-       protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
archivedExecutionGraph) {
-               super.jobReachedGloballyTerminalState(archivedExecutionGraph);
+       protected CleanupJobState 
jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
+               final CleanupJobState cleanupHAState = 
super.jobReachedGloballyTerminalState(archivedExecutionGraph);
 
                if (jobCancelled || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED) {
                        // shut down if job is cancelled or we don't have to 
wait for the execution result retrieval
                        
shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
                }
+
+               return cleanupHAState;
        }
 
        @Override
-       protected void jobNotFinished(JobID jobId) {
-               super.jobNotFinished(jobId);
+       protected CleanupJobState jobNotFinished(JobID jobId) {
+               final CleanupJobState cleanupJobState = 
super.jobNotFinished(jobId);
 
                // shut down since we have done our job
                shutDownFuture.complete(ApplicationStatus.UNKNOWN);
+
+               return cleanupJobState;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index 4d750d4..adfafcf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -194,6 +194,8 @@ public class JobManagerRunnerImpl implements 
LeaderContender, OnCompletionAction
 
                                                classLoaderLease.release();
 
+                                               
resultFuture.completeExceptionally(new 
JobNotFinishedException(jobGraph.getJobID()));
+
                                                if (throwable != null) {
                                                        
terminationFuture.completeExceptionally(
                                                                new 
FlinkException("Could not properly shut down the JobManagerRunner", throwable));
@@ -201,11 +203,6 @@ public class JobManagerRunnerImpl implements 
LeaderContender, OnCompletionAction
                                                        
terminationFuture.complete(null);
                                                }
                                        });
-
-                               terminationFuture.whenComplete(
-                                       (Void ignored, Throwable throwable) -> {
-                                               
resultFuture.completeExceptionally(new 
JobNotFinishedException(jobGraph.getJobID()));
-                                       });
                        }
 
                        return terminationFuture;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fdbf165..2792f6e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -74,7 +74,11 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -314,6 +318,32 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                assertThat(deleteAllHABlobsFuture.isDone(), is(false));
        }
 
+       @Test
+       public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws 
Exception {
+               final TestingJobManagerRunner testingJobManagerRunner = new 
TestingJobManagerRunner(jobId, true);
+
+               final Queue<JobManagerRunner> jobManagerRunners = new 
ArrayDeque<>(Arrays.asList(testingJobManagerRunner));
+
+               startDispatcher(new 
QueueJobManagerRunnerFactory(jobManagerRunners));
+               submitJob();
+
+               final CompletableFuture<Void> dispatcherTerminationFuture = 
dispatcher.closeAsync();
+
+               testingJobManagerRunner.waitUntilCloseAsyncIsBeingCalled();
+               testingJobManagerRunner.completeResultFuture(new 
ArchivedExecutionGraphBuilder()
+                       .setJobID(jobId)
+                       .setState(JobStatus.FINISHED)
+                       .build());
+
+               testingJobManagerRunner.completeTerminationFuture();
+
+               // check that no exceptions have been thrown
+               dispatcherTerminationFuture.get();
+
+               assertThat(cleanupJobFuture.get(), is(jobId));
+               assertThat(deleteAllHABlobsFuture.get(), is(jobId));
+       }
+
        /**
         * Tests that the {@link RunningJobsRegistry} entries are cleared after 
the
         * job reached a terminal state.
@@ -548,6 +578,28 @@ public class DispatcherResourceCleanupTest extends 
TestLogger {
                }
        }
 
+       private static final class QueueJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
+               private final Queue<? extends JobManagerRunner> 
jobManagerRunners;
+
+               private QueueJobManagerRunnerFactory(Queue<? extends 
JobManagerRunner> jobManagerRunners) {
+                       this.jobManagerRunners = jobManagerRunners;
+               }
+
+               @Override
+               public JobManagerRunner createJobManagerRunner(
+                       JobGraph jobGraph,
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       JobManagerSharedServices jobManagerServices,
+                       JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
+                       FatalErrorHandler fatalErrorHandler) {
+                       return Optional.ofNullable(jobManagerRunners.poll())
+                               .orElseThrow(() -> new 
IllegalStateException("Cannot create more JobManagerRunners."));
+               }
+       }
+
        private class FailingJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
                private final Exception testException;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 55cfde4..e8ed401 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@ public class TestingJobManagerRunner implements 
JobManagerRunner {
 
        private final CompletableFuture<Void> terminationFuture;
 
+       private final OneShotLatch closeAsyncLatch = new OneShotLatch();
+
        public TestingJobManagerRunner(JobID jobId) {
                this(jobId, false);
        }
@@ -72,6 +75,7 @@ public class TestingJobManagerRunner implements 
JobManagerRunner {
 
        @Override
        public CompletableFuture<Void> closeAsync() {
+               closeAsyncLatch.trigger();
                if (!blockingTermination) {
                        terminationFuture.complete(null);
                }
@@ -91,6 +95,10 @@ public class TestingJobManagerRunner implements 
JobManagerRunner {
                terminationFuture.complete(null);
        }
 
+       public void waitUntilCloseAsyncIsBeingCalled() throws 
InterruptedException {
+               closeAsyncLatch.await();
+       }
+
        public CompletableFuture<Void> getTerminationFuture() {
                return terminationFuture;
        }

Reply via email to