tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576091041
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
Review comment:
Let's not assert on these details. They could change in the future.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state " + jobstatus + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during stopping job {} with a savepoint. Reason: {}",
+                                                                jobGraph.getJobID(),
+                                                                throwable.getMessage());
+                                                        throw new CompletionException(throwable);
+                                                    } else if (!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}",
+                                                                jobGraph.getJobID(),
+                                                                nonFinishedStates);
+                                                        FlinkException inconsistentFinalStateException =
+                                                                new FlinkException(
+                                                                        String.format(
+                                                                                "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                                                                jobGraph.getJobID()));
+                                                        executionGraph.failGlobal(inconsistentFinalStateException);
+
+                                                        throw new CompletionException(inconsistentFinalStateException);
+                                                    }
+                                                    return JobStatus.FINISHED;
+                                                },
+                                                mainThreadExecutor)
+                                        .thenApply(jobStatus -> path))
Review comment:
This code looks rather complex. I am wondering whether we couldn't introduce something like a `StopWithSavepointOperation` which encapsulates this logic. This would also allow us to better test this functionality in isolation.
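Purely as an illustration of that idea (nothing here is part of the PR; the class name, method names, and callbacks below are hypothetical), the completion logic could live in a small component that a unit test can drive without a full scheduler:

    import java.util.Collection;
    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.util.FlinkException;

    /** Illustrative sketch of an encapsulated stop-with-savepoint completion step. */
    final class StopWithSavepointOperation {

        private final CompletableFuture<String> result = new CompletableFuture<>();

        /** Called once the savepoint has been written and all executions reached a terminal state. */
        void onExecutionsTerminated(String savepointPath, Collection<ExecutionState> terminalStates) {
            final boolean allFinished =
                    terminalStates.stream().allMatch(state -> state == ExecutionState.FINISHED);
            if (allFinished) {
                result.complete(savepointPath);
            } else {
                result.completeExceptionally(
                        new FlinkException(
                                "Inconsistent execution state after stopping with savepoint."));
            }
        }

        /** Called if the savepoint creation or the termination future fails. */
        void onFailure(Throwable failure) {
            result.completeExceptionally(failure);
        }

        CompletableFuture<String> getResult() {
            return result;
        }
    }

`SchedulerBase` would then only wire the two futures to these callbacks, and the decision logic could be exercised with plain in-memory inputs.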
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
Review comment:
The name of this variable is confusing to me since it is not the termination future of the execution graph but the combined terminal-state future of the individual executions.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
Review comment:
How do we assert that all executions are running again? I think this test would almost pass without your fix if it weren't for the assertion `assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2))`.
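One sketch of such an assertion (reusing the test's `scheduler` variable and Hamcrest's `greaterThan`; the exact state expected after the fail-over would need double-checking) is to verify that every vertex is on a fresh execution attempt once the scheduled restarts have run:

    for (ExecutionVertex vertex : scheduler.getExecutionGraph().getAllExecutionVertices()) {
        assertThat(
                "Every vertex should have been restarted with a new execution attempt.",
                vertex.getCurrentExecutionAttempt().getAttemptNumber(),
                is(greaterThan(0)));
    }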
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
Review comment:
Why is this failure necessary? Or, asked differently, what exactly are we testing here?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling
triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+ },
+ mainThreadExecutor)
+ .get();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+ }
+
+ // we have to wait for the main executor to be finished with
restarting tasks
+ singleThreadExecutorService.shutdown();
+ singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
Review comment:
There is an `ExecutorUtils.gracefulShutdown` method for stopping `ExecutorService`s.
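A sketch of the suggested teardown, assuming `org.apache.flink.util.ExecutorUtils` and the test's `singleThreadExecutorService` variable:

    // replaces the manual shutdown()/awaitTermination(1, TimeUnit.SECONDS) pair
    ExecutorUtils.gracefulShutdown(1, TimeUnit.SECONDS, singleThreadExecutorService);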
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
Review comment:
Pipelined regions cannot be used here because they are only used for scheduling and failover. The state management still works on an `Execution` basis.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
##########
@@ -144,18 +144,31 @@ public static DefaultSchedulerBuilder createSchedulerBuilder(
}
public static void enableCheckpointing(final JobGraph jobGraph) {
- enableCheckpointing(jobGraph, null, null);
+ enableCheckpointing(jobGraph, DEFAULT_CHECKPOINT_TIMEOUT_MS);
+ }
+
+    public static void enableCheckpointing(final JobGraph jobGraph, long checkpointTimeout) {
+ enableCheckpointing(jobGraph, checkpointTimeout, null, null);
+ }
+
+ public static void enableCheckpointing(
+ final JobGraph jobGraph,
+ StateBackend stateBackend,
+ CheckpointStorage checkpointStorage) {
Review comment:
`@Nullable` is missing.
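Presumably the signature should read something like this (using `javax.annotation.Nullable`):

    public static void enableCheckpointing(
            final JobGraph jobGraph,
            @Nullable StateBackend stateBackend,
            @Nullable CheckpointStorage checkpointStorage) {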
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,56 @@ public void reportCheckpointMetrics(
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();
+ final CompletableFuture<Collection<ExecutionState>>
executionGraphTerminationFuture =
+ FutureUtils.combineAll(
+ StreamSupport.stream(
+
executionGraph.getAllExecutionVertices().spliterator(),
+ false)
+
.map(ExecutionVertex::getCurrentExecutionAttempt)
+ .map(Execution::getTerminalStateFuture)
+ .collect(Collectors.toList()));
+
final CompletableFuture<String> savepointFuture =
checkpointCoordinator
.triggerSynchronousSavepoint(advanceToEndOfEventTime,
targetDirectory)
.thenApply(CompletedCheckpoint::getExternalPointer);
- final CompletableFuture<JobStatus> terminationFuture =
- executionGraph
- .getTerminationFuture()
- .handle(
- (jobstatus, throwable) -> {
- if (throwable != null) {
- log.info(
- "Failed during stopping job {}
with a savepoint. Reason: {}",
- jobGraph.getJobID(),
- throwable.getMessage());
- throw new
CompletionException(throwable);
- } else if (jobstatus !=
JobStatus.FINISHED) {
- log.info(
- "Failed during stopping job {}
with a savepoint. Reason: Reached state {} instead of FINISHED.",
- jobGraph.getJobID(),
- jobstatus);
- throw new CompletionException(
- new FlinkException(
- "Reached state "
- + jobstatus
- + " instead of
FINISHED."));
- }
- return jobstatus;
- });
-
return savepointFuture
- .thenCompose((path) -> terminationFuture.thenApply((jobStatus
-> path)))
+ .thenCompose(
+ path ->
+ executionGraphTerminationFuture
+ .handle(
+ (executionStates, throwable)
-> {
+ Set<ExecutionState>
nonFinishedStates =
+
extractNonFinishedStates(
+
executionStates);
+ if (throwable != null) {
+ log.info(
+ "Failed during
stopping job {} with a savepoint. Reason: {}",
+
jobGraph.getJobID(),
+
throwable.getMessage());
+ throw new
CompletionException(throwable);
+ } else if
(!nonFinishedStates.isEmpty()) {
+ log.info(
+ "Failed while
stopping job {} after successfully creating a savepoint. A global failover is
going to be triggered. Reason: One or more states ended up in the following
termination states instead of FINISHED: {}",
+
jobGraph.getJobID(),
+
nonFinishedStates);
+ FlinkException
+
inconsistentFinalStateException =
+ new
FlinkException(
+
String.format(
+
"Inconsistent execution state after stopping with savepoint. A global
fail-over was triggered to recover the job %s.",
+
jobGraph
+
.getJobID()));
+
executionGraph.failGlobal(
+
inconsistentFinalStateException);
Review comment:
I think the reason why we need `handleAsync` here is that the `savepointFuture`, which comes out of the `CheckpointCoordinator`, might not be completed by the main thread. Please correct me if I am wrong about that, @rkhachatryan.
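A minimal, self-contained illustration of the difference using plain JDK futures (the single-thread executor merely stands in for the scheduler's main thread executor):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    public class HandleVsHandleAsync {
        public static void main(String[] args) {
            final Executor mainThreadStandIn = Executors.newSingleThreadExecutor();
            final CompletableFuture<String> future = new CompletableFuture<>();

            // handle(): the callback runs on whichever thread completes the future,
            // e.g. a checkpoint I/O thread in the scheduler case.
            future.handle((value, failure) -> value + " via handle");

            // handleAsync(..., executor): the callback is re-dispatched to the given
            // executor, so state mutations stay on the intended (main) thread.
            future.handleAsync((value, failure) -> value + " via handleAsync", mainThreadStandIn);

            future.complete("done"); // here the completing thread is just the test's main thread
        }
    }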
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
Review comment:
Why does a checkpoint abortion trigger the shutdown of this task?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling
triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
Review comment:
Why does a failed checkpoint trigger a global failover? Shouldn't the aborted savepoint stop the stop-with-savepoint operation and let everything else continue normally?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling triggering a global
job fail-over
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(1));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepoint() throws Exception {
+ // we don't allow any restarts during the happy path
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final Set<ExecutionAttemptID> executionAttemptIds =
+ StreamSupport.stream(
+ scheduler
+ .getExecutionGraph()
+ .getAllExecutionVertices()
+ .spliterator(),
+ false)
+ .map(ExecutionVertex::getCurrentExecutionAttempt)
+ .map(Execution::getAttemptId)
+ .collect(Collectors.toSet());
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ executionAttemptIds.forEach(
+ id ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), id,
ExecutionState.RUNNING)));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(executionAttemptIds.toArray()));
+
+ executionAttemptIds.forEach(
+ id ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), id,
ExecutionState.FINISHED)));
+
+ try {
+ assertThat(
+ stopWithSavepointFuture.get(),
+ startsWith(String.format("file:%s", savepointFolder)));
+ } catch (ExecutionException e) {
+ fail("No exception is expected.");
+ }
Review comment:
This is an anti-pattern because it hides `e`. I would recommend simply letting the exception bubble up.
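In other words, a sketch of the simplified form (keeping the quoted test's variable names): drop the try/catch so that any failure of the future surfaces with its full stack trace.

    assertThat(
            stopWithSavepointFuture.get(),
            startsWith(String.format("file:%s", savepointFolder)));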
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
Review comment:
You could use `FlinkMatchers.containsMessage` here.
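For example (assuming `org.apache.flink.core.testutils.FlinkMatchers`; if I remember correctly the matcher also walks the cause chain, so the `findThrowable` step could be dropped as well):

    assertThat(
            e,
            FlinkMatchers.containsMessage(
                    String.format(
                            "Inconsistent execution state after stopping with savepoint. "
                                    + "A global fail-over was triggered to recover the job %s.",
                            jobGraph.getJobID())));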
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +624,289 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this is running in a
+ // separate thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
Review comment:
Why does this task terminate?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -539,6 +555,171 @@ public void testSubmitWithUnknownSavepointPath() throws
Exception {
}
}
+ @Test
+ public void testStopWithSavepointFailingInSnapshotCreation() throws
Exception {
+ testStopWithFailingSourceInOnePipeline(
+ new SnapshotFailingInfiniteTestSource(),
+ folder.newFolder(),
+ // two restarts expected:
+ // 1. task failure restart
+ // 2. job failover triggered by the CheckpointFailureManager
+ 2,
+ assertInSnapshotCreationFailure());
+ }
+
+ @Test
+ public void testStopWithSavepointFailingAfterSnapshotCreation() throws
Exception {
+ testStopWithFailingSourceInOnePipeline(
+ new InfiniteTestSource() {
+ @Override
+ public void cancel() {
+ throw new RuntimeException(
+ "Expected RuntimeException after snapshot
creation.");
+ }
+ },
+ folder.newFolder(),
+ // two restarts expected:
+ // 1. task failure restart
+ // 2. job failover triggered by SchedulerBase.stopWithSavepoint
+ 2,
+ assertAfterSnapshotCreationFailure());
+ }
+
+ private static BiConsumer<JobID, ExecutionException>
assertAfterSnapshotCreationFailure() {
+ return (jobId, actualException) -> {
+ Optional<FlinkException> actualFlinkException =
+ ExceptionUtils.findThrowable(actualException,
FlinkException.class);
+ assertTrue(actualFlinkException.isPresent());
+ assertThat(
+ actualFlinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobId)));
Review comment:
`FlinkMatchers.containsMessage` maybe
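For illustration, a rough sketch of how the helper could look with that matcher
(this assumes a static import of
`org.apache.flink.core.testutils.FlinkMatchers.containsMessage` and that it matches
the message anywhere in the cause chain; it is not code from this PR):

```java
// Sketch only: assumes the static imports
//   org.apache.flink.core.testutils.FlinkMatchers.containsMessage
//   org.hamcrest.MatcherAssert.assertThat
// are available in SavepointITCase.
private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
    return (jobId, actualException) ->
            assertThat(
                    actualException,
                    containsMessage(
                            String.format(
                                    "A global fail-over was triggered to recover the job %s.",
                                    jobId)));
}
```

That would also keep the assertion robust if the exception ends up wrapped one level
deeper or the surrounding message changes slightly.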
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks()
throws Exception {
assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(),
is(equalTo(0)));
}
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded, but a failure happens in the second phase when
+ // finishing the tasks. That's why the restart policy is now enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint abortion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this is
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+
+ // the restart is due to the failed checkpoint handling triggering a global job fail-over
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(1));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the checkpoint coordination -
+ // that's why we have to instantiate a separate single-threaded ScheduledExecutorService here
+ final ScheduledExecutorService singleThreadExecutorService =
Review comment:
`CheckpointCoordinator` is a mess w.r.t. its internal threading model. This
is definitely out of scope for this PR. I think we have wanted to refactor the
`CC` for more than 3 releases now.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the checkpoint coordination -
+ // that's why we have to instantiate a separate single-threaded ScheduledExecutorService here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart is due to the failed checkpoint handling triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+ },
+ mainThreadExecutor)
+ .get();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+ }
+
+ // we have to wait for the main executor to be finished with
restarting tasks
+ singleThreadExecutorService.shutdown();
+ singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
Review comment:
This access presumably isn't thread-safe: the test thread reads the
`ExecutionGraph` state while the scheduler runs on the separate main-thread
executor. Hence it is bound to fail eventually.
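For illustration, a minimal sketch (my assumption, not code from this PR) of reading
the state on the scheduler's main thread instead of the test thread:

```java
// Sketch only: hop onto the main-thread executor for the read so that the
// ExecutionGraph state is not accessed concurrently from the test thread.
final JobStatus jobStatus =
        CompletableFuture.supplyAsync(
                        () -> scheduler.getExecutionGraph().getState(), mainThreadExecutor)
                .get();
assertThat(jobStatus, is(JobStatus.RUNNING));
```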
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the checkpoint coordination -
+ // that's why we have to instantiate a separate single-threaded ScheduledExecutorService here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart is due to the failed checkpoint handling triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+ },
+ mainThreadExecutor)
+ .get();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+ }
+
+ // we have to wait for the main executor to be finished with
restarting tasks
+ singleThreadExecutorService.shutdown();
+ singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
Review comment:
I think it is dangerous to first shut things down and then assume that the work
has been done. This couples the test to a lot of internal details, because it only
works if all runnables that still need to run are already enqueued at the time the
executor is shut down. If a runnable enqueues another runnable, this won't work
anymore.
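As one possible alternative, a rough sketch (my assumption, not code from this PR)
that waits for an observable condition instead of relying on the executor shutdown
draining all pending work:

```java
// Sketch only: poll the job status (read on the main-thread executor, as the test
// already does for other calls) until it becomes RUNNING or a deadline expires.
JobStatus status =
        CompletableFuture.supplyAsync(
                        () -> scheduler.getExecutionGraph().getState(), mainThreadExecutor)
                .get();
final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while (status != JobStatus.RUNNING && System.nanoTime() < deadlineNanos) {
    Thread.sleep(10);
    status =
            CompletableFuture.supplyAsync(
                            () -> scheduler.getExecutionGraph().getState(),
                            mainThreadExecutor)
                    .get();
}
assertThat(status, is(JobStatus.RUNNING));
```

The same caveat still applies to anything that is not reflected in an observable
condition, but the assertion no longer depends on what happens to be enqueued at the
moment the executor is shut down.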
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]