tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r579205110



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointScheduling.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+/**
+ * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling.

Review comment:
       ```suggestion
        * {@code CheckpointScheduling} provides methods for starting and stopping the periodic scheduling of checkpoints.
       ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) {

Review comment:
       The convention in Flink is that everything that is not annotated with `@Nullable` is considered to be non-null: https://flink.apache.org/contributing/code-style-and-quality-common.html#nullability-of-the-mutable-parts. Hence, `@Nonnull` should not be necessary here.
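
A minimal sketch of what this convention could look like in practice. The `Nullable` annotation below is a local stand-in for `javax.annotation.Nullable` so that the example compiles without dependencies, `Objects.requireNonNull` stands in for Flink's `checkNotNull`, and `HandlerSketch` with its fields is purely hypothetical:

```java
import java.lang.annotation.Documented;
import java.util.Objects;

// Local stand-in for javax.annotation.Nullable (assumption, keeps the sketch dependency-free).
@Documented
@interface Nullable {}

// Hypothetical class: only parameters that may be null carry an annotation.
class HandlerSketch {
    private final String jobId;
    private final String description; // may be null

    // jobId has no annotation, so by convention it is non-null; the check enforces it at runtime.
    HandlerSketch(String jobId, @Nullable String description) {
        this.jobId = Objects.requireNonNull(jobId);
        this.description = description;
    }

    String describe() {
        return description == null ? jobId : jobId + " (" + description + ")";
    }
}
```

With this style, the absence of an annotation carries the non-null meaning, so `@Nonnull` adds no information.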

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(
+                                        this::handleExecutionsTermination, mainThreadExecutor));

Review comment:
       I think this logic does not really belong in this class. It should be good enough if the class's contract states that one first needs to signal the savepoint status and only then the termination status. Moreover, by not doing this part here, the class does not need to know about the concept of a main thread.
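
One way to picture this split, as a rough sketch only: the handler exposes two plain signal methods, and its owner composes the futures and chooses the executor. All names below (`SignalHandlerSketch`, `OwnerSketch`, `wire`) are hypothetical, and `String`/`List<String>` stand in for `CompletedCheckpoint` and `Collection<ExecutionState>`. The chain itself mirrors the one in the diff above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical handler: it only reacts to signals and knows nothing about futures or threads.
class SignalHandlerSketch {
    final List<String> events = new ArrayList<>();

    void handleSavepointCreation(String savepointPath, Throwable failure) {
        events.add(failure != null ? "savepoint-failed" : "savepoint:" + savepointPath);
    }

    void handleExecutionsTermination(List<String> executionStates) {
        events.add("terminated:" + executionStates);
    }
}

// Hypothetical owner: it alone enforces "savepoint signal first, termination signal second".
class OwnerSketch {
    static CompletableFuture<Void> wire(
            SignalHandlerSketch handler,
            CompletableFuture<String> savepointFuture,
            CompletableFuture<List<String>> terminationFuture,
            Executor mainThreadExecutor) {
        return savepointFuture
                // Signal the savepoint outcome first, on the owner's executor.
                .whenCompleteAsync(handler::handleSavepointCreation, mainThreadExecutor)
                // Only afterwards subscribe to the termination signal.
                .thenCompose(
                        ignored ->
                                terminationFuture.thenAcceptAsync(
                                        handler::handleExecutionsTermination,
                                        mainThreadExecutor));
    }
}
```

In this sketch the handler can be tested by calling its two methods directly, while the ordering of the futures is a concern of `OwnerSketch.wire` alone.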

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(

Review comment:
       Why is `async` required here?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(

Review comment:
       Maybe guard this result via `FutureUtils.assertNoException` in order to make sure that `terminatedExecutionsFuture` can never be completed exceptionally.
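
To illustrate the idea only: the sketch below is a dependency-free stand-in and not Flink's implementation. `FutureGuardSketch`, `assertNoExceptionSketch`, and the `onUnexpectedFailure` callback are hypothetical names; the real `FutureUtils.assertNoException` may report the failure differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class FutureGuardSketch {
    // Hypothetical helper: reports any exceptional completion of the given future
    // to the supplied callback instead of letting it go unnoticed.
    static void assertNoExceptionSketch(
            CompletableFuture<?> future, Consumer<Throwable> onUnexpectedFailure) {
        future.whenComplete(
                (ignored, failure) -> {
                    if (failure != null) {
                        onUnexpectedFailure.accept(failure);
                    }
                });
    }
}
```

A guard like this turns an unexpected exceptional completion into a visible error rather than a silently dropped one.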

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java
##########
@@ -0,0 +1,330 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.QuadConsumer;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImplTest} tests the stop-with-savepoint functionality
+ * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    private static final JobID JOB_ID = new JobID();
+
+    private final TestingCheckpointScheduling checkpointScheduling =
+            new TestingCheckpointScheduling(false);
+
+    private StopWithSavepointTerminationHandlerImpl createTestInstance(
+            Consumer<Throwable> handleGlobalFailureConsumer) {
+        // checkpointing should be always stopped before initiating stop-with-savepoint
+        checkpointScheduling.stopCheckpointScheduler();
+
+        final SchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setHandleGlobalFailureConsumer(handleGlobalFailureConsumer)
+                        .build();
+        return new StopWithSavepointTerminationHandlerImpl(
+                JOB_ID, scheduler, checkpointScheduling, log);
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws Exception {
+        assertHappyPath(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    completedSavepointFuture.complete(completedSavepoint);
+                    executionsTerminatedFuture.complete(terminatedExecutionStates);
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() throws Exception {
+        assertHappyPath(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    executionsTerminatedFuture.complete(terminatedExecutionStates);
+                    completedSavepointFuture.complete(completedSavepoint);
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureBeforeSuccessfulTermination() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> {
+                    completedSavepointFuture.completeExceptionally(expectedException);
+                    executionsTerminatedFuture.complete(
+                            Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureAfterSuccessfulTermination() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, executionsTerminatedFuture) -> {
+                    executionsTerminatedFuture.complete(
+                            Collections.singletonList(ExecutionState.FINISHED));
+                    completedSavepointFuture.completeExceptionally(expectedException);

Review comment:
       If we make the ordering of signals the responsibility of a different component, then we wouldn't have this combinatorial explosion of test cases. Instead, we would have test cases for the different signal completion orders, and test cases for the `StopWithSavepointTerminationHandlerImpl` where we can concentrate on the handler-specific functionality.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and the successful
+ * termination of the job, succeed. If the former step fails, the operation fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, @Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(
+                                        this::handleExecutionsTermination, mainThreadExecutor));
+
+        return result;
+    }
+
+    private synchronized void handleSavepointCreation(CompletedCheckpoint completedCheckpoint) {
+        final State oldState = state;
+        state = state.onSavepointCreation(completedCheckpoint);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint creation handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleSavepointCreationFailure(Throwable throwable) {
+        final State oldState = state;
+        state = state.onSavepointCreationFailure(throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint creation failure handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleExecutionsTermination(
+            Collection<ExecutionState> executionStates) {
+        final State oldState = state;
+        state = state.onExecutionsTermination(executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution termination handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    /**
+     * Handles the termination of the {@code StopWithSavepointTerminationHandler} exceptionally
+     * after triggering a global job fail-over.
+     *
+     * @param completedSavepoint the completed savepoint that needs to be discarded.
+     * @param unfinishedExecutionStates the unfinished states that caused the failure.
+     */
+    private void terminateExceptionallyWithGlobalFailover(
+            CompletedCheckpoint completedSavepoint,
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), jobId);
+        FlinkException inconsistentFinalStateException = new FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+
+        try {
+            completedSavepoint.discard();
+        } catch (Exception e) {
+            log.warn(
+                    "Error occurred while cleaning up completed savepoint due to stop-with-savepoint failure.",
+                    e);
+            inconsistentFinalStateException.addSuppressed(e);
+        }

Review comment:
       Alternatively, we need to give the `completedSavepoint` to the owner of this class in case of a termination failure. The owner would then be responsible for cleaning the savepoint up.
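
One possible shape for this alternative, sketched under heavy assumptions: the handler fails its result future with an exception that carries the savepoint, and the owner decides how to dispose of it. `TerminationFailureSketch` and `SavepointOwnerSketch` are hypothetical names, and a `String` path stands in for `CompletedCheckpoint`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical failure type that hands the already-created savepoint back to the owner.
class TerminationFailureSketch extends Exception {
    final String savepointPath;

    TerminationFailureSketch(String message, String savepointPath) {
        super(message);
        this.savepointPath = savepointPath;
    }
}

class SavepointOwnerSketch {
    // Records what the owner cleaned up; a real owner would discard the savepoint here.
    String discardedSavepoint;

    // The owner, not the handler, reacts to the termination failure and cleans up.
    void watch(CompletableFuture<String> handlerResult) {
        handlerResult.whenComplete(
                (path, failure) -> {
                    if (failure instanceof TerminationFailureSketch) {
                        discardedSavepoint = ((TerminationFailureSketch) failure).savepointPath;
                    }
                });
    }
}
```

With this split, the handler no longer needs a `try`/`catch` around the discard call, since cleanup errors become the owner's concern.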

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(
+                                        this::handleExecutionsTermination, 
mainThreadExecutor));
+
+        return result;
+    }
+
+    private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) {
+        final State oldState = state;
+        state = state.onSavepointCreation(completedCheckpoint);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleSavepointCreationFailure(Throwable 
throwable) {
+        final State oldState = state;
+        state = state.onSavepointCreationFailure(throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation failure handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleExecutionsTermination(
+            Collection<ExecutionState> executionStates) {
+        final State oldState = state;
+        state = state.onExecutionsTermination(executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    /**
+     * Handles the termination of the {@code 
StopWithSavepointTerminationHandler} exceptionally
+     * after triggering a global job fail-over.
+     *
+     * @param completedSavepoint the completed savepoint that needs to be 
discarded.
+     * @param unfinishedExecutionStates the unfinished states that caused the 
failure.
+     */
+    private void terminateExceptionallyWithGlobalFailover(
+            CompletedCheckpoint completedSavepoint,
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+
+        try {
+            completedSavepoint.discard();
+        } catch (Exception e) {
+            log.warn(
+                    "Error occurred while cleaning up completed savepoint due 
to stop-with-savepoint failure.",
+                    e);
+            inconsistentFinalStateException.addSuppressed(e);
+        }
+
+        terminateExceptionally(inconsistentFinalStateException);
+    }
+
+    /**
+     * Handles the termination of the {@code 
StopWithSavepointTerminationHandler} exceptionally
+     * without triggering a global job fail-over. It does restart the 
checkpoint scheduling.
+     *
+     * @param throwable the error that caused the exceptional termination.
+     */
+    private void terminateExceptionally(Throwable throwable) {
+        checkpointScheduling.startCheckpointScheduler();
+        result.completeExceptionally(throwable);
+    }
+
+    /**
+     * Handles the successful termination of the {@code 
StopWithSavepointTerminationHandler}.
+     *
+     * @param path the path where the savepoint is stored.
+     */
+    private void terminateSuccessfully(String path) {
+        result.complete(path);
+    }
+
+    private static Set<ExecutionState> extractUnfinishedStates(
+            Collection<ExecutionState> executionStates) {
+        return executionStates.stream()
+                .filter(state -> state != ExecutionState.FINISHED)
+                .collect(Collectors.toSet());
+    }
+
+    private final class WaitingForSavepoint implements State {
+
+        @Override
+        public State onSavepointCreation(CompletedCheckpoint 
completedSavepoint) {
+            return new SavepointCreated(completedSavepoint);
+        }
+
+        @Override
+        public State onSavepointCreationFailure(Throwable throwable) {
+            terminateExceptionally(throwable);
+            return new FinalState();
+        }
+    }
+
+    private final class SavepointCreated implements State {
+
+        private final CompletedCheckpoint completedSavepoint;
+
+        private SavepointCreated(CompletedCheckpoint completedSavepoint) {
+            this.completedSavepoint = completedSavepoint;
+        }
+
+        @Override
+        public State onExecutionsTermination(Collection<ExecutionState> 
executionStates) {
+            final Set<ExecutionState> unfinishedStates = 
extractUnfinishedStates(executionStates);
+
+            if (unfinishedStates.isEmpty()) {
+                terminateSuccessfully(completedSavepoint.getExternalPointer());
+                return new FinalState();
+            }
+
+            terminateExceptionallyWithGlobalFailover(completedSavepoint, 
unfinishedStates);
+            return new FinalState();
+        }
+    }
+
+    private final class FinalState implements State {
+
+        @Override
+        public State onExecutionsTermination(Collection<ExecutionState> 
executionStates) {
+            return this;
+        }
+    }
+
+    private interface State {
+
+        default State onSavepointCreation(CompletedCheckpoint 
completedSavepoint) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support onSavepointCreation.");
+        }
+
+        default State onSavepointCreationFailure(Throwable throwable) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support 
onSavepointCreationFailure.");
+        }
+
+        default State onExecutionsTermination(Collection<ExecutionState> 
executionStates) {
+            throw new UnsupportedOperationException(
+                    this.getClass().getSimpleName()
+                            + " state does not support 
onExecutionsTermination.");
+        }
+    }

Review comment:
       This looks simple and easy to understand :-)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)

Review comment:
       I would suggest consuming the exception via `handleAsync` and then adding a
`FutureUtils.assertNoException` in order to make potential misuse of the state
calls visible. Otherwise, if `handleSavepoint*` fails, the exception will be
swallowed by the returned future.
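
       To illustrate the concern with JDK classes only: an exception thrown inside
a completion callback ends up in the future returned by that call, and nobody
sees it if that future is dropped. The sketch below uses a hypothetical
`assertNoException` helper as a stand-in for the utility mentioned above; its
signature and behavior are assumptions for this illustration, not Flink's
implementation. It also uses the synchronous `handle` instead of `handleAsync`
to keep the demo deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class SwallowedExceptionDemo {

    /** Hypothetical stand-in: reports any failure of the given future to the sink. */
    static void assertNoException(
            CompletableFuture<?> future, AtomicReference<Throwable> sink) {
        future.whenComplete(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        sink.set(throwable);
                    }
                });
    }

    /** Simulates a state callback that is invoked in the wrong state. */
    static Void failingStateCall(String savepoint, Throwable error) {
        throw new UnsupportedOperationException("state does not support this call");
    }

    /** Returns the failure the caller observes without the assertion (null = swallowed). */
    static Throwable withoutAssertion() {
        AtomicReference<Throwable> sink = new AtomicReference<>();
        CompletableFuture<String> savepointFuture =
                CompletableFuture.completedFuture("savepoint");
        // The future returned by handle() holds the exception, but it is dropped here.
        savepointFuture.handle(SwallowedExceptionDemo::failingStateCall);
        return sink.get();
    }

    /** Returns the failure the caller observes when the chained future is asserted on. */
    static Throwable withAssertion() {
        AtomicReference<Throwable> sink = new AtomicReference<>();
        CompletableFuture<String> savepointFuture =
                CompletableFuture.completedFuture("savepoint");
        assertNoException(
                savepointFuture.handle(SwallowedExceptionDemo::failingStateCall), sink);
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println("without assertion: " + withoutAssertion());
        System.out.println("with assertion: " + withAssertion());
    }
}
```

       In the second variant the failure surfaces wrapped in a
`CompletionException`, so the original exception is available via `getCause()`.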

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(

Review comment:
       `thenRun` should be fine here.
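
       As a small JDK-only illustration of the difference: `thenCompose` makes the
resulting future wait for the inner future, whereas `thenRun` only registers the
follow-up callback and completes right away. Since the chained future is not
used afterwards in the code above, both variants invoke the callback on the
inner future once it completes. All names below are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenRunDemo {

    /** Records what happens when the two futures complete one after the other. */
    static List<String> run(boolean useThenRun) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> savepoint = new CompletableFuture<>();
        CompletableFuture<String> executions = new CompletableFuture<>();

        CompletableFuture<Void> chained;
        if (useThenRun) {
            // Only registers the callback; 'chained' does not wait for 'executions'.
            chained =
                    savepoint.thenRun(
                            () -> executions.thenAccept(s -> events.add("handled " + s)));
        } else {
            // 'chained' completes only after the inner future has completed.
            chained =
                    savepoint.thenCompose(
                            ignored ->
                                    executions.thenAccept(s -> events.add("handled " + s)));
        }

        savepoint.complete("sp");
        events.add("chained done after savepoint: " + chained.isDone());
        executions.complete("FINISHED");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```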

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(
+                                        this::handleExecutionsTermination, 
mainThreadExecutor));
+
+        return result;
+    }
+
+    private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) {
+        final State oldState = state;
+        state = state.onSavepointCreation(completedCheckpoint);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleSavepointCreationFailure(Throwable 
throwable) {
+        final State oldState = state;
+        state = state.onSavepointCreationFailure(throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation failure handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleExecutionsTermination(

Review comment:
       Why are these methods `synchronized`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImpl.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImpl} implements {@link
+ * StopWithSavepointTerminationHandler}.
+ *
+ * <p>The operation only succeeds if both steps, the savepoint creation and 
the successful
+ * termination of the job, succeed. If the former step fails, the operation 
fails exceptionally
+ * without any further actions. If the latter one fails, a global fail-over is 
triggered before
+ * failing the operation.
+ */
+public class StopWithSavepointTerminationHandlerImpl
+        implements StopWithSavepointTerminationHandler {
+
+    private final Logger log;
+
+    private final SchedulerNG scheduler;
+    private final CheckpointScheduling checkpointScheduling;
+    private final JobID jobId;
+
+    private final CompletableFuture<String> result = new CompletableFuture<>();
+
+    private State state = new WaitingForSavepoint();
+
+    public <S extends SchedulerNG & CheckpointScheduling> 
StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId, @Nonnull S schedulerWithCheckpointing, 
@Nonnull Logger log) {
+        this(jobId, schedulerWithCheckpointing, schedulerWithCheckpointing, 
log);
+    }
+
+    @VisibleForTesting
+    StopWithSavepointTerminationHandlerImpl(
+            @Nonnull JobID jobId,
+            @Nonnull SchedulerNG scheduler,
+            @Nonnull CheckpointScheduling checkpointScheduling,
+            @Nonnull Logger log) {
+        this.jobId = checkNotNull(jobId);
+        this.scheduler = checkNotNull(scheduler);
+        this.checkpointScheduling = checkNotNull(checkpointScheduling);
+        this.log = checkNotNull(log);
+    }
+
+    @Override
+    public CompletableFuture<String> handlesStopWithSavepointTermination(
+            CompletableFuture<CompletedCheckpoint> completedSavepointFuture,
+            CompletableFuture<Collection<ExecutionState>> 
terminatedExecutionsFuture,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        completedSavepointFuture
+                .whenCompleteAsync(
+                        (completedSavepoint, throwable) -> {
+                            if (throwable != null) {
+                                handleSavepointCreationFailure(throwable);
+                            } else {
+                                handleSavepointCreation(completedSavepoint);
+                            }
+                        },
+                        mainThreadExecutor)
+                .thenCompose(
+                        aVoid ->
+                                terminatedExecutionsFuture.thenAcceptAsync(
+                                        this::handleExecutionsTermination, 
mainThreadExecutor));
+
+        return result;
+    }
+
+    private synchronized void handleSavepointCreation(CompletedCheckpoint 
completedCheckpoint) {
+        final State oldState = state;
+        state = state.onSavepointCreation(completedCheckpoint);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleSavepointCreationFailure(Throwable 
throwable) {
+        final State oldState = state;
+        state = state.onSavepointCreationFailure(throwable);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on savepoint 
creation failure handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    private synchronized void handleExecutionsTermination(
+            Collection<ExecutionState> executionStates) {
+        final State oldState = state;
+        state = state.onExecutionsTermination(executionStates);
+
+        log.debug(
+                "Stop-with-savepoint transitioned from {} to {} on execution 
termination handling for job {}.",
+                oldState,
+                state,
+                jobId);
+    }
+
+    /**
+     * Handles the termination of the {@code 
StopWithSavepointTerminationHandler} exceptionally
+     * after triggering a global job fail-over.
+     *
+     * @param completedSavepoint the completed savepoint that needs to be 
discarded.
+     * @param unfinishedExecutionStates the unfinished states that caused the 
failure.
+     */
+    private void terminateExceptionallyWithGlobalFailover(
+            CompletedCheckpoint completedSavepoint,
+            Iterable<ExecutionState> unfinishedExecutionStates) {
+        String errorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
+                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
+        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+
+        scheduler.handleGlobalFailure(inconsistentFinalStateException);
+
+        try {
+            completedSavepoint.discard();
+        } catch (Exception e) {
+            log.warn(
+                    "Error occurred while cleaning up completed savepoint due 
to stop-with-savepoint failure.",
+                    e);
+            inconsistentFinalStateException.addSuppressed(e);
+        }

Review comment:
       Discarding the savepoint can entail IO operations. IO operations should not
be executed in the main thread because they can block. If we want to discard the
savepoint here, then we should pass in an `ioExecutor` that can be used for this.
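
       A minimal sketch of this idea, using only JDK classes: the potentially
blocking discard is handed to a separate executor, so the calling (main) thread
is not blocked by it. `Savepoint`, `discardAsync`, and the way the `ioExecutor`
is created are hypothetical and only serve this illustration; they are not part
of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class IoExecutorDiscardDemo {

    /** Hypothetical stand-in for a savepoint whose cleanup may block on IO. */
    interface Savepoint {
        void discard() throws Exception;
    }

    /** Runs the discard on ioExecutor and returns the name of the thread that ran it. */
    static CompletableFuture<String> discardAsync(Savepoint savepoint, Executor ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        savepoint.discard();
                    } catch (Exception e) {
                        // Surface cleanup failures through the returned future.
                        throw new CompletionException(e);
                    }
                    return Thread.currentThread().getName();
                },
                ioExecutor);
    }

    /** Discards a fake, slow savepoint on a dedicated thread named "io-thread". */
    static String runDemo() {
        ExecutorService ioExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            // join() is used only so that the demo can report the result.
            return discardAsync(() -> Thread.sleep(10), ioExecutor).join();
        } finally {
            ioExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("discard ran on: " + runDemo());
    }
}
```

       In real code the caller would not wait for the returned future on the main
thread; it would only attach a callback, for example to log a failed cleanup.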

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/StopWithSavepointTerminationHandlerImplTest.java
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.QuadConsumer;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * {@code StopWithSavepointTerminationHandlerImplTest} tests the 
stop-with-savepoint functionality
+ * of {@link SchedulerBase#stopWithSavepoint(String, boolean)}.
+ */
+public class StopWithSavepointTerminationHandlerImplTest extends TestLogger {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private static final JobID JOB_ID = new JobID();
+
+    private final TestingCheckpointScheduling checkpointScheduling =
+            new TestingCheckpointScheduling(false);
+
+    private StopWithSavepointTerminationHandlerImpl createTestInstance(
+            Consumer<Throwable> handleGlobalFailureConsumer) {
+        // checkpointing should be always stopped before initiating 
stop-with-savepoint
+        checkpointScheduling.stopCheckpointScheduler();
+
+        final SchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        
.setHandleGlobalFailureConsumer(handleGlobalFailureConsumer)
+                        .build();
+        return new StopWithSavepointTerminationHandlerImpl(
+                JOB_ID, scheduler, checkpointScheduling, log);
+    }
+
+    @Test
+    public void 
testHappyPathWithSavepointCreationBeforeSuccessfulTermination() throws 
Exception {
+        assertHappyPath(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    completedSavepointFuture.complete(completedSavepoint);
+                    
executionsTerminatedFuture.complete(terminatedExecutionStates);
+                });
+    }
+
+    @Test
+    public void testHappyPathWithSavepointCreationAfterSuccessfulTermination() 
throws Exception {
+        assertHappyPath(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    
executionsTerminatedFuture.complete(terminatedExecutionStates);
+                    completedSavepointFuture.complete(completedSavepoint);
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureBeforeSuccessfulTermination() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, 
executionsTerminatedFuture) -> {
+                    
completedSavepointFuture.completeExceptionally(expectedException);
+                    executionsTerminatedFuture.complete(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureAfterSuccessfulTermination() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, 
executionsTerminatedFuture) -> {
+                    executionsTerminatedFuture.complete(
+                            
Collections.singletonList(ExecutionState.FINISHED));
+                    
completedSavepointFuture.completeExceptionally(expectedException);
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureBeforeTaskFailure() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, 
executionsTerminatedFuture) -> {
+                    
completedSavepointFuture.completeExceptionally(expectedException);
+                    executionsTerminatedFuture.complete(
+                            Collections.singletonList(ExecutionState.FAILED));
+                });
+    }
+
+    @Test
+    public void testSavepointCreationFailureAfterTaskFailure() {
+        assertSavepointCreationFailure(
+                (expectedException, completedSavepointFuture, 
executionsTerminatedFuture) -> {
+                    executionsTerminatedFuture.complete(
+                            Collections.singletonList(ExecutionState.FAILED));
+                    
completedSavepointFuture.completeExceptionally(expectedException);
+                });
+    }
+
+    @Test
+    public void testNoTerminationHandlingAfterSavepointCompletion() {
+        assertNoTerminationHandling(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    completedSavepointFuture.complete(completedSavepoint);
+                    
executionsTerminatedFuture.complete(terminatedExecutionStates);
+                });
+    }
+
+    @Test
+    public void testNoTerminationHandlingBeforeSavepointCompletion() {
+        assertNoTerminationHandling(
+                (completedSavepoint,
+                        completedSavepointFuture,
+                        terminatedExecutionStates,
+                        executionsTerminatedFuture) -> {
+                    
executionsTerminatedFuture.complete(terminatedExecutionStates);
+                    completedSavepointFuture.complete(completedSavepoint);
+                });
+    }
+
+    private void assertHappyPath(
+            final QuadConsumer<
+                            CompletedCheckpoint,
+                            CompletableFuture<CompletedCheckpoint>,
+                            Collection<ExecutionState>,
+                            CompletableFuture<Collection<ExecutionState>>>
+                    completion)
+            throws ExecutionException, InterruptedException {
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstance(
+                        throwable -> fail("No global fail-over should have 
been triggered."));
+
+        final CompletableFuture<CompletedCheckpoint> completedSavepointFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<ExecutionState>> 
executionsTerminatedFuture =
+                new CompletableFuture<>();
+
+        final CompletableFuture<String> result =
+                testInstance.handlesStopWithSavepointTermination(
+                        completedSavepointFuture,
+                        executionsTerminatedFuture,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        final String savepointPath = "savepoint-path";
+        completion.accept(
+                createCompletedSavepoint(savepointPath),
+                completedSavepointFuture,
+                Collections.singleton(ExecutionState.FINISHED),
+                executionsTerminatedFuture);
+
+        assertThat(result.get(), is(savepointPath));
+
+        // the happy path won't restart the checkpoint scheduling
+        assertFalse("Checkpoint scheduling should be disabled.", 
checkpointScheduling.isEnabled());
+    }
+
+    private void assertSavepointCreationFailure(
+            final TriConsumer<
+                            Throwable,
+                            CompletableFuture<CompletedCheckpoint>,
+                            CompletableFuture<Collection<ExecutionState>>>
+                    completion) {
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstance(throwable -> fail("No global failover 
should be triggered."));
+
+        final CompletableFuture<CompletedCheckpoint> completedSavepointFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<ExecutionState>> 
executionsTerminatedFuture =
+                new CompletableFuture<>();
+
+        final CompletableFuture<String> result =
+                testInstance.handlesStopWithSavepointTermination(
+                        completedSavepointFuture,
+                        executionsTerminatedFuture,
+                        
ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        final String expectedErrorMessage = "Expected exception during 
savepoint creation.";
+        completion.accept(
+                new Exception(expectedErrorMessage),
+                completedSavepointFuture,
+                executionsTerminatedFuture);
+
+        try {
+            result.get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<Throwable> actualException =
+                    ExceptionUtils.findThrowableWithMessage(e, 
expectedErrorMessage);
+            assertTrue(
+                    "An exception with the expected error message should have 
been thrown.",
+                    actualException.isPresent());
+        }
+
+        // the checkpoint scheduling should be enabled in case of failure
+        assertTrue("Checkpoint scheduling should be enabled.", 
checkpointScheduling.isEnabled());
+    }
+
+    private void assertNoTerminationHandling(
+            final QuadConsumer<
+                            CompletedCheckpoint,
+                            CompletableFuture<CompletedCheckpoint>,
+                            Collection<ExecutionState>,
+                            CompletableFuture<Collection<ExecutionState>>>
+                    completion) {
+        final ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
+        final String expectedErrorMessage =
+                String.format(
+                        "Inconsistent execution state after stopping with savepoint. At least one execution is still in one of the following states: %s. A global fail-over is triggered to recover the job %s.",
+                        expectedNonFinishedState, JOB_ID);
+
+        final EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
+        final CompletedCheckpoint completedSavepoint =
+                createCompletedSavepoint(streamStateHandle, "savepoint-folder");
+
+        // we have to verify that the handle pointing to the savepoint's metadata is not disposed,
+        // yet
+        assertFalse(
+                "The completed savepoint must not be disposed, yet.",
+                streamStateHandle.isDisposed());
+
+        final StopWithSavepointTerminationHandlerImpl testInstance =
+                createTestInstance(
+                        throwable ->
+                                assertThat(
+                                        throwable,
+                                        FlinkMatchers.containsMessage(expectedErrorMessage)));
+
+        final CompletableFuture<CompletedCheckpoint> completedSavepointFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<ExecutionState>> executionsTerminatedFuture =
+                new CompletableFuture<>();
+
+        final CompletableFuture<String> result =
+                testInstance.handlesStopWithSavepointTermination(
+                        completedSavepointFuture,
+                        executionsTerminatedFuture,
+                        ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+        completion.accept(
+                completedSavepoint,
+                completedSavepointFuture,
+                Collections.singletonList(expectedNonFinishedState),
+                executionsTerminatedFuture);
+
+        try {
+            result.get();
+            fail("An ExecutionException is expected.");
+        } catch (Throwable e) {
+            final Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+            assertTrue(
+                    "A FlinkException should have been thrown.", actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get(),
+                    FlinkMatchers.containsMessage(expectedErrorMessage));
+        }
+
+        // the checkpoint scheduling should be enabled in case of failure
+        assertTrue("Checkpoint scheduling should be enabled.", checkpointScheduling.isEnabled());
+
+        assertTrue("The savepoint should be cleaned up.", streamStateHandle.isDisposed());

Review comment:
       How do we assert that we call the global failover?
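
       An assertion placed inside the failure consumer only runs if the
consumer is invoked, so a missing call would go unnoticed. A possible approach,
sketched here under assumptions, is to capture the throwable in a future and
check it after the handler has run. `failWithGlobalFailover` is a hypothetical
stand-in for the handler under test; in the real test the consumer would be the
one passed to `createTestInstance`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class GlobalFailoverAssertionSketch {

    /** Hypothetical stand-in for the handler triggering a global fail-over. */
    static void failWithGlobalFailover(
            Consumer<Throwable> handleGlobalFailure, String message) {
        handleGlobalFailure.accept(new IllegalStateException(message));
    }

    /**
     * Captures the throwable passed to the failure consumer and verifies
     * afterwards that the consumer was called at all.
     */
    public static Throwable runAndCaptureFailure(String message) {
        final CompletableFuture<Throwable> globalFailure = new CompletableFuture<>();

        // The consumer only records the failure; the checks happen outside.
        failWithGlobalFailover(globalFailure::complete, message);

        if (!globalFailure.isDone()) {
            throw new AssertionError("Global fail-over was not triggered.");
        }
        return globalFailure.join();
    }
}
```

       With this shape, the message check (for example via
`FlinkMatchers.containsMessage`) can run on the captured throwable after the
call instead of inside the lambda.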




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

