This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1798ec3ee09c42435524fb890ba1b339392399e
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Mar 17 09:37:25 2021 +0100

    [FLINK-21602] Allow SchedulerNG to terminate asynchronously
    
    This commit lets the SchedulerNG extend the AutoCloseableAsync interface in order to support
    asynchronous termination behaviours. Moreover, this commit lets the JobMaster wait for the
    scheduler's termination before terminating itself.
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 50 ++++++++++------------
 .../flink/runtime/scheduler/SchedulerBase.java     |  6 ++-
 .../flink/runtime/scheduler/SchedulerNG.java       |  5 +--
 .../scheduler/adaptive/AdaptiveScheduler.java      |  6 ++-
 .../executiongraph/ExecutionGraphSuspendTest.java  | 22 ++++------
 .../flink/runtime/jobmaster/JobMasterTest.java     | 31 ++++++++++++++
 .../OperatorCoordinatorSchedulerTest.java          |  4 +-
 .../runtime/scheduler/DefaultSchedulerTest.java    |  4 +-
 .../runtime/scheduler/TestingSchedulerNG.java      | 21 +++++----
 9 files changed, 90 insertions(+), 59 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3e115ff..8130b8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -105,6 +105,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -394,18 +395,17 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
         log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), 
jobGraph.getJobID());
 
         // make sure there is a graceful exit
