tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r577723544



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,49 +909,38 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> 
executionTerminationsFuture =
+                getCombinedExecutionTerminationFuture();
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new 
CompletionException(throwable);
-                                    } else if (jobstatus != 
JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of 
FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
-        return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
-                .handleAsync(
-                        (path, throwable) -> {
-                            if (throwable != null) {
-                                // restart the checkpoint coordinator if 
stopWithSavepoint failed.
-                                
startCheckpointScheduler(checkpointCoordinator);
-                                throw new CompletionException(throwable);
-                            }
+        StopWithSavepointContext stopWithSavepointContext =

Review comment:
       Should we call this class `SWSContext` or `SWSOperation`? The problem 
with "context" is that we already have quite a few context classes in the code 
base, and I find it is mostly used as an umbrella term for lack of a better 
name.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);

Review comment:
       Can it happen that we start the `CheckpointCoordinator` after the 
`ExecutionGraph` (EG) has reached a non-`RUNNING` `JobStatus`? I think this can 
be the case because we first call `handleGlobalFailure` and then 
`terminateExceptionally`. Maybe we should only restart the 
`CheckpointCoordinator` if the job is still in the `RUNNING` state.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;

Review comment:
       `@Nullable` is missing.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.
+ */
+public interface StopWithSavepointOperations {
+
+    /**
+     * Handles the Savepoint creation termination.
+     *
+     * @param path the path to the newly created savepoint.
+     * @param throwable the {@code Throwable} in case of failure.
+     */
+    void handleSavepointCreation(String path, Throwable throwable);

Review comment:
       ```suggestion
       void handleSavepointCreation(@Nullable String path, @Nullable Throwable 
throwable);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+    private JobGraph jobGraph;
+    private DefaultScheduler scheduler;
+
+    private StopWithSavepointOperations testInstance;
+
+    @Before
+    public void setup() throws Exception {
+        jobGraph = new JobGraph();
+
+        final JobVertex jobVertex = new JobVertex("vertex #0");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobGraph.addVertex(jobVertex);
+
+        // checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+        // periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+        SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        // the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+        disableCheckpointScheduler();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                });
+    }
+
+    private void assertHappyPath(Consumer<String> stopWithSavepointCompletion) 
throws Exception {
+        final String savepointPath = "savepoint-path";
+
+        stopWithSavepointCompletion.accept(savepointPath);
+
+        assertThat(testInstance.getResult().get(), is(savepointPath));
+
+        // the happy path won't restart the CheckpointCoordinator
+        assertCheckpointSchedulingDisabled();
+    }
+
+    @Test
+    public void testSavepointCreationFailure() {
+        final Exception exception = new Exception("Expected exception during 
savepoint creation.");
+        testInstance.handleSavepointCreation(null, exception);
+
+        try {
+            testInstance.getResult().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<Throwable> actualException =
+                    ExceptionUtils.findThrowableWithMessage(e, 
exception.getMessage());
+            assertTrue(actualException.isPresent());
+        }
+
+        assertCheckpointSchedulingEnabled();
+    }
+
+    @Test
+    public void testNoTerminationHandlingAfterSavepointCompletion() throws 
Exception {
+        assertNoTerminationHandling(
+                () -> {
+                    testInstance.handleSavepointCreation("savepoint-path", 
null);
+                    testInstance.handleExecutionTermination(
+                            // the task failed and was restarted
+                            Collections.singletonList(ExecutionState.RUNNING));

Review comment:
       `RUNNING` is not a terminal state. If an `Execution` is restarted, then 
the old `Execution` will reach a terminal state and then we will create a new 
`Execution`. Hence, I think we are testing something which is not possible at 
the moment.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+    private JobGraph jobGraph;
+    private DefaultScheduler scheduler;
+
+    private StopWithSavepointOperations testInstance;
+
+    @Before
+    public void setup() throws Exception {
+        jobGraph = new JobGraph();
+
+        final JobVertex jobVertex = new JobVertex("vertex #0");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobGraph.addVertex(jobVertex);
+
+        // checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+        // periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+        SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        // the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+        disableCheckpointScheduler();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                });
+    }
+
+    private void assertHappyPath(Consumer<String> stopWithSavepointCompletion) 
throws Exception {
+        final String savepointPath = "savepoint-path";
+
+        stopWithSavepointCompletion.accept(savepointPath);
+
+        assertThat(testInstance.getResult().get(), is(savepointPath));
+
+        // the happy path won't restart the CheckpointCoordinator
+        assertCheckpointSchedulingDisabled();
+    }
+
+    @Test
+    public void testSavepointCreationFailure() {
+        final Exception exception = new Exception("Expected exception during 
savepoint creation.");
+        testInstance.handleSavepointCreation(null, exception);
+
+        try {
+            testInstance.getResult().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<Throwable> actualException =
+                    ExceptionUtils.findThrowableWithMessage(e, 
exception.getMessage());
+            assertTrue(actualException.isPresent());
+        }
+
+        assertCheckpointSchedulingEnabled();
+    }
+
+    @Test
+    public void testNoTerminationHandlingAfterSavepointCompletion() throws 
Exception {
+        assertNoTerminationHandling(
+                () -> {
+                    testInstance.handleSavepointCreation("savepoint-path", 
null);
+                    testInstance.handleExecutionTermination(
+                            // the task failed and was restarted
+                            Collections.singletonList(ExecutionState.RUNNING));
+                });
+    }
+
+    @Test
+    public void testNoTerminationHandlingBeforeSavepointCompletion() throws 
Exception {
+        assertNoTerminationHandling(
+                () -> {
+                    testInstance.handleExecutionTermination(
+                            // the task failed and was restarted
+                            Collections.singletonList(ExecutionState.RUNNING));
+                    testInstance.handleSavepointCreation("savepoint-path", 
null);
+                });
+    }
+
+    private void assertNoTerminationHandling(Runnable 
stopWithSavepointCompletion)
+            throws Exception {
+        final ManuallyTriggeredScheduledExecutor restartExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        // we're expecting a global fail-over and,
+                        // therefore, have to enable restarting
+                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0))
+                        .setDelayExecutor(restartExecutor)
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, log);
+
+        disableCheckpointScheduler();
+
+        stopWithSavepointCompletion.run();
+
+        // the task gets cancelled before triggering the restart
+        ExecutionAttemptID executionAttemptID =
+                scheduler
+                        .getExecutionGraph()
+                        .getAllExecutionVertices()
+                        .iterator()
+                        .next()
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), executionAttemptID, 
ExecutionState.CANCELED));
+
+        restartExecutor.triggerScheduledTasks();
+
+        try {
+            testInstance.getResult().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<FlinkException> expectedFlinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+            final String expectedMessage =
+                    String.format(
+                            "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: 
RUNNING. A global fail-over is triggered to recover the job %s.",
+                            jobGraph.getJobID());
+            assertTrue(expectedFlinkException.isPresent());
+            assertThat(
+                    expectedFlinkException.get(), 
FlinkMatchers.containsMessage(expectedMessage));
+        }
+
+        // the global fail-over puts all tasks into DEPLOYING state again
+        assertExecutionStates(scheduler, ExecutionState.DEPLOYING);
+
+        // the CheckpointCoordinator should be enabled again
+        assertCheckpointSchedulingEnabled();

Review comment:
       This looks quite complicated for a simple unit test. This is usually a 
sign that the subject under test is not yet super easy to test.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+    private JobGraph jobGraph;
+    private DefaultScheduler scheduler;
+
+    private StopWithSavepointOperations testInstance;
+
+    @Before
+    public void setup() throws Exception {
+        jobGraph = new JobGraph();
+
+        final JobVertex jobVertex = new JobVertex("vertex #0");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobGraph.addVertex(jobVertex);
+
+        // checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+        // periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+        SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        // the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+        disableCheckpointScheduler();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                });
+    }
+
+    private void assertHappyPath(Consumer<String> stopWithSavepointCompletion) 
throws Exception {
+        final String savepointPath = "savepoint-path";
+
+        stopWithSavepointCompletion.accept(savepointPath);
+
+        assertThat(testInstance.getResult().get(), is(savepointPath));
+
+        // the happy path won't restart the CheckpointCoordinator
+        assertCheckpointSchedulingDisabled();
+    }
+
+    @Test
+    public void testSavepointCreationFailure() {
+        final Exception exception = new Exception("Expected exception during 
savepoint creation.");
+        testInstance.handleSavepointCreation(null, exception);

Review comment:
       What happens if `testInstance.handleExecutionTermination` is called 
after the savepoint has failed (e.g. if an unrelated global failover is 
triggered at a later point in time)?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;

Review comment:
       Do we really need the `SchedulerBase` as a service or can we use 
interface segregation to limit the required service surface?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;

Review comment:
       Same for the `CheckpointCoordinator`. Can't we find a smaller interface 
which we can give to this class? That way we wouldn't have to create a 
`CheckpointCoordinator` in order to test this class.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.

Review comment:
       What are the steps?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);

Review comment:
       Couldn't we already trigger a global failover if 
`unfinishedStates.isEmpty() == false`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+    private JobGraph jobGraph;
+    private DefaultScheduler scheduler;
+
+    private StopWithSavepointOperations testInstance;
+
+    @Before
+    public void setup() throws Exception {
+        jobGraph = new JobGraph();
+
+        final JobVertex jobVertex = new JobVertex("vertex #0");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobGraph.addVertex(jobVertex);
+
+        // checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+        // periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+        SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        // the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+        disableCheckpointScheduler();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationBeforeTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterTermination() throws 
Exception {
+        assertHappyPath(
+                (savepointPath) -> {
+                    testInstance.handleExecutionTermination(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                    testInstance.handleSavepointCreation(savepointPath, null);
+                });
+    }
+
+    private void assertHappyPath(Consumer<String> stopWithSavepointCompletion) 
throws Exception {
+        final String savepointPath = "savepoint-path";
+
+        stopWithSavepointCompletion.accept(savepointPath);
+
+        assertThat(testInstance.getResult().get(), is(savepointPath));
+
+        // the happy path won't restart the CheckpointCoordinator
+        assertCheckpointSchedulingDisabled();
+    }
+
+    @Test
+    public void testSavepointCreationFailure() {
+        final Exception exception = new Exception("Expected exception during 
savepoint creation.");
+        testInstance.handleSavepointCreation(null, exception);
+
+        try {
+            testInstance.getResult().get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<Throwable> actualException =
+                    ExceptionUtils.findThrowableWithMessage(e, 
exception.getMessage());
+            assertTrue(actualException.isPresent());
+        }
+
+        assertCheckpointSchedulingEnabled();
+    }
+
+    @Test
+    public void testNoTerminationHandlingAfterSavepointCompletion() throws 
Exception {
+        assertNoTerminationHandling(
+                () -> {
+                    testInstance.handleSavepointCreation("savepoint-path", 
null);
+                    testInstance.handleExecutionTermination(
+                            // the task failed and was restarted
+                            Collections.singletonList(ExecutionState.RUNNING));
+                });
+    }
+
+    @Test
+    public void testNoTerminationHandlingBeforeSavepointCompletion() throws 
Exception {
+        assertNoTerminationHandling(
+                () -> {
+                    testInstance.handleExecutionTermination(
+                            // the task failed and was restarted
+                            Collections.singletonList(ExecutionState.RUNNING));
+                    testInstance.handleSavepointCreation("savepoint-path", 
null);
+                });
+    }

Review comment:
       What happens if `handleSavepointCreation(null, throwable)` is called?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);
+                return WaitForSavepointCreation;
+            }
+        },
+        WaitForSavepointCreation {
+            @Override
+            public StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                Preconditions.checkState(
+                        context.unfinishedStates != null,
+                        InitialWait + " should have preceded: No 
unfinishedStates is set.");
+
+                if (!context.unfinishedStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(context.unfinishedStates);

Review comment:
       Naming: this should be `terminateExceptionallyWithGlobalFailover` 
(consistent with `terminateExceptionally`).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.
+ */
+public interface StopWithSavepointOperations {
+
+    /**
+     * Handles the Savepoint creation termination.
+     *
+     * @param path the path to the newly created savepoint.
+     * @param throwable the {@code Throwable} in case of failure.

Review comment:
       What is the contract between `path` and `throwable`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointContextTest.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointContextTest} tests the stop-with-savepoint 
functionality of {@link
+ * SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointContextTest extends TestLogger {
+
+    private JobGraph jobGraph;
+    private DefaultScheduler scheduler;
+
+    private StopWithSavepointOperations testInstance;
+
+    @Before
+    public void setup() throws Exception {
+        jobGraph = new JobGraph();
+
+        final JobVertex jobVertex = new JobVertex("vertex #0");
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        jobGraph.addVertex(jobVertex);
+
+        // checkpointInterval has to be set to a value lower than 
Long.MAX_VALUE to enable
+        // periodic checkpointing - only then can we enable/disable the 
CheckpointCoordinator
+        SchedulerTestingUtils.enablePeriodicCheckpointing(jobGraph, 
Long.MAX_VALUE - 1);
+        scheduler =
+                SchedulerTestingUtils.createSchedulerBuilder(
+                                jobGraph, 
ComponentMainThreadExecutorServiceAdapter.forMainThread())
+                        .setFutureExecutor(new 
DirectScheduledExecutorService())
+                        .build();
+        scheduler.startScheduling();
+
+        // the checkpoint scheduler is stopped before triggering the 
stop-with-savepoint
+        disableCheckpointScheduler();
+
+        testInstance = new StopWithSavepointContext(jobGraph.getJobID(), 
scheduler, this.log);

Review comment:
       This looks quite involved for a simple unit test. This could be an 
indicator that `StopWithSavepointContext` is not super easy to test yet.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointOperations.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * {@code StopWithSavepointOperations} collects the steps for creating a 
savepoint and waiting for
+ * the job to stop.
+ */
+public interface StopWithSavepointOperations {
+
+    /**
+     * Handles the Savepoint creation termination.
+     *
+     * @param path the path to the newly created savepoint.
+     * @param throwable the {@code Throwable} in case of failure.

Review comment:
       Maybe it would be a bit easier to have two separate callbacks, 
`onSavepointSuccess(String path)` and `onSavepointFailure(Throwable throwable)`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);
+                return WaitForSavepointCreation;
+            }
+        },
+        WaitForSavepointCreation {
+            @Override
+            public StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                Preconditions.checkState(
+                        context.unfinishedStates != null,
+                        InitialWait + " should have preceded: No 
unfinishedStates is set.");
+
+                if (!context.unfinishedStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(context.unfinishedStates);
+                }
+
+                return context.terminateSuccessfully(path);
+            }
+        },
+        WaitForJobTermination {
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                Preconditions.checkState(
+                        context.path != null,
+                        InitialWait + " should have preceded: No path is 
set.");
+
+                Collection<ExecutionState> unfinishedExecutionStates =
+                        extractUnfinishedStates(executionStates);
+                if (!unfinishedExecutionStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(unfinishedExecutionStates);
+                }
+
+                return context.terminateSuccessfully(context.path);
+            }
+        },
+        Final;
+
+        public StopWithSavepointState onSavepointCreation(
+                StopWithSavepointContext context, String path, Throwable 
throwable) {
+            if (throwable != null) {
+                return context.terminateExceptionally(throwable);
+            }
+
+            return handleSavepointCreation(context, path);
+        }
+
+        protected StopWithSavepointState handleSavepointCreation(
+                StopWithSavepointContext context, String path) {
+            throw new IllegalStateException(
+                    "No onSavepointCreation should have been called in " + 
this.name() + " state.");
+        }
+
+        public StopWithSavepointState onExecutionsTermination(
+                StopWithSavepointContext context, Collection<ExecutionState> 
executionStates) {
+            throw new IllegalStateException(
+                    "No onExecutionsTermination should have been called in "
+                            + this.name()
+                            + " state.");
+        }

Review comment:
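       I think this does not work because a savepoint can fail and, at a later 
point in time, `onExecutionsTermination` could still be called (e.g. if an 
unrelated global failover is triggered). By then we are already in `Final`, 
which does not override this method, so the `IllegalStateException` would be 
thrown.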

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);
+                return WaitForSavepointCreation;
+            }
+        },
+        WaitForSavepointCreation {
+            @Override
+            public StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                Preconditions.checkState(
+                        context.unfinishedStates != null,
+                        InitialWait + " should have preceded: No 
unfinishedStates is set.");
+
+                if (!context.unfinishedStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(context.unfinishedStates);
+                }
+
+                return context.terminateSuccessfully(path);
+            }
+        },
+        WaitForJobTermination {
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {

Review comment:
       I am wondering whether the `context` really helps in the overall design. 
It seems that we use the context for storing state and to trigger some methods. 
Making the state an inner class could solve the latter problem. Using classes 
instead of enums could then allow us to store the state with the state instance 
instead of some fields in the outer class which are never used in the outer 
context.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);
+                return WaitForSavepointCreation;
+            }
+        },
+        WaitForSavepointCreation {
+            @Override
+            public StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                Preconditions.checkState(
+                        context.unfinishedStates != null,
+                        InitialWait + " should have preceded: No 
unfinishedStates is set.");
+
+                if (!context.unfinishedStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(context.unfinishedStates);
+                }
+
+                return context.terminateSuccessfully(path);
+            }
+        },
+        WaitForJobTermination {
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                Preconditions.checkState(
+                        context.path != null,
+                        InitialWait + " should have preceded: No path is 
set.");
+
+                Collection<ExecutionState> unfinishedExecutionStates =
+                        extractUnfinishedStates(executionStates);
+                if (!unfinishedExecutionStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(unfinishedExecutionStates);
+                }
+
+                return context.terminateSuccessfully(context.path);
+            }
+        },
+        Final;
+
+        public StopWithSavepointState onSavepointCreation(
+                StopWithSavepointContext context, String path, Throwable 
throwable) {
+            if (throwable != null) {
+                return context.terminateExceptionally(throwable);
+            }
+
+            return handleSavepointCreation(context, path);
+        }

Review comment:
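       I think this logic is not correct. If we reach 
`WaitForSavepointCreation` first, then we want to restart the job globally if 
there was a savepoint failure or if any of the `Executions` is not in the 
`FINISHED` state. As written, a savepoint failure only calls 
`terminateExceptionally`, which does not trigger a global failover.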

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointContext.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/** {@code StopWithSavepointContext} implements {@link 
StopWithSavepointOperations}. */
+public class StopWithSavepointContext implements StopWithSavepointOperations {
+
+    private final Logger log;
+
+    private final SchedulerBase scheduler;
+    private final CheckpointCoordinator checkpointCoordinator;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private StopWithSavepointState state = StopWithSavepointState.InitialWait;
+    private String path;
+    private Set<ExecutionState> unfinishedStates;
+
+    public StopWithSavepointContext(JobID jobId, SchedulerBase scheduler, 
Logger log) {
+        this.jobId = jobId;
+        this.scheduler = scheduler;
+        this.checkpointCoordinator = scheduler.getCheckpointCoordinator();
+        this.log = log;
+    }
+
+    @Override
+    public synchronized void handleSavepointCreation(String path, Throwable 
throwable) {
+        final StopWithSavepointState oldState = state;
+        state = state.onSavepointCreation(this, path, throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public synchronized void handleExecutionTermination(
+            Collection<ExecutionState> executionStates) {
+        final StopWithSavepointState oldState = state;
+        state = state.onExecutionsTermination(this, executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling.",
+                oldState,
+                state);
+    }
+
+    @Override
+    public CompletableFuture<String> getResult() {
+        return result;
+    }
+
+    private StopWithSavepointState terminateExceptionWithGlobalFailover(
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+        return terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    private StopWithSavepointState terminateExceptionally(Throwable throwable) 
{
+        scheduler.startCheckpointScheduler(checkpointCoordinator);
+        result.completeExceptionally(throwable);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private StopWithSavepointState terminateSuccessfully(String path) {
+        result.complete(path);
+
+        return StopWithSavepointState.Final;
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * {@code StopWithSavepointState} represents the different states during 
the stop-with-savepoint
+     * operation.
+     *
+     * <p>The state transitions are implemented in the following way: 
InitialWait ->
+     * [WaitForSavepointCreation|WaitForJobTermination] -> Final
+     */
+    private enum StopWithSavepointState {
+        InitialWait {
+            @Override
+            protected StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                context.path = path;
+                return WaitForJobTermination;
+            }
+
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                context.unfinishedStates = 
extractUnfinishedStates(executionStates);
+                return WaitForSavepointCreation;
+            }
+        },
+        WaitForSavepointCreation {
+            @Override
+            public StopWithSavepointState handleSavepointCreation(
+                    StopWithSavepointContext context, String path) {
+                Preconditions.checkState(
+                        context.unfinishedStates != null,
+                        InitialWait + " should have preceded: No 
unfinishedStates is set.");
+
+                if (!context.unfinishedStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(context.unfinishedStates);
+                }
+
+                return context.terminateSuccessfully(path);
+            }
+        },
+        WaitForJobTermination {
+            @Override
+            public StopWithSavepointState onExecutionsTermination(
+                    StopWithSavepointContext context, 
Collection<ExecutionState> executionStates) {
+                Preconditions.checkState(
+                        context.path != null,
+                        InitialWait + " should have preceded: No path is 
set.");
+
+                Collection<ExecutionState> unfinishedExecutionStates =
+                        extractUnfinishedStates(executionStates);
+                if (!unfinishedExecutionStates.isEmpty()) {
+                    return 
context.terminateExceptionWithGlobalFailover(unfinishedExecutionStates);
+                }
+
+                return context.terminateSuccessfully(context.path);
+            }
+        },
+        Final;
+
+        public StopWithSavepointState onSavepointCreation(
+                StopWithSavepointContext context, String path, Throwable 
throwable) {
+            if (throwable != null) {
+                return context.terminateExceptionally(throwable);
+            }
+
+            return handleSavepointCreation(context, path);
+        }

Review comment:
       Alternatively, we should enforce that we must stick to the order 
`InitialWait -> WaitForJobTermination -> Final`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to