This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3690c3f44573c753bc5e9a397a555562cec683dc Author: Matthias Pohl <[email protected]> AuthorDate: Fri Apr 22 15:41:05 2022 +0200 [FLINK-27382][runtime] Moves cluster shutdown to when the job cleanup is done in job mode --- .../flink/runtime/dispatcher/Dispatcher.java | 77 +++++++++++++++------- .../flink/runtime/dispatcher/MiniDispatcher.java | 71 +++++++++++--------- .../runtime/dispatcher/MiniDispatcherTest.java | 41 +++++++++++- 3 files changed, 134 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index af82eede38d..7c02082ed53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -614,7 +614,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher jobManagerRunnerResult, executionType); } else { return CompletableFuture.completedFuture( - jobManagerRunnerFailed(jobId, throwable)); + jobManagerRunnerFailed( + jobId, JobStatus.FAILED, throwable)); } }, getMainThreadExecutor()) @@ -654,19 +655,43 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher return CompletableFuture.completedFuture( jobManagerRunnerFailed( jobManagerRunnerResult.getExecutionGraphInfo().getJobId(), + JobStatus.INITIALIZING, jobManagerRunnerResult.getInitializationFailure())); } return jobReachedTerminalState(jobManagerRunnerResult.getExecutionGraphInfo()); } - enum CleanupJobState { - LOCAL, - GLOBAL + private static class CleanupJobState { + + private final boolean globalCleanup; + private final JobStatus jobStatus; + + public static CleanupJobState localCleanup(JobStatus jobStatus) { + return new CleanupJobState(false, jobStatus); + } + + public static CleanupJobState globalCleanup(JobStatus jobStatus) { + return new CleanupJobState(true, jobStatus); + } + + private CleanupJobState(boolean globalCleanup, JobStatus jobStatus) { + this.globalCleanup = globalCleanup; + this.jobStatus = jobStatus; + } + + public boolean isGlobalCleanup() { + return globalCleanup; + } + + public JobStatus getJobStatus() { + return jobStatus; + } } - private CleanupJobState jobManagerRunnerFailed(JobID jobId, Throwable throwable) { + private CleanupJobState jobManagerRunnerFailed( + JobID jobId, JobStatus jobStatus, Throwable throwable) { jobMasterFailed(jobId, throwable); - return CleanupJobState.LOCAL; + return CleanupJobState.localCleanup(jobStatus); } @Override @@ -983,15 +1008,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) { - switch (cleanupJobState) { - case LOCAL: - return localResourceCleaner.cleanupAsync(jobId); - case GLOBAL: - return globalResourceCleaner - .cleanupAsync(jobId) - .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor); - default: - throw new IllegalStateException("Invalid cleanup state: " + cleanupJobState); + if (cleanupJobState.isGlobalCleanup()) { + return globalResourceCleaner + .cleanupAsync(jobId) + .thenRunAsync(() -> markJobAsClean(jobId), ioExecutor) + .thenRunAsync( + () -> + runPostJobGloballyTerminated( + jobId, cleanupJobState.getJobStatus()), + getMainThreadExecutor()); + } else { + return localResourceCleaner.cleanupAsync(jobId); } } @@ -1005,6 +1032,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } } + protected void runPostJobGloballyTerminated(JobID jobId, JobStatus jobStatus) { + // no-op: we need to provide this method to enable the MiniDispatcher implementation to do + // stuff after the job is cleaned up + } + /** Terminate all currently running {@link JobManagerRunner}s. */ private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); @@ -1034,6 +1066,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher fatalErrorHandler.onFatalError(throwable); } + @VisibleForTesting protected CompletableFuture<CleanupJobState> jobReachedTerminalState( ExecutionGraphInfo executionGraphInfo) { final ArchivedExecutionGraph archivedExecutionGraph = @@ -1068,7 +1101,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher writeToExecutionGraphInfoStore(executionGraphInfo); if (!terminalJobStatus.isGloballyTerminalState()) { - return CompletableFuture.completedFuture(CleanupJobState.LOCAL); + return CompletableFuture.completedFuture( + CleanupJobState.localCleanup(terminalJobStatus)); } // do not create an archive for suspended jobs, as this would eventually lead to @@ -1076,14 +1110,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher CompletableFuture<Acknowledge> archiveFuture = archiveExecutionGraphToHistoryServer(executionGraphInfo); - return archiveFuture - .thenCompose( - ignored -> - registerGloballyTerminatedJobInJobResultStore(executionGraphInfo)) - .thenApply(ignored -> CleanupJobState.GLOBAL); + return archiveFuture.thenCompose( + ignored -> registerGloballyTerminatedJobInJobResultStore(executionGraphInfo)); } - private CompletableFuture<Void> registerGloballyTerminatedJobInJobResultStore( + private CompletableFuture<CleanupJobState> registerGloballyTerminatedJobInJobResultStore( ExecutionGraphInfo executionGraphInfo) { final CompletableFuture<Void> writeFuture = new CompletableFuture<>(); final JobID jobId = executionGraphInfo.getJobId(); @@ -1130,7 +1161,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher executionGraphInfo.getJobId()), error)); } - return null; + return CleanupJobState.globalCleanup(terminalJobStatus); }, getMainThreadExecutor()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index 85a08332e66..dd9d67d525e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -18,24 +18,23 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; -import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -75,6 +74,31 @@ public class MiniDispatcher extends Dispatcher { this.executionMode = checkNotNull(executionMode); } + @VisibleForTesting + public MiniDispatcher( + RpcService rpcService, + DispatcherId fencingToken, + DispatcherServices dispatcherServices, + @Nullable JobGraph jobGraph, + @Nullable JobResult recoveredDirtyJob, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + JobManagerRunnerRegistry jobManagerRunnerRegistry, + ResourceCleanerFactory resourceCleanerFactory, + JobClusterEntrypoint.ExecutionMode executionMode) + throws Exception { + super( + rpcService, + fencingToken, + CollectionUtil.ofNullable(jobGraph), + CollectionUtil.ofNullable(recoveredDirtyJob), + dispatcherBootstrapFactory, + dispatcherServices, + jobManagerRunnerRegistry, + resourceCleanerFactory); + + this.executionMode = checkNotNull(executionMode); + } + @Override public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = @@ -129,33 +153,18 @@ public class MiniDispatcher extends Dispatcher { } @Override - protected CompletableFuture<CleanupJobState> jobReachedTerminalState( - ExecutionGraphInfo executionGraphInfo) { - final ArchivedExecutionGraph archivedExecutionGraph = - executionGraphInfo.getArchivedExecutionGraph(); - final CompletableFuture<CleanupJobState> cleanupHAState = - super.jobReachedTerminalState(executionGraphInfo); - - return cleanupHAState.thenApply( - cleanupJobState -> { - JobStatus jobStatus = - Objects.requireNonNull( - archivedExecutionGraph.getState(), - "JobStatus should not be null here."); - if (jobStatus.isGloballyTerminalState() - && (jobCancelled - || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) { - // shut down if job is cancelled or we don't have to wait for the execution - // result retrieval - log.info( - "Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", - jobStatus, - jobCancelled, - executionMode); - shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus)); - } - - return cleanupJobState; - }); + protected void runPostJobGloballyTerminated(JobID jobId, JobStatus jobStatus) { + super.runPostJobGloballyTerminated(jobId, jobStatus); + + if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) { + // shut down if job is cancelled or we don't have to wait for the execution + // result retrieval + log.info( + "Shutting down cluster after job with state {}, jobCancelled: {}, executionMode: {}", + jobStatus, + jobCancelled, + executionMode); + shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus)); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index 9977c84557e..5727dde19d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory; +import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; @@ -40,9 +41,11 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraph import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.concurrent.FutureUtils; import org.assertj.core.api.Assertions; import org.junit.AfterClass; @@ -99,6 +102,9 @@ public class MiniDispatcherTest extends TestLogger { private TestingJobMasterServiceLeadershipRunnerFactory testingJobManagerRunnerFactory; private TestingCleanupRunnerFactory testingCleanupRunnerFactory; + private CompletableFuture<Void> localCleanupResultFuture; + private CompletableFuture<Void> globalCleanupResultFuture; + @BeforeClass public static void setupClass() throws IOException { jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); @@ -123,6 +129,10 @@ public class MiniDispatcherTest extends TestLogger { testingJobManagerRunnerFactory = new TestingJobMasterServiceLeadershipRunnerFactory(); testingCleanupRunnerFactory = new TestingCleanupRunnerFactory(); + + // the default setting shouldn't block the cleanup + localCleanupResultFuture = FutureUtils.completedVoidFuture(); + globalCleanupResultFuture = FutureUtils.completedVoidFuture(); } @AfterClass @@ -182,6 +192,7 @@ public class MiniDispatcherTest extends TestLogger { */ @Test public void testTerminationAfterJobCompletion() throws Exception { + globalCleanupResultFuture = new CompletableFuture<>(); final MiniDispatcher miniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED); @@ -194,9 +205,24 @@ public class MiniDispatcherTest extends TestLogger { testingJobManagerRunner.completeResultFuture(executionGraphInfo); - // wait until we terminate + CommonTestUtils.waitUntilCondition( + () -> + !highAvailabilityServices + .getJobResultStore() + .getDirtyResults() + .isEmpty()); + + assertFalse( + "The shutdownFuture should not be completed before the cleanup is triggered.", + miniDispatcher.getShutDownFuture().isDone()); + + globalCleanupResultFuture.complete(null); + miniDispatcher.getShutDownFuture().get(); } finally { + // we have to complete the future to make the job and, as a consequence, the + // MiniDispatcher terminate + globalCleanupResultFuture.complete(null); RpcUtils.terminateRpcEndpoint(miniDispatcher); } } @@ -310,6 +336,8 @@ public class MiniDispatcherTest extends TestLogger { @Nullable JobGraph recoveredJobGraph, @Nullable JobResult recoveredDirtyJob) throws Exception { + final JobManagerRunnerRegistry jobManagerRunnerRegistry = + new DefaultJobManagerRunnerRegistry(2); return new MiniDispatcher( rpcService, DispatcherId.generate(), @@ -333,6 +361,17 @@ public class MiniDispatcherTest extends TestLogger { recoveredJobGraph, recoveredDirtyJob, (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(), + jobManagerRunnerRegistry, + TestingResourceCleanerFactory.builder() + // JobManagerRunnerRegistry needs to be added explicitly + // because cleaning it will trigger the closeAsync latch + // provided by TestingJobManagerRunner + .withLocallyCleanableResource(jobManagerRunnerRegistry) + .withGloballyCleanableResource( + (jobId, ignoredExecutor) -> globalCleanupResultFuture) + .withLocallyCleanableResource( + (jobId, ignoredExecutor) -> localCleanupResultFuture) + .build(), executionMode); } }
