This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3690c3f44573c753bc5e9a397a555562cec683dc
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Apr 22 15:41:05 2022 +0200

    [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode
---
 .../flink/runtime/dispatcher/Dispatcher.java       | 77 +++++++++++++++-------
 .../flink/runtime/dispatcher/MiniDispatcher.java   | 71 +++++++++++---------
 .../runtime/dispatcher/MiniDispatcherTest.java     | 41 +++++++++++-
 3 files changed, 134 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index af82eede38d..7c02082ed53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -614,7 +614,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                                 jobManagerRunnerResult, 
executionType);
                                     } else {
                                         return 
CompletableFuture.completedFuture(
-                                                jobManagerRunnerFailed(jobId, 
throwable));
+                                                jobManagerRunnerFailed(
+                                                        jobId, 
JobStatus.FAILED, throwable));
                                     }
                                 },
                                 getMainThreadExecutor())
@@ -654,19 +655,43 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
             return CompletableFuture.completedFuture(
                     jobManagerRunnerFailed(
                             
jobManagerRunnerResult.getExecutionGraphInfo().getJobId(),
+                            JobStatus.INITIALIZING,
                             
jobManagerRunnerResult.getInitializationFailure()));
         }
         return 
jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo());
     }
 
-    enum CleanupJobState {
-        LOCAL,
-        GLOBAL
+    private static class CleanupJobState {
+
+        private final boolean globalCleanup;
+        private final JobStatus jobStatus;
+
+        public static CleanupJobState localCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(false, jobStatus);
+        }
+
+        public static CleanupJobState globalCleanup(JobStatus jobStatus) {
+            return new CleanupJobState(true, jobStatus);
+        }
+
+        private CleanupJobState(boolean globalCleanup, JobStatus jobStatus) {
+            this.globalCleanup = globalCleanup;
+            this.jobStatus = jobStatus;
+        }
+
+        public boolean isGlobalCleanup() {
+            return globalCleanup;
+        }
+
+        public JobStatus getJobStatus() {
+            return jobStatus;
+        }
     }
 
-    private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable 
throwable) {
+    private CleanupJobState jobManagerRunnerFailed(
+            JobID jobId, JobStatus jobStatus, Throwable throwable) {
         jobMasterFailed(jobId, throwable);
-        return CleanupJobState.LOCAL;
+        return CleanupJobState.localCleanup(jobStatus);
     }
 
     @Override
@@ -983,15 +1008,17 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
     }
 
     private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState 
cleanupJobState) {
-        switch (cleanupJobState) {
-            case LOCAL:
-                return localResourceCleaner.cleanupAsync(jobId);
-            case GLOBAL:
-                return globalResourceCleaner
-                        .cleanupAsync(jobId)
-                        .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor);
-            default:
-                throw new IllegalStateException("Invalid cleanup state: " + 
cleanupJobState);
+        if (cleanupJobState.isGlobalCleanup()) {
+            return globalResourceCleaner
+                    .cleanupAsync(jobId)
+                    .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor)
+                    .thenRunAsync(
+                            () ->
+                                    runPostJobGloballyTerminated(
+                                            jobId, 
cleanupJobState.getJobStatus()),
+                            getMainThreadExecutor());
+        } else {
+            return localResourceCleaner.cleanupAsync(jobId);
         }
     }
 
@@ -1005,6 +1032,11 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         }
     }
 
+    protected void runPostJobGloballyTerminated(JobID jobId, JobStatus 
jobStatus) {
+        // no-op: we need to provide this method to enable the MiniDispatcher 
implementation to do
+        // stuff after the job is cleaned up
+    }
+
     /** Terminate all currently running {@link JobManagerRunner}s. */
     private void terminateRunningJobs() {
         log.info("Stopping all currently running jobs of dispatcher {}.", 
getAddress());
@@ -1034,6 +1066,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         fatalErrorHandler.onFatalError(throwable);
     }
 
