tillrohrmann commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r439410491



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void 
testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
                assertEquals(0, 
checkpointCoordinator.getTriggerRequestQueue().size());
        }
 
+       /**
+        * This test only fails eventually.
+        */
+       @Test
+       public void 
discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws 
Exception {
+               final ExecutionVertex executionVertex = mockExecutionVertex(new 
ExecutionAttemptID());
+
+               final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+               final CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                       .setTasks(new ExecutionVertex[]{executionVertex})
+                       .setTimer(new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))
+                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                               .build())
+                       .build();
+
+               final CompletableFuture<String> masterHookCheckpointFuture = 
new CompletableFuture<>();
+               final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+               checkpointCoordinator.addMasterHook(new 
TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+               try {
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       final CompletableFuture<CompletedCheckpoint> 
secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+                       triggerCheckpointLatch.await();
+                       masterHookCheckpointFuture.complete("Completed");
+
+                       // discard triggering checkpoint
+                       checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+                       try {
+                               // verify that the second checkpoint request 
will be executed and eventually times out
+                               secondCheckpoint.get();

Review comment:
       The test case should fail eventually. It is in the nature of concurrency 
related bugs that sometimes they require a certain thread interleaving which in 
this case is not easy to reproduce. I will check whether the test fails on my 
machine.
   
   Concerning the `manuallyTriggeredSchedulerExecutor`, I deliberately did not 
use it because it leaks internal implementation details. Concretely, the test 
has to know how many tasks are being enqueued until one reaches the 
`whenCompleteAsync` call. I think this is very brittle and should be avoided if 
possible. Imagine that someone changes the implementation of the 
`CheckpointCoordinator` so that there is another stage scheduled, this would 
immediately break the test w/o a very visible reason.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to