tillrohrmann commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r438903982
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
         assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
     }
+    /**
+     * This test only fails eventually.
+     */
+    @Test
+    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+        final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID());
+
+        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+            .setTasks(new ExecutionVertex[]{executionVertex})
+            .setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService))
+            .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                .build())
+            .build();
+
+        final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
+        final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+        checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+            triggerCheckpointLatch.await();
+            masterHookCheckpointFuture.complete("Completed");
+
+            // discard triggering checkpoint
+            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+            try {
+                // verify that the second checkpoint request will be executed and eventually times out
+                secondCheckpoint.get();
Review comment:
Strictly speaking, the test would also work without the latch, but the latch
increases the likelihood of hitting the intended thread interleaving here.
Review comment:
The first checkpoint is the one we are aborting. The latch is there so that
we don't abort the first checkpoint before it has actually been created.
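
To illustrate the interleaving the latch is meant to encourage, here is a minimal, self-contained sketch. It does not use any Flink classes: `java.util.concurrent.CountDownLatch` stands in for `OneShotLatch`, a plain single-threaded executor stands in for the coordinator's timer thread, and the class and method names (`LatchInterleavingSketch`, `abortAfterStart`) are hypothetical. It also assumes that the testing hook triggers the latch once the first operation has started, which the quoted hunk does not show. The point is only that the aborting thread waits until the first operation exists before it acts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchInterleavingSketch {

    /**
     * Starts an "operation" on a background thread and only "aborts" it once
     * a latch confirms that it has started. Returns whether the aborting
     * thread saw the operation as created at the moment of the abort.
     */
    public static boolean abortAfterStart() {
        ExecutorService timerThread = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture<String> hookResult = new CompletableFuture<>();
        AtomicBoolean created = new AtomicBoolean(false);

        try {
            timerThread.execute(() -> {
                created.set(true);   // the first operation now exists
                started.countDown(); // signal the waiting test thread
                hookResult.join();   // block until the hook result is supplied
            });

            // Without this wait, the abort below could run before the
            // background thread has created the operation at all.
            if (!started.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("operation did not start in time");
            }

            // Record what the "abort" sees, then release the background thread.
            boolean sawCreatedOperation = created.get();
            hookResult.complete("Completed");
            return sawCreatedOperation;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            timerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("abort saw created operation: " + abortAfterStart());
    }
}
```

In this sketch the wait makes the ordering deterministic. In the actual test the latch only raises the likelihood of the intended interleaving, as noted in the first comment.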