This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 3cc2f5bbcb3 [FLINK-29217][tests] Guarantee checkpoint order in OC test
3cc2f5bbcb3 is described below
commit 3cc2f5bbcb31fea9c12510a6d6fc515655d65564
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Sat Sep 10 10:39:51 2022 +0800
[FLINK-29217][tests] Guarantee checkpoint order in OC test
This closes #20780.
---
...ordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
index 5af4f469140..6a67ca2d13a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
@@ -666,6 +666,14 @@ public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
if (!isEventSentAfterSecondCheckpoint &&
isCoordinatorSecondCheckpointCompleted) {
isEventSentAfterSecondCheckpoint = true;
+ }
+
+ // If the checkpoint coordinator receives the completion message of checkpoint 1 and
+ // checkpoint 2 at the same time, checkpoint 2 might be completed before checkpoint 1
+ // due to the async mechanism in the checkpoint process, which would cause checkpoint 1
+ // to be accidentally aborted. In order to avoid this situation, the following code is
+ // required to make sure that checkpoint 2 is not completed until checkpoint 1 finishes.
+ if (isEventSentAfterSecondCheckpoint &&
isJobFirstCheckpointCompleted) {
EventReceivingOperator.shouldUnblockAllCheckpoint = true;
}
}