This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 36396f789cd [FLINK-29217][tests] Guarantee checkpoint order in OC test
36396f789cd is described below

commit 36396f789cd51cffa7cd690f609c3d253034a5c8
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Sat Sep 10 10:39:12 2022 +0800

    [FLINK-29217][tests] Guarantee checkpoint order in OC test
    
    This closes #20781.
---
 ...ordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
index 5af4f469140..6a67ca2d13a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
@@ -666,6 +666,14 @@ public class 
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
 
             if (!isEventSentAfterSecondCheckpoint && 
isCoordinatorSecondCheckpointCompleted) {
                 isEventSentAfterSecondCheckpoint = true;
+            }
+
+            // If the checkpoint coordinator receives the completion message 
of checkpoint 1 and
+            // checkpoint 2 at the same time, checkpoint 2 might be completed 
before checkpoint 1
+            // due to the async mechanism in the checkpoint process, which 
would cause checkpoint 1
+            // to be accidentally aborted. In order to avoid this situation, 
the following code is
+            // required to make sure that checkpoint 2 is not completed until 
checkpoint 1 finishes.
+            if (isEventSentAfterSecondCheckpoint && 
isJobFirstCheckpointCompleted) {
                 EventReceivingOperator.shouldUnblockAllCheckpoint = true;
             }
         }

Reply via email to