-        try {
-            stopJobExecution(
-                    new FlinkException(
-                            String.format(
-                                    "Stopping JobMaster for job %s(%s).",
-                                    jobGraph.getName(), jobGraph.getJobID())));
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(
-                    new JobMasterException("Could not properly stop the 
JobMaster.", e));
-        }
-
-        return CompletableFuture.completedFuture(null);
+        return stopJobExecution(
+                        new FlinkException(
+                                String.format(
+                                        "Stopping JobMaster for job %s(%s).",
+                                        jobGraph.getName(), 
jobGraph.getJobID())))
+                .exceptionally(
+                        exception -> {
+                            throw new CompletionException(
+                                    new JobMasterException(
+                                            "Could not properly stop the 
JobMaster.", exception));
+                        });
     }
 
     // 
----------------------------------------------------------------------------------------------
@@ -920,22 +920,17 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
         ExceptionUtils.tryRethrowException(resultingException);
     }
 
-    private void stopJobExecution(final Exception cause) throws Exception {
+    private CompletableFuture<Void> stopJobExecution(final Exception cause) {
         validateRunsInMainThread();
 
-        stopScheduling(cause);
-
-        disconnectTaskManagerResourceManagerConnections(cause);
-
-        Exception resultingException = null;
+        final CompletableFuture<Void> terminationFuture = stopScheduling();
 
-        try {
-            stopJobMasterServices();
-        } catch (Exception e) {
-            resultingException = e;
-        }
-
-        ExceptionUtils.tryRethrowException(resultingException);
+        return FutureUtils.runAfterwards(
+                terminationFuture,
+                () -> {
+                    disconnectTaskManagerResourceManagerConnections(cause);
+                    stopJobMasterServices();
+                });
     }
 
     private void disconnectTaskManagerResourceManagerConnections(Exception 
cause) {
@@ -960,10 +955,11 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId>
         schedulerNG.startScheduling();
     }
 
-    private void stopScheduling(Exception cause) {
-        schedulerNG.suspend(cause);
+    private CompletableFuture<Void> stopScheduling() {
         jobManagerJobMetricGroup.close();
         jobStatusListener.stop();
+
+        return schedulerNG.closeAsync();
     }
 
     // 
----------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 56c06ae..aacea94 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -505,12 +505,16 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
     protected abstract void startSchedulingInternal();
 
     @Override
-    public void suspend(Throwable cause) {
+    public CompletableFuture<Void> closeAsync() {
         mainThreadExecutor.assertRunningInMainThread();
 
+        final FlinkException cause = new FlinkException("Scheduler is being 
stopped.");
+
         incrementVersionsOfAllVertices();
         executionGraph.suspend(cause);
         operatorCoordinatorHandler.disposeAllOperatorCoordinators();
+
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index 505714d..4e306d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -64,12 +65,10 @@ import java.util.concurrent.CompletableFuture;
  * <p>Implementations can expect that methods will not be invoked 
concurrently. In fact, all
  * invocations will originate from a thread in the {@link 
ComponentMainThreadExecutor}.
  */
-public interface SchedulerNG {
+public interface SchedulerNG extends AutoCloseableAsync {
 
     void startScheduling();
 
-    void suspend(Throwable cause);
-
     void cancel();
 
     CompletableFuture<JobStatus> getJobTerminationFuture();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index db88e05..4615175 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -282,8 +282,10 @@ public class AdaptiveScheduler
     }
 
     @Override
-    public void suspend(Throwable cause) {
-        state.suspend(cause);
+    public CompletableFuture<Void> closeAsync() {
+        state.suspend(new FlinkException("AdaptiveScheduler is being 
stopped."));
+
+        return FutureUtils.completedVoidFuture();
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index af09723..c196a32 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -57,7 +57,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 
         // suspend
 
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         validateAllVerticesInState(eg, ExecutionState.CANCELED);
@@ -83,7 +83,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         validateAllVerticesInState(eg, ExecutionState.DEPLOYING);
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         validateCancelRpcCalls(gateway, parallelism);
@@ -109,7 +109,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         validateAllVerticesInState(eg, ExecutionState.RUNNING);
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         validateCancelRpcCalls(gateway, parallelism);
@@ -135,7 +135,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         validateCancelRpcCalls(gateway, parallelism);
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         ensureCannotLeaveSuspendedState(scheduler, gateway);
@@ -162,7 +162,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         assertEquals(JobStatus.FAILED, eg.getState());
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         // still in failed state
         assertEquals(JobStatus.FAILED, eg.getState());
@@ -187,7 +187,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         validateCancelRpcCalls(gateway, parallelism);
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
 
@@ -215,7 +215,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
 
         // suspend
-        scheduler.suspend(new Exception("suspend"));
+        scheduler.closeAsync();
 
         // still in failed state
         assertEquals(JobStatus.CANCELED, eg.getState());
@@ -249,14 +249,10 @@ public class ExecutionGraphSuspendTest extends TestLogger 
{
         ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
         assertEquals(JobStatus.RESTARTING, eg.getState());
 
-        final Exception exception = new Exception("Suspended");
-
-        scheduler.suspend(exception);
+        scheduler.closeAsync();
 
         assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-        assertEquals(exception, eg.getFailureCause());
-
         taskRestartExecutor.triggerScheduledTasks();
         assertEquals(JobStatus.SUSPENDED, eg.getState());
     }
@@ -281,7 +277,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         validateNoInteractions(gateway);
 
-        scheduler.suspend(new Exception("suspend again"));
+        scheduler.closeAsync();
         assertEquals(JobStatus.SUSPENDED, eg.getState());
         validateNoInteractions(gateway);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 5b7c1e9..f66d82a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -1628,6 +1628,37 @@ public class JobMasterTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testJobMasterOnlyTerminatesAfterTheSchedulerHasClosed() throws 
Exception {
+        final CompletableFuture<Void> schedulerTerminationFuture = new 
CompletableFuture<>();
+        final TestingSchedulerNG testingSchedulerNG =
+                TestingSchedulerNG.newBuilder()
+                        .setCloseAsyncSupplier(() -> 
schedulerTerminationFuture)
+                        .build();
+
+        final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder(),
+                                        new 
TestingSchedulerNGFactory(testingSchedulerNG)))
+                        .createJobMaster();
+
+        jobMaster.start();
+
+        final CompletableFuture<Void> jobMasterTerminationFuture = 
jobMaster.closeAsync();
+
+        try {
+            jobMasterTerminationFuture.get(10L, TimeUnit.MILLISECONDS);
+            fail("Expected TimeoutException because the JobMaster should not 
terminate.");
+        } catch (TimeoutException expected) {
+        }
+
+        schedulerTerminationFuture.complete(null);
+
+        jobMasterTerminationFuture.get();
+    }
+
     private void runJobFailureWhenTaskExecutorTerminatesTest(
             HeartbeatServices heartbeatServices,
             BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> 
jobReachedRunningState,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 6a94253..02d1018 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -118,7 +118,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
     @After
     public void shutdownScheduler() throws Exception {
         if (createdScheduler != null) {
-            createdScheduler.suspend(new Exception("shutdown"));
+            createdScheduler.close();
         }
     }
 
@@ -139,7 +139,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
         final DefaultScheduler scheduler = createAndStartScheduler();
         final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
 
-        scheduler.suspend(new Exception("test suspend"));
+        scheduler.close();
 
         assertTrue(coordinator.isClosed());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index dc2a1ee..cd2c06f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -875,7 +875,7 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void suspendJobWillIncrementVertexVersions() {
+    public void suspendJobWillIncrementVertexVersions() throws Exception {
         final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
         final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
         final ExecutionVertexID onlyExecutionVertexId =
@@ -885,7 +885,7 @@ public class DefaultSchedulerTest extends TestLogger {
         final ExecutionVertexVersion executionVertexVersion =
                 
executionVertexVersioner.getExecutionVertexVersion(onlyExecutionVertexId);
 
-        scheduler.suspend(new Exception("forced suspend"));
+        scheduler.close();
 
         
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index c87ac20..ad0682e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -24,6 +24,7 @@ import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
@@ -48,24 +49,25 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 /** Testing implementation of the {@link SchedulerNG}. */
 public class TestingSchedulerNG implements SchedulerNG {
     private final CompletableFuture<JobStatus> jobTerminationFuture;
     private final Runnable startSchedulingRunnable;
-    private final Consumer<Throwable> suspendConsumer;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
     private final BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction;
     private final Consumer<Throwable> handleGlobalFailureConsumer;
 
     private TestingSchedulerNG(
             CompletableFuture<JobStatus> jobTerminationFuture,
             Runnable startSchedulingRunnable,
-            Consumer<Throwable> suspendConsumer,
+            Supplier<CompletableFuture<Void>> closeAsyncSupplier,
             BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction,
             Consumer<Throwable> handleGlobalFailureConsumer) {
         this.jobTerminationFuture = jobTerminationFuture;
         this.startSchedulingRunnable = startSchedulingRunnable;
-        this.suspendConsumer = suspendConsumer;
+        this.closeAsyncSupplier = closeAsyncSupplier;
         this.triggerSavepointFunction = triggerSavepointFunction;
         this.handleGlobalFailureConsumer = handleGlobalFailureConsumer;
     }
@@ -80,8 +82,8 @@ public class TestingSchedulerNG implements SchedulerNG {
     }
 
     @Override
-    public void suspend(Throwable cause) {
-        suspendConsumer.accept(cause);
+    public CompletableFuture<Void> closeAsync() {
+        return closeAsyncSupplier.get();
     }
 
     @Override
@@ -226,7 +228,8 @@ public class TestingSchedulerNG implements SchedulerNG {
     public static final class Builder {
         private CompletableFuture<JobStatus> jobTerminationFuture = new 
CompletableFuture<>();
         private Runnable startSchedulingRunnable = () -> {};
-        private Consumer<Throwable> suspendConsumer = ignored -> {};
+        private Supplier<CompletableFuture<Void>> closeAsyncSupplier =
+                FutureUtils::completedVoidFuture;
         private BiFunction<String, Boolean, CompletableFuture<String>> 
triggerSavepointFunction =
                 (ignoredA, ignoredB) -> new CompletableFuture<>();
         private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> 
{};
@@ -241,8 +244,8 @@ public class TestingSchedulerNG implements SchedulerNG {
             return this;
         }
 
-        public Builder setSuspendConsumer(Consumer<Throwable> suspendConsumer) 
{
-            this.suspendConsumer = suspendConsumer;
+        public Builder setCloseAsyncSupplier(Supplier<CompletableFuture<Void>> 
closeAsyncSupplier) {
+            this.closeAsyncSupplier = closeAsyncSupplier;
             return this;
         }
 
@@ -262,7 +265,7 @@ public class TestingSchedulerNG implements SchedulerNG {
             return new TestingSchedulerNG(
                     jobTerminationFuture,
                     startSchedulingRunnable,
-                    suspendConsumer,
+                    closeAsyncSupplier,
                     triggerSavepointFunction,
                     handleGlobalFailureConsumer);
         }

Reply via email to