This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 66cc21d4e2c091c0f5211bf558d1a69364519f9b
Author: Hangxiang Yu <[email protected]>
AuthorDate: Wed Aug 2 22:22:28 2023 +0800

    [FLINK-32523][test] Guarantee all operators triggering decline checkpoint together for NotifyCheckpointAbortedITCase#testNotifyCheckpointAborted
---
 .../checkpointing/NotifyCheckpointAbortedITCase.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index 920ec342229..a3994883dcd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -96,6 +96,7 @@ import static org.junit.Assert.assertEquals;
 public class NotifyCheckpointAbortedITCase extends TestLogger {
 
     private static final long DECLINE_CHECKPOINT_ID = 2L;
+    private static final OneShotLatch DECLINE_CHECKPOINT_WAIT_LATCH = new OneShotLatch();
     private static final long TEST_TIMEOUT = 100000;
     private static final String DECLINE_SINK_NAME = "DeclineSink";
     private static MiniClusterWithClientResource cluster;
@@ -184,7 +185,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
         resetAllOperatorsNotifyAbortedLatches();
         verifyAllOperatorsNotifyAbortedTimes(1);
 
-        NormalSource.waitLatch.trigger();
+        DECLINE_CHECKPOINT_WAIT_LATCH.trigger();
         log.info("Verifying whether all operators have been notified of checkpoint-2 aborted.");
         verifyAllOperatorsNotifyAborted();
         log.info("Verified that all operators have been notified of checkpoint-2 aborted.");
@@ -214,7 +215,6 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
             implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction {
         private static final long serialVersionUID = 1L;
         protected volatile boolean running;
-        private static final OneShotLatch waitLatch = new OneShotLatch();
 
         NormalSource() {
             this.running = true;
@@ -241,7 +241,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
             if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) {
-                waitLatch.await();
+                DECLINE_CHECKPOINT_WAIT_LATCH.await();
             }
         }
 
@@ -249,7 +249,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
         public void initializeState(FunctionInitializationContext context) throws Exception {}
 
         static void reset() {
-            waitLatch.reset();
+            DECLINE_CHECKPOINT_WAIT_LATCH.reset();
         }
     }
 
@@ -287,7 +287,11 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
         }
 
         @Override
-        public void snapshotState(FunctionSnapshotContext context) {}
+        public void snapshotState(FunctionSnapshotContext context) throws InterruptedException {
+            if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) {
+                DECLINE_CHECKPOINT_WAIT_LATCH.await();
+            }
+        }
 
         @Override
         public void initializeState(FunctionInitializationContext context) throws Exception {
@@ -327,7 +331,11 @@ public class NotifyCheckpointAbortedITCase extends TestLogger {
             implements SnapshotStrategy<OperatorStateHandle, SnapshotResources> {
 
         @Override
-        public SnapshotResources syncPrepareResources(long checkpointId) {
+        public SnapshotResources syncPrepareResources(long checkpointId)
+                throws InterruptedException {
+            if (checkpointId == DECLINE_CHECKPOINT_ID) {
+                DECLINE_CHECKPOINT_WAIT_LATCH.await();
+            }
             return null;
         }
 

Reply via email to