+    @VisibleForTesting
     protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
             ExecutionGraphInfo executionGraphInfo) {
         final ArchivedExecutionGraph archivedExecutionGraph =
@@ -1068,7 +1101,8 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         writeToExecutionGraphInfoStore(executionGraphInfo);
 
         if (!terminalJobStatus.isGloballyTerminalState()) {
-            return CompletableFuture.completedFuture(CleanupJobState.LOCAL);
+            return CompletableFuture.completedFuture(
+                    CleanupJobState.localCleanup(terminalJobStatus));
         }
 
         // do not create an archive for suspended jobs, as this would 
eventually lead to
@@ -1076,14 +1110,11 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
         CompletableFuture<Acknowledge> archiveFuture =
                 archiveExecutionGraphToHistoryServer(executionGraphInfo);
 
-        return archiveFuture
-                .thenCompose(
-                        ignored ->
-                                
registerGloballyTerminatedJobInJobResultStore(executionGraphInfo))
-                .thenApply(ignored -> CleanupJobState.GLOBAL);
+        return archiveFuture.thenCompose(
+                ignored -> 
registerGloballyTerminatedJobInJobResultStore(executionGraphInfo));
     }
 
-    private CompletableFuture<Void> 
registerGloballyTerminatedJobInJobResultStore(
+    private CompletableFuture<CleanupJobState> 
registerGloballyTerminatedJobInJobResultStore(
             ExecutionGraphInfo executionGraphInfo) {
         final CompletableFuture<Void> writeFuture = new CompletableFuture<>();
         final JobID jobId = executionGraphInfo.getJobId();
@@ -1130,7 +1161,7 @@ public abstract class Dispatcher extends 
PermanentlyFencedRpcEndpoint<Dispatcher
                                                 executionGraphInfo.getJobId()),
                                         error));
                     }
-                    return null;
+                    return CleanupJobState.globalCleanup(terminalJobStatus);
                 },
                 getMainThreadExecutor());
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 85a08332e66..dd9d67d525e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -18,24 +18,23 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
 
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,6 +74,31 @@ public class MiniDispatcher extends Dispatcher {
         this.executionMode = checkNotNull(executionMode);
     }
 
+    @VisibleForTesting
+    public MiniDispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            DispatcherServices dispatcherServices,
+            @Nullable JobGraph jobGraph,
+            @Nullable JobResult recoveredDirtyJob,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            ResourceCleanerFactory resourceCleanerFactory,
+            JobClusterEntrypoint.ExecutionMode executionMode)
+            throws Exception {
+        super(
+                rpcService,
+                fencingToken,
+                CollectionUtil.ofNullable(jobGraph),
+                CollectionUtil.ofNullable(recoveredDirtyJob),
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                jobManagerRunnerRegistry,
+                resourceCleanerFactory);
+
+        this.executionMode = checkNotNull(executionMode);
+    }
+
     @Override
     public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time 
