pltbkd commented on code in PR #19464:
URL: https://github.com/apache/flink/pull/19464#discussion_r851967292
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -3688,6 +3688,66 @@ public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws E
}
}
+    @Test
+    public void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
+        // Warn: The case is fragile since a specific order of executing the tasks is required to
+        // reproduce the issue.
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        String trigger = "Trigger";
+        String abort = "Abort";
+        final List<String> notificationSequence = new ArrayList<>();
+        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context =
+                new CheckpointCoordinatorTestingUtils
+                                .MockOperatorCheckpointCoordinatorContextBuilder()
+                        .setOperatorID(new OperatorID())
+                        .setOnCallingCheckpointCoordinator(
+                                (id, future) -> notificationSequence.add(trigger + id))
+                        .setOnCallingAbortCurrentTriggering(() -> notificationSequence.add(abort))
+                        .build();
+
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setCheckpointTimeout(10)
+                                        .build())
+                        .setIoExecutor(manuallyTriggeredScheduledExecutor)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
+                        .build(graph);
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            // trigger twice to get checkpoint id and create pending checkpoint
+            manuallyTriggeredScheduledExecutor.trigger();
+            manuallyTriggeredScheduledExecutor.trigger();
+
+            // declineCheckpoint should be called after pending checkpoint is created but before the
+            // following steps
+            declineCheckpoint(1L, checkpointCoordinator, jobVertexID, graph);
+            // then trigger all tasks. the order is 1.initialize checkpoint location, 2.handle
+            // checkpoint abortion, 3.trigger coordinator checkpointing for the aborted checkpoint.
+            // The disordering of abortion and triggering was causing an error
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            // trigger the next checkpoint
+            checkState(!checkpointCoordinator.isTriggering());
+            checkpointCoordinator.triggerCheckpoint(false);
+            manuallyTriggeredScheduledExecutor.triggerAll();
+
+            Assert.assertTrue(
+                    !notificationSequence.contains(trigger + "1")
+                            || notificationSequence.indexOf(trigger + "1")
Review Comment:
It's always true with the fixed code, but false when we reproduce the issue by reverting the patch, though an AssertionError is also thrown if we remove the latter condition. The latter condition is here because it describes a situation that is theoretically correct.
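For readers following the thread, the ordering check discussed above can be illustrated with a small, self-contained sketch. It is not Flink code: `ManualExecutor`, `runInOrder` and `triggerNotAfterAbort` are hypothetical names, and because the quoted hunk is cut off after `indexOf(trigger + "1")`, the second operand is only *assumed* here to require that `Trigger1` was recorded before `Abort`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical illustration only; not the Flink test or its utilities. */
public class AbortTriggerOrderSketch {

    static final String TRIGGER = "Trigger";
    static final String ABORT = "Abort";

    /** Hypothetical stand-in for a manually triggered executor: tasks run only when triggered. */
    static final class ManualExecutor {
        private final Queue<Runnable> queue = new ArrayDeque<>();

        void execute(Runnable task) {
            queue.add(task);
        }

        /** Runs the oldest queued task, if there is one. */
        void trigger() {
            Runnable task = queue.poll();
            if (task != null) {
                task.run();
            }
        }

        /** Keeps running tasks until the queue is empty, including tasks queued while running. */
        void triggerAll() {
            while (!queue.isEmpty()) {
                trigger();
            }
        }
    }

    /** Queues one notification per event and runs them; the result follows submission order. */
    static List<String> runInOrder(String... events) {
        List<String> sequence = new ArrayList<>();
        ManualExecutor executor = new ManualExecutor();
        for (String event : events) {
            executor.execute(() -> sequence.add(event));
        }
        executor.triggerAll();
        return sequence;
    }

    /**
     * ASSUMED form of the truncated assertion: checkpoint 1 either never reached the coordinator,
     * or reached it before the abort was handled.
     */
    static boolean triggerNotAfterAbort(List<String> sequence) {
        return !sequence.contains(TRIGGER + "1")
                || sequence.indexOf(TRIGGER + "1") < sequence.indexOf(ABORT);
    }

    public static void main(String[] args) {
        // Simulated fixed behaviour: the abort wins, checkpoint 1 never reaches the coordinator.
        List<String> fixed = runInOrder(ABORT, TRIGGER + "2");
        // Simulated "theoretically correct" case: checkpoint 1 is triggered before the abort.
        List<String> allowed = runInOrder(TRIGGER + "1", ABORT, TRIGGER + "2");
        // Simulated bug: the coordinator is triggered for checkpoint 1 after it was aborted.
        List<String> buggy = runInOrder(ABORT, TRIGGER + "1", TRIGGER + "2");

        System.out.println(fixed + " -> " + triggerNotAfterAbort(fixed)); // [Abort, Trigger2] -> true
        System.out.println(allowed + " -> " + triggerNotAfterAbort(allowed)); // [Trigger1, Abort, Trigger2] -> true
        System.out.println(buggy + " -> " + triggerNotAfterAbort(buggy)); // [Abort, Trigger1, Trigger2] -> false
    }
}
```

A FIFO queue is used because, as the test's own warning notes, the outcome depends on the order in which queued tasks run: swapping two submissions is enough to flip the check. Dropping the second operand would make the "allowed" sequence fail as well.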