This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 66cc21d4e2c091c0f5211bf558d1a69364519f9b Author: Hangxiang Yu <[email protected]> AuthorDate: Wed Aug 2 22:22:28 2023 +0800 [FLINK-32523][test] Guarantee all operators triggering decline checkpoint together for NotifyCheckpointAbortedITCase#testNotifyCheckpointAborted --- .../checkpointing/NotifyCheckpointAbortedITCase.java | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java index 920ec342229..a3994883dcd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java @@ -96,6 +96,7 @@ import static org.junit.Assert.assertEquals; public class NotifyCheckpointAbortedITCase extends TestLogger { private static final long DECLINE_CHECKPOINT_ID = 2L; + private static final OneShotLatch DECLINE_CHECKPOINT_WAIT_LATCH = new OneShotLatch(); private static final long TEST_TIMEOUT = 100000; private static final String DECLINE_SINK_NAME = "DeclineSink"; private static MiniClusterWithClientResource cluster; @@ -184,7 +185,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { resetAllOperatorsNotifyAbortedLatches(); verifyAllOperatorsNotifyAbortedTimes(1); - NormalSource.waitLatch.trigger(); + DECLINE_CHECKPOINT_WAIT_LATCH.trigger(); log.info("Verifying whether all operators have been notified of checkpoint-2 aborted."); verifyAllOperatorsNotifyAborted(); log.info("Verified that all operators have been notified of checkpoint-2 aborted."); @@ -214,7 +215,6 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { implements SourceFunction<Tuple2<Integer, Integer>>, CheckpointedFunction { private static final long serialVersionUID = 1L; protected volatile boolean running; - private static final OneShotLatch waitLatch = new OneShotLatch(); NormalSource() { this.running = true; @@ -241,7 +241,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) { - waitLatch.await(); + DECLINE_CHECKPOINT_WAIT_LATCH.await(); } } @@ -249,7 +249,7 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { public void initializeState(FunctionInitializationContext context) throws Exception {} static void reset() { - waitLatch.reset(); + DECLINE_CHECKPOINT_WAIT_LATCH.reset(); } } @@ -287,7 +287,11 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { } @Override - public void snapshotState(FunctionSnapshotContext context) {} + public void snapshotState(FunctionSnapshotContext context) throws InterruptedException { + if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) { + DECLINE_CHECKPOINT_WAIT_LATCH.await(); + } + } @Override public void initializeState(FunctionInitializationContext context) throws Exception { @@ -327,7 +331,11 @@ public class NotifyCheckpointAbortedITCase extends TestLogger { implements SnapshotStrategy<OperatorStateHandle, SnapshotResources> { @Override - public SnapshotResources syncPrepareResources(long checkpointId) { + public SnapshotResources syncPrepareResources(long checkpointId) + throws InterruptedException { + if (checkpointId == DECLINE_CHECKPOINT_ID) { + DECLINE_CHECKPOINT_WAIT_LATCH.await(); + } return null; }