timeout) {
         final CompletableFuture<Acknowledge> acknowledgeCompletableFuture =
@@ -129,33 +153,18 @@ public class MiniDispatcher extends Dispatcher {
     }
 
     @Override
-    protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
-            ExecutionGraphInfo executionGraphInfo) {
-        final ArchivedExecutionGraph archivedExecutionGraph =
-                executionGraphInfo.getArchivedExecutionGraph();
-        final CompletableFuture<CleanupJobState> cleanupHAState =
-                super.jobReachedTerminalState(executionGraphInfo);
-
-        return cleanupHAState.thenApply(
-                cleanupJobState -> {
-                    JobStatus jobStatus =
-                            Objects.requireNonNull(
-                                    archivedExecutionGraph.getState(),
-                                    "JobStatus should not be null here.");
-                    if (jobStatus.isGloballyTerminalState()
-                            && (jobCancelled
-                                    || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED)) {
-                        // shut down if job is cancelled or we don't have to 
wait for the execution
-                        // result retrieval
-                        log.info(
-                                "Shutting down cluster with state {}, 
jobCancelled: {}, executionMode: {}",
-                                jobStatus,
-                                jobCancelled,
-                                executionMode);
-                        
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
-                    }
-
-                    return cleanupJobState;
-                });
+    protected void runPostJobGloballyTerminated(JobID jobId, JobStatus 
jobStatus) {
+        super.runPostJobGloballyTerminated(jobId, jobStatus);
+
+        if (jobCancelled || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED) {
+            // shut down if job is cancelled or we don't have to wait for the 
execution
+            // result retrieval
+            log.info(
+                    "Shutting down cluster after job with state {}, 
jobCancelled: {}, executionMode: {}",
+                    jobStatus,
+                    jobCancelled,
+                    executionMode);
+            
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 9977c84557e..5727dde19d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -40,9 +41,11 @@ import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -99,6 +102,9 @@ public class MiniDispatcherTest extends TestLogger {
     private TestingJobMasterServiceLeadershipRunnerFactory 
testingJobManagerRunnerFactory;
     private TestingCleanupRunnerFactory testingCleanupRunnerFactory;
 
+    private CompletableFuture<Void> localCleanupResultFuture;
+    private CompletableFuture<Void> globalCleanupResultFuture;
+
     @BeforeClass
     public static void setupClass() throws IOException {
         jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
@@ -123,6 +129,10 @@ public class MiniDispatcherTest extends TestLogger {
 
         testingJobManagerRunnerFactory = new 
TestingJobMasterServiceLeadershipRunnerFactory();
         testingCleanupRunnerFactory = new TestingCleanupRunnerFactory();
+
+        // the default setting shouldn't block the cleanup
+        localCleanupResultFuture = FutureUtils.completedVoidFuture();
+        globalCleanupResultFuture = FutureUtils.completedVoidFuture();
     }
 
     @AfterClass
@@ -182,6 +192,7 @@ public class MiniDispatcherTest extends TestLogger {
      */
     @Test
     public void testTerminationAfterJobCompletion() throws Exception {
+        globalCleanupResultFuture = new CompletableFuture<>();
         final MiniDispatcher miniDispatcher =
                 createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
 
@@ -194,9 +205,24 @@ public class MiniDispatcherTest extends TestLogger {
 
             testingJobManagerRunner.completeResultFuture(executionGraphInfo);
 
-            // wait until we terminate
+            CommonTestUtils.waitUntilCondition(
+                    () ->
+                            !highAvailabilityServices
+                                    .getJobResultStore()
+                                    .getDirtyResults()
+                                    .isEmpty());
+
+            assertFalse(
+                    "The shutdownFuture should not be completed before the 
cleanup is triggered.",
+                    miniDispatcher.getShutDownFuture().isDone());
+
+            globalCleanupResultFuture.complete(null);
+
             miniDispatcher.getShutDownFuture().get();
         } finally {
+            // we have to complete the future to make the job and, as a 
consequence, the
+            // MiniDispatcher terminate
+            globalCleanupResultFuture.complete(null);
             RpcUtils.terminateRpcEndpoint(miniDispatcher);
         }
     }
@@ -310,6 +336,8 @@ public class MiniDispatcherTest extends TestLogger {
             @Nullable JobGraph recoveredJobGraph,
             @Nullable JobResult recoveredDirtyJob)
             throws Exception {
+        final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(2);
         return new MiniDispatcher(
                 rpcService,
                 DispatcherId.generate(),
@@ -333,6 +361,17 @@ public class MiniDispatcherTest extends TestLogger {
                 recoveredJobGraph,
                 recoveredDirtyJob,
                 (dispatcher, scheduledExecutor, errorHandler) -> new 
NoOpDispatcherBootstrap(),
+                jobManagerRunnerRegistry,
+                TestingResourceCleanerFactory.builder()
+                        // JobManagerRunnerRegistry needs to be added 
explicitly
+                        // because cleaning it will trigger the closeAsync 
latch
+                        // provided by TestingJobManagerRunner
+                        .withLocallyCleanableResource(jobManagerRunnerRegistry)
+                        .withGloballyCleanableResource(
+                                (jobId, ignoredExecutor) -> 
globalCleanupResultFuture)
+                        .withLocallyCleanableResource(
+                                (jobId, ignoredExecutor) -> 
localCleanupResultFuture)
+                        .build(),
                 executionMode);
     }
 }

Reply via email to