gaoyunhaii commented on code in PR #19464:
URL: https://github.com/apache/flink/pull/19464#discussion_r851969315
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -3688,6 +3688,66 @@ public void
testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws E
}
}
+ @Test
+ public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator()
throws Exception {
+ // Warn: The case is fragile since a specific order of executing the
tasks is required to
+ // reproduce the issue.
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ String trigger = "Trigger";
+ String abort = "Abort";
+ final List<String> notificationSequence = new ArrayList<>();
+
CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext
context =
+ new CheckpointCoordinatorTestingUtils
+
.MockOperatorCheckpointCoordinatorContextBuilder()
+ .setOperatorID(new OperatorID())
+ .setOnCallingCheckpointCoordinator(
+ (id, future) ->
notificationSequence.add(trigger + id))
+ .setOnCallingAbortCurrentTriggering(() ->
notificationSequence.add(abort))
+ .build();
+
+ CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointCoordinatorConfiguration(
+ CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointTimeout(10)
+ .build())
+ .setIoExecutor(manuallyTriggeredScheduledExecutor)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+
.setCoordinatorsToCheckpoint(Collections.singleton(context))
+ .build(graph);
+ try {
+ checkpointCoordinator.triggerCheckpoint(false);
+ // trigger twice to get checkpoint id and create pending checkpoint
+ manuallyTriggeredScheduledExecutor.trigger();
+ manuallyTriggeredScheduledExecutor.trigger();
+
+ // declineCheckpoint should be called after pending checkpoint is
created but before the
+ // following steps
+ declineCheckpoint(1L, checkpointCoordinator, jobVertexID, graph);
+ // then trigger all tasks. the order is 1.initialize checkpoint
location, 2.handle
+ // checkpoint abortion, 3.trigger coordinator checkpointing for
the aborted checkpoint.
+ // The disordering of abortion and triggering was causing an error
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ // trigger the next checkpoint
+ checkState(!checkpointCoordinator.isTriggering());
+ checkpointCoordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ Assert.assertTrue(
+ !notificationSequence.contains(trigger + "1")
+ || notificationSequence.indexOf(trigger + "1")
Review Comment:
Got it, thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]