tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576091041



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));

Review comment:
       Let's not assert on these details. This could change in the future.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> 
executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                
.map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new 
CompletionException(throwable);
-                                    } else if (jobstatus != 
JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of 
FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) 
-> {
+                                                    Set<ExecutionState> 
nonFinishedStates =
+                                                            
extractNonFinishedStates(
+                                                                    
executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during 
stopping job {} with a savepoint. Reason: {}",
+                                                                
jobGraph.getJobID(),
+                                                                
throwable.getMessage());
+                                                        throw new 
CompletionException(throwable);
+                                                    } else if 
(!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+                                                                
jobGraph.getJobID(),
+                                                                
nonFinishedStates);
+                                                        FlinkException
+                                                                
inconsistentFinalStateException =
+                                                                        new 
FlinkException(
+                                                                               
 String.format(
+                                                                               
         "Inconsistent execution state after stopping with savepoint. A global 
fail-over was triggered to recover the job %s.",
+                                                                               
         jobGraph
+                                                                               
                 .getJobID()));
+                                                        
executionGraph.failGlobal(
+                                                                
inconsistentFinalStateException);
+
+                                                        throw new 
CompletionException(
+                                                                
inconsistentFinalStateException);
+                                                    }
+                                                    return JobStatus.FINISHED;
+                                                },
+                                                mainThreadExecutor)
+                                        .thenApply(jobStatus -> path))

Review comment:
       This code looks rather complex. I am wondering whether we couldn't 
introduce something like a `StopWithSavepointOperation` which encapsulates this 
logic. This would allow us to better test this functionality in isolation as 
well.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> 
executionGraphTerminationFuture =

Review comment:
       The name of this variable is confusing to me since it is not the 
execution graph termination future.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));

Review comment:
       How do we assert that all executions are running again? I think this 
test would almost pass without your fix if it weren't for the assertion 
`assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2))`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));

Review comment:
       Why is this failure necessary? Or, put differently, what exactly are we 
testing here?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling 
triggering a global job
+                            // fail-over
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+                            
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+                        },
+                        mainThreadExecutor)
+                .get();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        }
+
+        // we have to wait for the main executor to be finished with 
restarting tasks
+        singleThreadExecutorService.shutdown();
+        singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);

Review comment:
       There is an `ExecutorUtils.gracefulShutdown` method for stopping 
`ExecutorServices`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> 
executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),

Review comment:
       Pipelined regions cannot be used here because they are only used for 
scheduling and failover. The state management still works on a 
per-`Execution` basis.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -144,18 +144,31 @@ public static DefaultSchedulerBuilder 
createSchedulerBuilder(
     }
 
     public static void enableCheckpointing(final JobGraph jobGraph) {
-        enableCheckpointing(jobGraph, null, null);
+        enableCheckpointing(jobGraph, DEFAULT_CHECKPOINT_TIMEOUT_MS);
+    }
+
+    public static void enableCheckpointing(final JobGraph jobGraph, long 
checkpointTimeout) {
+        enableCheckpointing(jobGraph, checkpointTimeout, null, null);
+    }
+
+    public static void enableCheckpointing(
+            final JobGraph jobGraph,
+            StateBackend stateBackend,
+            CheckpointStorage checkpointStorage) {

Review comment:
       `@Nullable` is missing on the `stateBackend` and `checkpointStorage` 
parameters (callers pass `null` for both).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
 
+        final CompletableFuture<Collection<ExecutionState>> 
executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        
executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                
.map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, 
targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
 
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new 
CompletionException(throwable);
-                                    } else if (jobstatus != 
JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} 
with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of 
FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus 
-> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handle(
+                                                (executionStates, throwable) 
-> {
+                                                    Set<ExecutionState> 
nonFinishedStates =
+                                                            
extractNonFinishedStates(
+                                                                    
executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during 
stopping job {} with a savepoint. Reason: {}",
+                                                                
jobGraph.getJobID(),
+                                                                
throwable.getMessage());
+                                                        throw new 
CompletionException(throwable);
+                                                    } else if 
(!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while 
stopping job {} after successfully creating a savepoint. A global failover is 
going to be triggered. Reason: One or more states ended up in the following 
termination states instead of FINISHED: {}",
+                                                                
jobGraph.getJobID(),
+                                                                
nonFinishedStates);
+                                                        FlinkException
+                                                                
inconsistentFinalStateException =
+                                                                        new 
FlinkException(
+                                                                               
 String.format(
+                                                                               
         "Inconsistent execution state after stopping with savepoint. A global 
fail-over was triggered to recover the job %s.",
+                                                                               
         jobGraph
+                                                                               
                 .getJobID()));
+                                                        
executionGraph.failGlobal(
+                                                                
inconsistentFinalStateException);

Review comment:
       I think the reason why we need `handleAsync` here is that 
`savepointFuture`, which comes out of the `CheckpointCoordinator`, might not be 
completed by the main thread. Please correct me if I am wrong about that, 
@rkhachatryan.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));

