rmetzger commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r575036273
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(
+                                                                    executionStates);
+                                                    if (throwable != null) {
+                                                        log.info(
+                                                                "Failed during stopping job {} with a savepoint. Reason: {}",
+                                                                jobGraph.getJobID(),
+                                                                throwable.getMessage());
+                                                        throw new CompletionException(throwable);
+                                                    } else if (!nonFinishedStates.isEmpty()) {
+                                                        log.info(
+                                                                "Failed while stopping job {} after successfully creating a savepoint. A global failover is going to be triggered. Reason: One or more states ended up in the following termination states instead of FINISHED: {}",
+                                                                jobGraph.getJobID(),
+                                                                nonFinishedStates);
+                                                        FlinkException
+                                                                inconsistentFinalStateException =
+                                                                        new FlinkException(
+                                                                                String.format(
+                                                                                        "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                                                                        jobGraph
+                                                                                                .getJobID()));
+                                                        executionGraph.failGlobal(
+                                                                inconsistentFinalStateException);
Review comment:
Have you considered other means of triggering a global failover? This approach
seems to work, but I'm not sure it is a good idea to make a detour via the
execution graph to notify the scheduler, from within the scheduler, that we
want a global failover.
We could also introduce an abstract `triggerGlobalFailover()` method into
the `SchedulerBase` for this purpose:
```diff
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index edd1da436c3..aa5cb248d48 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -333,6 +333,11 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
schedulingStrategy.onPartitionConsumable(partitionId);
}
+ @Override
+ protected void triggerGlobalFailover(Throwable cause) {
+ handleGlobalFailure(cause);
+ }
+
//
------------------------------------------------------------------------
// SchedulerOperations
//
------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 4144b0dc5b2..ef6147eb187 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -950,7 +950,7 @@ public abstract class SchedulerBase implements
SchedulerNG {
"Inconsistent execution state after stopping with savepoint. A
global fail-over was triggered to recover the job %s.",
jobGraph
.getJobID()));
-
executionGraph.failGlobal(
+
triggerGlobalFailover(
inconsistentFinalStateException);
throw new
CompletionException(
@@ -973,6 +973,8 @@ public abstract class SchedulerBase implements
SchedulerNG {
mainThreadExecutor);
}
+ protected abstract void triggerGlobalFailover(Throwable cause);
+
private static Set<ExecutionState> extractNonFinishedStates(
```
I'm not saying you should change the code; I would rather like to hear your
opinion and discuss this first.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
Review comment:
I wanted to suggest moving the `.handleAsync()` up to the definition of
the `executionGraphTerminationFuture`, as it would reduce the indentation by
one level and move conceptually related things closer to each other.
However, the `testStopWithSavepointFailingWithDeclinedCheckpoint` test
started failing when I tried it (it isn't clear to me why that's happening).
Do you happen to know why this happens?
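For reference, roughly what I tried locally (a hand-written sketch, so details may differ; `terminalExecutionStateFutures` stands for the collected list of `Execution#getTerminalStateFuture` futures, and the handler body is the unchanged failure handling from the current version):
```java
final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
        FutureUtils.combineAll(terminalExecutionStateFutures)
                .handleAsync(
                        (executionStates, throwable) -> {
                            // ... unchanged failure handling / failGlobal logic ...
                            return executionStates;
                        },
                        mainThreadExecutor);

return savepointFuture.thenCompose(
        path -> executionGraphTerminationFuture.thenApply(ignored -> path));
```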
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -539,6 +555,171 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
         }
     }
+    @Test
+    public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new SnapshotFailingInfiniteTestSource(),
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by the CheckpointFailureManager
+                2,
+                assertInSnapshotCreationFailure());
+    }
+
+    @Test
+    public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
+        testStopWithFailingSourceInOnePipeline(
+                new InfiniteTestSource() {
+                    @Override
+                    public void cancel() {
+                        throw new RuntimeException(
+                                "Expected RuntimeException after snapshot creation.");
+                    }
+                },
+                folder.newFolder(),
+                // two restarts expected:
+                // 1. task failure restart
+                // 2. job failover triggered by SchedulerBase.stopWithSavepoint
+                2,
+                assertAfterSnapshotCreationFailure());
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
+        return (jobId, actualException) -> {
+            Optional<FlinkException> actualFlinkException =
+                    ExceptionUtils.findThrowable(actualException, FlinkException.class);
+            assertTrue(actualFlinkException.isPresent());
+            assertThat(
+                    actualFlinkException.get().getMessage(),
+                    is(
+                            String.format(
+                                    "Inconsistent execution state after stopping with savepoint. A global fail-over was triggered to recover the job %s.",
+                                    jobId)));
+        };
+    }
+
+    private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
+        return (ignored, actualException) -> {
+            Optional<CheckpointException> actualFailureCause =
+                    ExceptionUtils.findThrowable(actualException, CheckpointException.class);
+            assertTrue(actualFailureCause.isPresent());
+            assertThat(
+                    actualFailureCause.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.JOB_FAILOVER_REGION));
+        };
+    }
+
+    private static OneShotLatch failingPipelineLatch;
+    private static OneShotLatch succeedingPipelineLatch;
+
+    /**
+     * FLINK-21030
+     *
+     * <p>Tests the handling of a failure that happened while stopping an embarrassingly parallel
+     * job with a Savepoint. The test expects that the stopping action fails and all executions are
+     * in state {@code RUNNING} afterwards.
+     *
+     * @param failingSource the failing {@link SourceFunction} used in one of the two pipelines.
+     * @param expectedMaximumNumberOfRestarts the maximum number of restarts allowed by the restart
+     *     strategy.
+     * @param exceptionAssertion asserts the client-call exception to verify that the right error
+     *     was handled.
+     * @see SavepointITCase#failingPipelineLatch The latch used to trigger the successful start of
+     *     the later on failing pipeline.
+     * @see SavepointITCase#succeedingPipelineLatch The latch that triggers the successful start of
+     *     the succeeding pipeline.
+     * @throws Exception if an error occurred while running the test.
+     */
+    private static void testStopWithFailingSourceInOnePipeline(
+            InfiniteTestSource failingSource,
+            File savepointDir,
+            int expectedMaximumNumberOfRestarts,
+            BiConsumer<JobID, ExecutionException> exceptionAssertion)
+            throws Exception {
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder().build());
+
+        failingPipelineLatch = new OneShotLatch();
+        succeedingPipelineLatch = new OneShotLatch();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.getConfig()
+                .setRestartStrategy(
+                        RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 10));
Review comment:
```suggestion
                        RestartStrategies.fixedDelayRestart(expectedMaximumNumberOfRestarts, 0));
```
Let's not waste CI time ;)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
Review comment:
nit: maybe move the creation of this list outside the future creation to
improve readability
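Something like this is what I have in mind (untested sketch; the name `terminalExecutionStateFutures` is just a placeholder):
```java
final List<CompletableFuture<ExecutionState>> terminalExecutionStateFutures =
        StreamSupport.stream(
                        executionGraph.getAllExecutionVertices().spliterator(), false)
                .map(ExecutionVertex::getCurrentExecutionAttempt)
                .map(Execution::getTerminalStateFuture)
                .collect(Collectors.toList());

final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
        FutureUtils.combineAll(terminalExecutionStateFutures);
```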
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
Review comment:
We are not tracking this future anymore, but relying on the completion
of all executions' termination futures instead.
This seems to be sufficient, because the ExecutionGraph does the same on
all relevant operations (suspend, failJob).
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this is
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling triggering a global
job fail-over
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(1));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling
triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+ },
+ mainThreadExecutor)
+ .get();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+ }
+
+ // we have to wait for the main executor to be finished with
restarting tasks
+ singleThreadExecutorService.shutdown();
+ singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepoint() throws Exception {
+ // we don't allow any restarts during the happy path
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final Set<ExecutionAttemptID> executionAttemptIds =
+ StreamSupport.stream(
+ scheduler
+ .getExecutionGraph()
+ .getAllExecutionVertices()
+ .spliterator(),
+ false)
+ .map(ExecutionVertex::getCurrentExecutionAttempt)
+ .map(Execution::getAttemptId)
+ .collect(Collectors.toSet());
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ executionAttemptIds.forEach(
+ id ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), id,
ExecutionState.RUNNING)));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(executionAttemptIds.toArray()));
+
+ executionAttemptIds.forEach(
+ id ->
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), id,
ExecutionState.FINISHED)));
+
+ try {
+ assertThat(
+ stopWithSavepointFuture.get(),
+ startsWith(String.format("file:%s", savepointFolder)));
+ } catch (ExecutionException e) {
+ fail("No exception is expected.");
+ }
Review comment:
Can't you just remove the try/catch and let the exception propagate out of the
test? Then we would also get the stack trace and message in the CI output.
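For illustration, the last block could then shrink to something like this (sketch that relies on the test method's `throws Exception` clause):
```java
assertThat(
        stopWithSavepointFuture.get(),
        startsWith(String.format("file:%s", savepointFolder)));
```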
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this is
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
Review comment:
Do we need to enforce order here?
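If the order is not important, an order-agnostic matcher would be less brittle (sketch, analogous to the `containsInAnyOrder` assertion used in the other test):
```java
assertThat(
        "Both of the executions were notified about the aborted checkpoint.",
        executionAttemptIdsWithAbortedCheckpoint,
        containsInAnyOrder(succeedingExecutionAttemptId, failingExecutionAttemptId));
```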
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
Review comment:
If we are going to change the behavior here, we also need to change the
`FutureUtils.combineAll()` to a `waitForAll()`, to get all the final states.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -908,38 +909,57 @@ public void reportCheckpointMetrics(
         // will be restarted by the CheckpointCoordinatorDeActivator.
         checkpointCoordinator.stopCheckpointScheduler();
+        final CompletableFuture<Collection<ExecutionState>> executionGraphTerminationFuture =
+                FutureUtils.combineAll(
+                        StreamSupport.stream(
+                                        executionGraph.getAllExecutionVertices().spliterator(),
+                                        false)
+                                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                                .map(Execution::getTerminalStateFuture)
+                                .collect(Collectors.toList()));
+
         final CompletableFuture<String> savepointFuture =
                 checkpointCoordinator
                         .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
                         .thenApply(CompletedCheckpoint::getExternalPointer);
-        final CompletableFuture<JobStatus> terminationFuture =
-                executionGraph
-                        .getTerminationFuture()
-                        .handle(
-                                (jobstatus, throwable) -> {
-                                    if (throwable != null) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: {}",
-                                                jobGraph.getJobID(),
-                                                throwable.getMessage());
-                                        throw new CompletionException(throwable);
-                                    } else if (jobstatus != JobStatus.FINISHED) {
-                                        log.info(
-                                                "Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.",
-                                                jobGraph.getJobID(),
-                                                jobstatus);
-                                        throw new CompletionException(
-                                                new FlinkException(
-                                                        "Reached state "
-                                                                + jobstatus
-                                                                + " instead of FINISHED."));
-                                    }
-                                    return jobstatus;
-                                });
-
         return savepointFuture
-                .thenCompose((path) -> terminationFuture.thenApply((jobStatus -> path)))
+                .thenCompose(
+                        path ->
+                                executionGraphTerminationFuture
+                                        .handleAsync(
+                                                (executionStates, throwable) -> {
+                                                    Set<ExecutionState> nonFinishedStates =
+                                                            extractNonFinishedStates(
+                                                                    executionStates);
+                                                    if (throwable != null) {
Review comment:
Under which circumstances are we running into this condition?
(Background: We should end up in this condition if the termination future of
the `Execution` is completed exceptionally. I couldn't spot where that's
happening. But if there's ever going to be a code change that returns a
throwable here, then we might not properly go into a global failover.)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<FlinkException> flinkException =
+ ExceptionUtils.findThrowable(e, FlinkException.class);
+
+ assertTrue(flinkException.isPresent());
+ assertThat(
+ flinkException.get().getMessage(),
+ is(
+ String.format(
+ "Inconsistent execution state after
stopping with savepoint. A global fail-over was triggered to recover the job
%s.",
+ jobGraph.getJobID())));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithDeclinedCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ CountDownLatch checkpointAbortionConfirmedLatch = new
CountDownLatch(2);
+ final List<ExecutionAttemptID>
executionAttemptIdsWithAbortedCheckpoint = new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) -> {
+
executionAttemptIdsWithAbortedCheckpoint.add(executionAttemptId);
+ checkpointAbortionConfirmedLatch.countDown();
+ });
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointCompleted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint("savepoint-path", false);
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+ final DeclineCheckpoint declineCheckpoint =
+ new DeclineCheckpoint(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ 1,
+ new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+ checkpointCoordinator.receiveDeclineMessage(declineCheckpoint,
"unknown location");
+
+ // we need to wait for the confirmations to be collected since this is
running in a separate
+ // thread
+ checkpointAbortionConfirmedLatch.await();
+
+ assertThat(
+ "Both of the executions where notified about the aborted
checkpoint.",
+ executionAttemptIdsWithAbortedCheckpoint,
+ is(Arrays.asList(succeedingExecutionAttemptId,
failingExecutionAttemptId)));
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling triggering a global
job fail-over
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(1));
+ taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+
+ try {
+ stopWithSavepointFuture.get();
+ fail("An exception is expected.");
+ } catch (ExecutionException e) {
+ Optional<CheckpointException> actualCheckpointException =
+ findThrowable(e, CheckpointException.class);
+ assertTrue(actualCheckpointException.isPresent());
+ assertThat(
+
actualCheckpointException.get().getCheckpointFailureReason(),
+ is(CheckpointFailureReason.CHECKPOINT_DECLINED));
+ }
+
+ assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
+ }
+
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
Review comment:
Why can't we use `ComponentMainThreadExecutorServiceAdapter.forMainThread()`?
The proper solution would be passing the checkpoint coordinator or its timer
thread from the test into it, but introducing that change seems beyond the
scope of this PR.
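For context, this is the simpler variant I was thinking of (sketch; whether `forMainThread()` provides sufficient main-thread semantics for this test is exactly the open question):
```java
final ComponentMainThreadExecutor mainThreadExecutor =
        ComponentMainThreadExecutorServiceAdapter.forMainThread();
```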
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -605,6 +625,414 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
         assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints(), is(equalTo(0)));
     }
+ @Test
+ public void testStopWithSavepointFailingAfterSavepointCreation() throws
Exception {
+ // initially, we don't allow any restarts since the first phase
(savepoint creation)
+ // succeeds without any failures
+ testRestartBackoffTimeStrategy.setCanRestart(false);
+
+ final JobGraph jobGraph =
createTwoVertexJobGraphWithCheckpointingEnabled();
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // collect executions to which the checkpoint completion was confirmed
+ final List<ExecutionAttemptID>
executionAttemptIdsWithCompletedCheckpoint =
+ new ArrayList<>();
+ taskManagerGateway.setNotifyCheckpointCompleteConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+
executionAttemptIdsWithCompletedCheckpoint.add(executionAttemptId));
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (ignored0, ignored1, ignored2, ignored3) -> {
+ throw new
UnsupportedOperationException("notifyCheckpointAborted was called");
+ });
+
+ final DefaultScheduler scheduler =
createSchedulerAndStartScheduling(jobGraph);
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ // we have to make sure that the tasks are running before
stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), succeedingExecutionAttemptId,
ExecutionState.RUNNING));
+
+ final String savepointFolder =
TEMPORARY_FOLDER.newFolder().getAbsolutePath();
+
+ // trigger savepoint and wait for checkpoint to be retrieved by
TaskManagerGateway
+ final CompletableFuture<String> stopWithSavepointFuture =
+ scheduler.stopWithSavepoint(savepointFolder, false);
+ checkpointTriggeredLatch.await();
+
+ acknowledgePendingCheckpoint(scheduler, 1);
+
+ assertThat(
+ "Both the executions where notified about the completed
checkpoint.",
+ executionAttemptIdsWithCompletedCheckpoint,
+ containsInAnyOrder(failingExecutionAttemptId,
succeedingExecutionAttemptId));
+
+ // The savepoint creation succeeded a failure happens in the second
phase when finishing
+ // the tasks. That's why, the restarting policy is enabled.
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(), failingExecutionAttemptId,
ExecutionState.FAILED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+
+ // the restarts due to local failure handling and global job fail-over
are triggered
+ assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(),
hasSize(2));
Review comment:
This feels like the test depends on implementation details of the test's
subject. It is likely that somebody who changes the scheduler and subsequently
runs into a test failure here will just adjust this number to make the test
pass again. Can you add a comment explaining which two scheduled tasks are
expected to be in there?
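For example, something along these lines (only a suggestion; the exact wording depends on which restarts are actually expected here, mirroring the comments in `SavepointITCase`):
```java
// two restarts are expected:
// 1. the local restart caused by the FAILED task
// 2. the global fail-over triggered by SchedulerBase.stopWithSavepoint
assertThat(taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(2));
```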
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]