This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1798ec3ee09c42435524fb890ba1b339392399e Author: Till Rohrmann <[email protected]> AuthorDate: Wed Mar 17 09:37:25 2021 +0100 [FLINK-21602] Allow SchedulerNG to terminate asynchronously This commit lets the SchedulerNG extend the AutoCloseableAsync interface in order to support asynchronous termination behaviours. Moreover, this commit lets the JobMaster wait on the scheduler's termination before terminating itself. --- .../apache/flink/runtime/jobmaster/JobMaster.java | 50 ++++++++++------------ .../flink/runtime/scheduler/SchedulerBase.java | 6 ++- .../flink/runtime/scheduler/SchedulerNG.java | 5 +-- .../scheduler/adaptive/AdaptiveScheduler.java | 6 ++- .../executiongraph/ExecutionGraphSuspendTest.java | 22 ++++------ .../flink/runtime/jobmaster/JobMasterTest.java | 31 ++++++++++++++ .../OperatorCoordinatorSchedulerTest.java | 4 +- .../runtime/scheduler/DefaultSchedulerTest.java | 4 +- .../runtime/scheduler/TestingSchedulerNG.java | 21 +++++---- 9 files changed, 90 insertions(+), 59 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 3e115ff..8130b8e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -105,6 +105,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; @@ -394,18 +395,17 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID()); // make sure there is a graceful exit - try { - stopJobExecution( - new FlinkException( - 
String.format( - "Stopping JobMaster for job %s(%s).", - jobGraph.getName(), jobGraph.getJobID()))); - } catch (Exception e) { - return FutureUtils.completedExceptionally( - new JobMasterException("Could not properly stop the JobMaster.", e)); - } - - return CompletableFuture.completedFuture(null); + return stopJobExecution( + new FlinkException( + String.format( + "Stopping JobMaster for job %s(%s).", + jobGraph.getName(), jobGraph.getJobID()))) + .exceptionally( + exception -> { + throw new CompletionException( + new JobMasterException( + "Could not properly stop the JobMaster.", exception)); + }); } // ---------------------------------------------------------------------------------------------- @@ -920,22 +920,17 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> ExceptionUtils.tryRethrowException(resultingException); } - private void stopJobExecution(final Exception cause) throws Exception { + private CompletableFuture<Void> stopJobExecution(final Exception cause) { validateRunsInMainThread(); - stopScheduling(cause); - - disconnectTaskManagerResourceManagerConnections(cause); - - Exception resultingException = null; + final CompletableFuture<Void> terminationFuture = stopScheduling(); - try { - stopJobMasterServices(); - } catch (Exception e) { - resultingException = e; - } - - ExceptionUtils.tryRethrowException(resultingException); + return FutureUtils.runAfterwards( + terminationFuture, + () -> { + disconnectTaskManagerResourceManagerConnections(cause); + stopJobMasterServices(); + }); } private void disconnectTaskManagerResourceManagerConnections(Exception cause) { @@ -960,10 +955,11 @@ public class JobMaster extends PermanentlyFencedRpcEndpoint<JobMasterId> schedulerNG.startScheduling(); } - private void stopScheduling(Exception cause) { - schedulerNG.suspend(cause); + private CompletableFuture<Void> stopScheduling() { jobManagerJobMetricGroup.close(); jobStatusListener.stop(); + + return schedulerNG.closeAsync(); } // 
---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 56c06ae..aacea94 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -505,12 +505,16 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling protected abstract void startSchedulingInternal(); @Override - public void suspend(Throwable cause) { + public CompletableFuture<Void> closeAsync() { mainThreadExecutor.assertRunningInMainThread(); + final FlinkException cause = new FlinkException("Scheduler is being stopped."); + incrementVersionsOfAllVertices(); executionGraph.suspend(cause); operatorCoordinatorHandler.disposeAllOperatorCoordinators(); + + return CompletableFuture.completedFuture(null); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 505714d..4e306d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; @@ -64,12 +65,10 @@ import java.util.concurrent.CompletableFuture; * <p>Implementations can expect that methods will not be invoked concurrently. 
In fact, all * invocations will originate from a thread in the {@link ComponentMainThreadExecutor}. */ -public interface SchedulerNG { +public interface SchedulerNG extends AutoCloseableAsync { void startScheduling(); - void suspend(Throwable cause); - void cancel(); CompletableFuture<JobStatus> getJobTerminationFuture(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index db88e05..4615175 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -282,8 +282,10 @@ public class AdaptiveScheduler } @Override - public void suspend(Throwable cause) { - state.suspend(cause); + public CompletableFuture<Void> closeAsync() { + state.suspend(new FlinkException("AdaptiveScheduler is being stopped.")); + + return FutureUtils.completedVoidFuture(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index af09723..c196a32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -57,7 +57,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); validateAllVerticesInState(eg, ExecutionState.CANCELED); @@ -83,7 +83,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { validateAllVerticesInState(eg, ExecutionState.DEPLOYING); // suspend - scheduler.suspend(new Exception("suspend")); + 
scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); validateCancelRpcCalls(gateway, parallelism); @@ -109,7 +109,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { validateAllVerticesInState(eg, ExecutionState.RUNNING); // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); validateCancelRpcCalls(gateway, parallelism); @@ -135,7 +135,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { validateCancelRpcCalls(gateway, parallelism); // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); ensureCannotLeaveSuspendedState(scheduler, gateway); @@ -162,7 +162,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { assertEquals(JobStatus.FAILED, eg.getState()); // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); // still in failed state assertEquals(JobStatus.FAILED, eg.getState()); @@ -187,7 +187,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { validateCancelRpcCalls(gateway, parallelism); // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); @@ -215,7 +215,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get()); // suspend - scheduler.suspend(new Exception("suspend")); + scheduler.closeAsync(); // still in failed state assertEquals(JobStatus.CANCELED, eg.getState()); @@ -249,14 +249,10 @@ public class ExecutionGraphSuspendTest extends TestLogger { ExecutionGraphTestUtils.completeCancellingForAllVertices(eg); assertEquals(JobStatus.RESTARTING, eg.getState()); - final Exception exception = new Exception("Suspended"); - - scheduler.suspend(exception); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); - assertEquals(exception, 
eg.getFailureCause()); - taskRestartExecutor.triggerScheduledTasks(); assertEquals(JobStatus.SUSPENDED, eg.getState()); } @@ -281,7 +277,7 @@ public class ExecutionGraphSuspendTest extends TestLogger { assertEquals(JobStatus.SUSPENDED, eg.getState()); validateNoInteractions(gateway); - scheduler.suspend(new Exception("suspend again")); + scheduler.closeAsync(); assertEquals(JobStatus.SUSPENDED, eg.getState()); validateNoInteractions(gateway); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 5b7c1e9..f66d82a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1628,6 +1628,37 @@ public class JobMasterTest extends TestLogger { } } + @Test + public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws Exception { + final CompletableFuture<Void> schedulerTerminationFuture = new CompletableFuture<>(); + final TestingSchedulerNG testingSchedulerNG = + TestingSchedulerNG.newBuilder() + .setCloseAsyncSupplier(() -> schedulerTerminationFuture) + .build(); + + final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder(), + new TestingSchedulerNGFactory(testingSchedulerNG))) + .createJobMaster(); + + jobMaster.start(); + + final CompletableFuture<Void> jobMasterTerminationFuture = jobMaster.closeAsync(); + + try { + jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS); + fail("Expected TimeoutException because the JobMaster should not terminate."); + } catch (TimeoutException expected) { + } + + schedulerTerminationFuture.complete(null); + + jobMasterTerminationFuture.get(); + } + private void runJobFailureWhenTaskExecutorTerminatesTest( 
HeartbeatServices heartbeatServices, BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index 6a94253..02d1018 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -118,7 +118,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { @After public void shutdownScheduler() throws Exception { if (createdScheduler != null) { - createdScheduler.suspend(new Exception("shutdown")); + createdScheduler.close(); } } @@ -139,7 +139,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { final DefaultScheduler scheduler = createAndStartScheduler(); final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); - scheduler.suspend(new Exception("test suspend")); + scheduler.close(); assertTrue(coordinator.isClosed()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index dc2a1ee..cd2c06f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -875,7 +875,7 @@ public class DefaultSchedulerTest extends TestLogger { } @Test - public void suspendJobWillIncrementVertexVersions() { + public void suspendJobWillIncrementVertexVersions() throws Exception { final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); final 
ExecutionVertexID onlyExecutionVertexId = @@ -885,7 +885,7 @@ public class DefaultSchedulerTest extends TestLogger { final ExecutionVertexVersion executionVertexVersion = executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId); - scheduler.suspend(new Exception("forced suspend")); + scheduler.close(); assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index c87ac20..ad0682e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -24,6 +24,7 @@ import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; @@ -48,24 +49,25 @@ import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; /** Testing implementation of the {@link SchedulerNG}. 
*/ public class TestingSchedulerNG implements SchedulerNG { private final CompletableFuture<JobStatus> jobTerminationFuture; private final Runnable startSchedulingRunnable; - private final Consumer<Throwable> suspendConsumer; + private final Supplier<CompletableFuture<Void>> closeAsyncSupplier; private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction; private final Consumer<Throwable> handleGlobalFailureConsumer; private TestingSchedulerNG( CompletableFuture<JobStatus> jobTerminationFuture, Runnable startSchedulingRunnable, - Consumer<Throwable> suspendConsumer, + Supplier<CompletableFuture<Void>> closeAsyncSupplier, BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction, Consumer<Throwable> handleGlobalFailureConsumer) { this.jobTerminationFuture = jobTerminationFuture; this.startSchedulingRunnable = startSchedulingRunnable; - this.suspendConsumer = suspendConsumer; + this.closeAsyncSupplier = closeAsyncSupplier; this.triggerSavepointFunction = triggerSavepointFunction; this.handleGlobalFailureConsumer = handleGlobalFailureConsumer; } @@ -80,8 +82,8 @@ public class TestingSchedulerNG implements SchedulerNG { } @Override - public void suspend(Throwable cause) { - suspendConsumer.accept(cause); + public CompletableFuture<Void> closeAsync() { + return closeAsyncSupplier.get(); } @Override @@ -226,7 +228,8 @@ public class TestingSchedulerNG implements SchedulerNG { public static final class Builder { private CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture<>(); private Runnable startSchedulingRunnable = () -> {}; - private Consumer<Throwable> suspendConsumer = ignored -> {}; + private Supplier<CompletableFuture<Void>> closeAsyncSupplier = + FutureUtils::completedVoidFuture; private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction = (ignoredA, ignoredB) -> new CompletableFuture<>(); private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> 
{}; @@ -241,8 +244,8 @@ public class TestingSchedulerNG implements SchedulerNG { return this; } - public Builder setSuspendConsumer(Consumer<Throwable> suspendConsumer) { - this.suspendConsumer = suspendConsumer; + public Builder setCloseAsyncSupplier(Supplier<CompletableFuture<Void>> closeAsyncSupplier) { + this.closeAsyncSupplier = closeAsyncSupplier; return this; } @@ -262,7 +265,7 @@ public class TestingSchedulerNG implements SchedulerNG { return new TestingSchedulerNG( jobTerminationFuture, startSchedulingRunnable, - suspendConsumer, + closeAsyncSupplier, triggerSavepointFunction, handleGlobalFailureConsumer); }