Review comment:
       Why does a checkpoint abortion trigger the shutdown of this task?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling 
triggering a global job
+                            // fail-over
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));

Review comment:
       Why does a failed checkpoint trigger a global failover? Shouldn't the 
aborted savepoint stop the stop-with-savepoint operation and let everything 
else continue normally?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+
+        // the restart due to failed checkpoint handling triggering a global 
job fail-over
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(1));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepoint() throws Exception {
+        // we don't allow any restarts during the happy path
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final Set<ExecutionAttemptID> executionAttemptIds =
+                StreamSupport.stream(
+                                scheduler
+                                        .getExecutionGraph()
+                                        .getAllExecutionVertices()
+                                        .spliterator(),
+                                false)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt)
+                        .map(Execution::getAttemptId)
+                        .collect(Collectors.toSet());
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        executionAttemptIds.forEach(
+                id ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(), id, 
ExecutionState.RUNNING)));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(executionAttemptIds.toArray()));
+
+        executionAttemptIds.forEach(
+                id ->
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(), id, 
ExecutionState.FINISHED)));
+
+        try {
+            assertThat(
+                    stopWithSavepointFuture.get(),
+                    startsWith(String.format("file:%s", savepointFolder)));
+        } catch (ExecutionException e) {
+            fail("No exception is expected.");
+        }

Review comment:
       This is an anti-pattern because it hides `e`. I would recommend letting 
the exception simply bubble up.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));

Review comment:
       You could use `FlinkMatchers.containsMessage` here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));

Review comment:
       Why does this task terminate?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -539,6 +555,171 @@ public void testSubmitWithUnknownSavepointPath() throws 
Exception {
         }
     }
 
+    @Test
+    public void testStopWithSavepointFailingInSnapshotCreation() throws 
Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new SnapshotFailingInfiniteTestSource(),
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by the CheckpointFailureManager
+                2,
+                assertInSnapshotCreationFailure());
+    }
+
+    @Test
+    public void testStopWithSavepointFailingAfterSnapshotCreation() throws 
Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new InfiniteTestSource() {
+                    @Override
+                    public void cancel() {
+                        throw new RuntimeException(
+                                "Expected RuntimeException after snapshot 
creation.");
+                    }
+                },
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by SchedulerBase.stopWithSavepoint
+                2,
+                assertAfterSnapshotCreationFailure());
+    }
+
+    private static BiConsumer<JobID, ExecutionException> 
assertAfterSnapshotCreationFailure() {
+        return (jobId, actualException) -> {
+            Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(actualException, 
FlinkException.class);
+            assertTrue(actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobId)));

Review comment:
       `FlinkMatchers.containsMessage` maybe

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() 
throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), 
is(equalTo(0)));
     }
 
