gaoyunhaii commented on code in PR #19464:
URL: https://github.com/apache/flink/pull/19464#discussion_r851959033


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -3688,6 +3688,66 @@ public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws E
         }
     }
 
+    @Test
+    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
+        // Warn: The case is fragile since a specific order of executing the tasks is required to
+        // reproduce the issue.
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        String trigger = "Trigger";
+        String abort = "Abort";
+        final List<String> notificationSequence = new ArrayList<>();
+        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
+                new CheckpointCoordinatorTestingUtils
+                                .MockOperatorCheckpointCoordinatorContextBuilder()
+                        .setOperatorID(new OperatorID())
+                        .setOnCallingCheckpointCoordinator(
+                                (id, future) -> notificationSequence.add(trigger + id))
+                        .setOnCallingAbortCurrentTriggering(() -> notificationSequence.add(abort))
+                        .build();
+
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setCheckpointTimeout(10)
+                                        .build())
+                        .setIoExecutor(manuallyTriggeredScheduledExecutor)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
+                        .build(graph);
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            // trigger twice to get checkpoint id and create pending checkpoint
+            manuallyTriggeredScheduledExecutor.trigger();
+            manuallyTriggeredScheduledExecutor.trigger();
+
+            // declineCheckpoint should be called after pending checkpoint is created but before the
+            // following steps
+            declineCheckpoint(1L, checkpointCoordinator, jobVertexID, graph);
+            // then trigger all tasks. the order is 1.initialize checkpoint location, 2.handle
+            // checkpoint abortion, 3.trigger coordinator checkpointing for the aborted checkpoint.
+            // The disordering of abortion and triggering was causing an error
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            // trigger the next checkpoint
+            checkState(!checkpointCoordinator.isTriggering());
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            Assert.assertTrue(
+                    !notificationSequence.contains(trigger + "1")
+                            || notificationSequence.indexOf(trigger + "1")

Review Comment:
   I think the
   `notificationSequence.indexOf(trigger + "1") < notificationSequence.indexOf(abort)`
   branch can never be the one that makes this assertion pass, since
   `!notificationSequence.contains(trigger + "1")` should always be true in this case?
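   To illustrate the point, here is a minimal standalone sketch comparing the two forms of the check. It assumes the truncated assertion continues with the `indexOf` comparison quoted above. The class name, method names, and sample sequences are hypothetical and are not taken from the PR or from a real test run.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not part of CheckpointCoordinatorTest.
class NotificationOrderSketch {
    static final String TRIGGER = "Trigger";
    static final String ABORT = "Abort";

    // Lenient form (assumed shape of the assertion under review): passes if
    // checkpoint 1 never reached the coordinator, or reached it before the abort.
    static boolean lenientCheck(List<String> seq) {
        return !seq.contains(TRIGGER + "1")
                || seq.indexOf(TRIGGER + "1") < seq.indexOf(ABORT);
    }

    // Strict form: checkpoint 1 must never reach the coordinator at all.
    static boolean strictCheck(List<String> seq) {
        return !seq.contains(TRIGGER + "1");
    }

    public static void main(String[] args) {
        // Made-up sequences for illustration only.
        List<String> abortedFirst = Arrays.asList(ABORT, TRIGGER + "2");
        List<String> triggeredThenAborted =
                Arrays.asList(TRIGGER + "1", ABORT, TRIGGER + "2");

        // Both forms accept a sequence in which checkpoint 1 was never triggered.
        System.out.println(lenientCheck(abortedFirst) + " " + strictCheck(abortedFirst));
        // Only the lenient form accepts a trigger for checkpoint 1 before the abort.
        System.out.println(
                lenientCheck(triggeredThenAborted) + " " + strictCheck(triggeredThenAborted));
    }
}
```

   If the first disjunct always holds in this test, the strict form alone would state the expectation more directly.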



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

