ifndef-SleePy commented on a change in pull request #10332:
[FLINK-13905][checkpointing] Separate checkpoint triggering into several
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r368273302
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -283,31 +300,494 @@ public void testStopPeriodicScheduler() throws
Exception {
failureManager);
// Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise1 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
-
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- true,
- false);
- manuallyTriggeredScheduledExecutor.triggerAll();
+ onCompletionPromise1.get();
fail("The triggerCheckpoint call expected an
exception");
- } catch (CheckpointException e) {
-
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
e.getCheckpointFailureReason());
+ } catch (ExecutionException e) {
+ final Optional<CheckpointException>
checkpointExceptionOptional =
+ ExceptionUtils.findThrowable(e,
CheckpointException.class);
+ assertTrue(checkpointExceptionOptional.isPresent());
+
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN,
+
checkpointExceptionOptional.get().getCheckpointFailureReason());
}
// Not periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise2 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ false,
+ false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ assertFalse(onCompletionPromise2.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testTriggerCheckpointWithShuttingDownCoordinator() throws
Exception {
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ coord.startCheckpointScheduler();
+ // Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+
+ coord.shutdown(JobStatus.FAILED);
+ manuallyTriggeredScheduledExecutor.triggerAll();
try {
- coord.triggerCheckpoint(
- System.currentTimeMillis(),
-
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
- null,
- false,
- false);
- manuallyTriggeredScheduledExecutor.triggerAll();
- } catch (CheckpointException e) {
- fail("Unexpected exception : " +
e.getCheckpointFailureReason().message());
+ onCompletionPromise.get();
+ fail("Should not reach here");
+ } catch (ExecutionException e) {
+ final Optional<CheckpointException>
checkpointExceptionOptional =
+ ExceptionUtils.findThrowable(e,
CheckpointException.class);
+ assertTrue(checkpointExceptionOptional.isPresent());
+
assertEquals(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN,
+
checkpointExceptionOptional.get().getCheckpointFailureReason());
}
}
+ @Test
+ public void testTriggerCheckpointRequestQueued() throws Exception {
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ coord.startCheckpointScheduler();
+ // Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise1 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+ assertTrue(coord.isTriggering());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+
+ // another trigger before the prior one finished
+
manuallyTriggeredScheduledExecutor.triggerPeriodicScheduledTasks();
+ assertTrue(coord.isTriggering());
+ assertEquals(1, coord.getTriggerRequestQueue().size());
+
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ assertFalse(onCompletionPromise1.isCompletedExceptionally());
+ assertFalse(coord.isTriggering());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+ }
+
+ @Test
+ public void testTriggerCheckpointRequestQueuedWithFailure() throws
Exception {
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ final AtomicInteger taskManagerCheckpointTriggeredTimes = new
AtomicInteger(0);
+ final SimpleAckingTaskManagerGateway.CheckpointConsumer
checkpointConsumer =
+ (executionAttemptID,
+ jobId, checkpointId,
+ timestamp,
+ checkpointOptions,
+ advanceToEndOfEventTime) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
checkpointConsumer);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new TestingCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ coord.startCheckpointScheduler();
+ // Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise1 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+ assertTrue(coord.isTriggering());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+
+ // another trigger before the prior one finished
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise2 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ false,
+ false);
+
+ // another trigger before the first one finished
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise3 =
+ coord.triggerCheckpoint(System.currentTimeMillis(),
false);
+ assertTrue(coord.isTriggering());
+ assertEquals(2, coord.getTriggerRequestQueue().size());
+
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ assertTrue(onCompletionPromise1.isCompletedExceptionally());
+ assertFalse(onCompletionPromise2.isCompletedExceptionally());
+ assertFalse(onCompletionPromise3.isCompletedExceptionally());
+ assertFalse(coord.isTriggering());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+ assertEquals(2, taskManagerCheckpointTriggeredTimes.get());
+ }
+
+ @Test
+ public void testTriggerCheckpointRequestCancelled() throws Exception {
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ final AtomicInteger taskManagerCheckpointTriggeredTimes = new
AtomicInteger(0);
+ final SimpleAckingTaskManagerGateway.CheckpointConsumer
checkpointConsumer =
+ (executionAttemptID,
+ jobId, checkpointId,
+ timestamp,
+ checkpointOptions,
+ advanceToEndOfEventTime) ->
taskManagerCheckpointTriggeredTimes.incrementAndGet();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
checkpointConsumer);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ final CompletableFuture<String> masterHookCheckpointFuture =
new CompletableFuture<>();
+ coord.addMasterHook(new
TestingMasterHook(masterHookCheckpointFuture));
+ coord.startCheckpointScheduler();
+ // Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+
+ // checkpoint trigger will not finish since master hook
checkpoint is not finished yet
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ assertTrue(coord.isTriggering());
+
+ // trigger cancellation
+
manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
+ assertTrue(coord.isTriggering());
+
+ try {
+ onCompletionPromise.get();
+ fail("Should not reach here");
+ } catch (ExecutionException e) {
+ final Optional<CheckpointException>
checkpointExceptionOptional =
+ ExceptionUtils.findThrowable(e,
CheckpointException.class);
+ assertTrue(checkpointExceptionOptional.isPresent());
+ assertEquals(CheckpointFailureReason.CHECKPOINT_EXPIRED,
+
checkpointExceptionOptional.get().getCheckpointFailureReason());
+ }
+
+ // continue triggering
+ masterHookCheckpointFuture.complete("finish master hook");
+
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ assertFalse(coord.isTriggering());
+ // it doesn't really trigger task manager to do checkpoint
+ assertEquals(0, taskManagerCheckpointTriggeredTimes.get());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+ }
+
+ @Test
+ public void testTriggerCheckpointInitializationFailed() throws
Exception {
+ // create some mock Execution vertices that receive the
checkpoint trigger messages
+ final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+ // set up the coordinator and validate the initial state
+ CheckpointCoordinatorConfiguration chkConfig = new
CheckpointCoordinatorConfiguration(
+ 600000,
+ 600000,
+ 0,
+ Integer.MAX_VALUE,
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ true,
+ false,
+ 0);
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ new JobID(),
+ chkConfig,
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new ExecutionVertex[] { vertex1 },
+ new TestingCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(1),
+ new MemoryStateBackend(),
+ Executors.directExecutor(),
+ manuallyTriggeredScheduledExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY,
+ failureManager);
+
+ coord.startCheckpointScheduler();
+ // Periodic
+ final CompletableFuture<CompletedCheckpoint>
onCompletionPromise1 = coord.triggerCheckpoint(
+ System.currentTimeMillis(),
+
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ null,
+ true,
+ false);
+ assertTrue(coord.isTriggering());
+ assertEquals(0, coord.getTriggerRequestQueue().size());
+
+ manuallyTriggeredScheduledExecutor.triggerAll();
+ try {
+ onCompletionPromise1.get();
+ fail("Should not reach here");
Review comment:
Hmmm, it's a bit hard to understand.
It's due to `TestingCheckpointIDCounter`. I should give it a better name.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services