+    @Test
+    public void testStopWithSavepointFailingAfterSavepointCreation() throws 
Exception {
+        // initially, we don't allow any restarts since the first phase 
(savepoint creation)
+        // succeeds without any failures
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithCompletedCheckpoint =
+                new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointAborted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final String savepointFolder = 
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+        // trigger savepoint and wait for checkpoint to be retrieved by 
TaskManagerGateway
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint(savepointFolder, false);
+        checkpointTriggeredLatch.await();
+
+        acknowledgePendingCheckpoint(scheduler, 1);
+
+        assertThat(
+                "Both the executions where notified about the completed 
checkpoint.",
+                executionAttemptIdsWithCompletedCheckpoint,
+                containsInAnyOrder(failingExecutionAttemptId, 
succeedingExecutionAttemptId));
+
+        // The savepoint creation succeeded a failure happens in the second 
phase when finishing
+        // the tasks. That's why, the restarting policy is enabled.
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+
+        // the restarts due to local failure handling and global job fail-over 
are triggered
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(2));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<FlinkException> flinkException =
+                    ExceptionUtils.findThrowable(e, FlinkException.class);
+
+            assertTrue(flinkException.isPresent());
+            assertThat(
+                    flinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after 
stopping with savepoint. A global fail-over was triggered to recover the job 
%s.",
+                                    jobGraph.getJobID())));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = 
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // collect executions to which the checkpoint completion was confirmed
+        CountDownLatch checkpointAbortionConfirmedLatch = new 
CountDownLatch(2);
+        final List<ExecutionAttemptID> 
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+                    
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+                    checkpointAbortionConfirmedLatch.countDown();
+                });
+        taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+                (ignored0, ignored1, ignored2, ignored3) -> {
+                    throw new 
UnsupportedOperationException("notifyCheckpointCompleted was called");
+                });
+
+        final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        // we have to make sure that the tasks are running before 
stop-with-savepoint is triggered
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.RUNNING));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), succeedingExecutionAttemptId, 
ExecutionState.RUNNING));
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                scheduler.stopWithSavepoint("savepoint-path", false);
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+        final DeclineCheckpoint declineCheckpoint =
+                new DeclineCheckpoint(
+                        jobGraph.getJobID(),
+                        failingExecutionAttemptId,
+                        1,
+                        new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+        checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, 
"unknown location");
+
+        // we need to wait for the confirmations to be collected since this is 
running in a separate
+        // thread
+        checkpointAbortionConfirmedLatch.await();
+
+        assertThat(
+                "Both of the executions where notified about the aborted 
checkpoint.",
+                executionAttemptIdsWithAbortedCheckpoint,
+                is(Arrays.asList(succeedingExecutionAttemptId, 
failingExecutionAttemptId)));
+
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(),
+                        succeedingExecutionAttemptId,
+                        ExecutionState.FINISHED));
+        scheduler.updateTaskExecutionState(
+                new TaskExecutionState(
+                        jobGraph.getJobID(), failingExecutionAttemptId, 
ExecutionState.FAILED));
+
+        // the restart due to failed checkpoint handling triggering a global 
job fail-over
+        assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), 
hasSize(1));
+        taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+        }
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
+    }
+
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =

Review comment:
       `CheckpointCoordinator` is a mess with respect to its internal threading 
model. This is definitely out of scope for this PR. I think we have wanted to 
refactor the `CC` for more than 3 releases now.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling 
triggering a global job
+                            // fail-over
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+                            
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+                        },
+                        mainThreadExecutor)
+                .get();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        }
+
+        // we have to wait for the main executor to be finished with 
restarting tasks
+        singleThreadExecutorService.shutdown();
+        singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));

Review comment:
       This access is most likely not thread-safe. Hence, it is bound to fail 
eventually.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the 
checkpoint coordination -
+        // that's why we have to instantiate a separate ThreadExecutorService 
here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the restart due to failed checkpoint handling 
triggering a global job
+                            // fail-over
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+                            
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+                        },
+                        mainThreadExecutor)
+                .get();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        }
+
+        // we have to wait for the main executor to be finished with 
restarting tasks
+        singleThreadExecutorService.shutdown();
+        singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));

Review comment:
       I think it is dangerous to first shut things down and then assume that 
all pending work has been done. This couples the test to a lot of internal 
details, because it only works if, at the time of shutting down the executor, 
all runnables that need to run have already been enqueued. If a runnable 
enqueues something else, then this won't work anymore.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to