tillrohrmann commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r576677189
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
assertThat(scheduler.getExecutionGraph().getState(),
is(JobStatus.RUNNING));
}
+ @Test
+ public void testStopWithSavepointFailingWithExpiredCheckpoint() throws
Exception {
+ // we allow restarts right from the start since the failure is going
to happen in the first
+ // phase (savepoint creation) of stop-with-savepoint
+ testRestartBackoffTimeStrategy.setCanRestart(true);
+
+ final JobGraph jobGraph = createTwoVertexJobGraph();
+ // set checkpoint timeout to a low value to simulate checkpoint
expiration
+ enableCheckpointing(jobGraph, 10);
+
+ final SimpleAckingTaskManagerGateway taskManagerGateway =
+ new SimpleAckingTaskManagerGateway();
+ final CountDownLatch checkpointTriggeredLatch =
+ getCheckpointTriggeredLatch(taskManagerGateway);
+
+ // we have to set a listener that checks for the termination of the
checkpoint handling
+ OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+ taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+ (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+ checkpointAbortionWasTriggered.trigger());
+
+ // the failure handling has to happen in the same thread as the
checkpoint coordination -
+ // that's why we have to instantiate a separate ThreadExecutorService
here
+ final ScheduledExecutorService singleThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ singleThreadExecutorService);
+
+ final DefaultScheduler scheduler =
+ CompletableFuture.supplyAsync(
+ () ->
+ createSchedulerAndStartScheduling(
+ jobGraph, mainThreadExecutor),
+ mainThreadExecutor)
+ .get();
+
+ final ExecutionAttemptID succeedingExecutionAttemptId =
+
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final ExecutionAttemptID failingExecutionAttemptId =
+
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+
+ final CompletableFuture<String> stopWithSavepointFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ // we have to make sure that the tasks are
running before
+ // stop-with-savepoint is triggered
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.RUNNING));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+
succeedingExecutionAttemptId,
+ ExecutionState.RUNNING));
+
+ return
scheduler.stopWithSavepoint("savepoint-path", false);
+ },
+ mainThreadExecutor)
+ .get();
+
+ checkpointTriggeredLatch.await();
+
+ final CheckpointCoordinator checkpointCoordinator =
getCheckpointCoordinator(scheduler);
+
+ final AcknowledgeCheckpoint acknowledgeCheckpoint =
+ new AcknowledgeCheckpoint(jobGraph.getJobID(),
succeedingExecutionAttemptId, 1);
+
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint,
"unknown location");
+
+ // we need to wait for the expired checkpoint to be handled
+ checkpointAbortionWasTriggered.await();
+
+ CompletableFuture.runAsync(
+ () -> {
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ succeedingExecutionAttemptId,
+ ExecutionState.FINISHED));
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ failingExecutionAttemptId,
+ ExecutionState.FAILED));
+
+ // the restart due to failed checkpoint handling
triggering a global job
+ // fail-over
+ assertThat(
+
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
Review comment:
Ah ok, so it is something the `CheckpointCoordinator` does. I think it
is ok not to test `CheckpointCoordinator`-specific functionality here and to
defer it to unit tests for the `CheckpointCoordinator` itself.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]