This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fdf1a2e9dfc [FLINK-38967][test] Fix flaky UnalignedCheckpointFailureHandlingITCase
fdf1a2e9dfc is described below

commit fdf1a2e9dfca022a2eca1e677d03e6aadeace9a7
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Jan 28 12:54:26 2026 +0100

    [FLINK-38967][test] Fix flaky UnalignedCheckpointFailureHandlingITCase
---
 .../UnalignedCheckpointFailureHandlingITCase.java  | 39 +++++++++++++++-------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index b86dfa67c10..1cb92405958 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -97,8 +97,10 @@ public class UnalignedCheckpointFailureHandlingITCase {
 
     @Test
     public void testCheckpointSuccessAfterFailure() throws Exception {
+        SharedReference<AtomicBoolean> failOnCloseRef = sharedObjects.add(new 
AtomicBoolean(true));
+
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        TestCheckpointStorageFactory.failOnCloseRef = sharedObjects.add(new 
AtomicBoolean(true));
+        TestCheckpointStorageFactory.failOnCloseRef = failOnCloseRef;
         TestCheckpointStorageFactory.tempFolderRef = 
sharedObjects.add(temporaryFolder);
 
         configure(
@@ -118,7 +120,7 @@ public class UnalignedCheckpointFailureHandlingITCase {
         waitForJobStatus(jobClient, singletonList(RUNNING));
         waitForAllTaskRunning(miniCluster, jobID, false);
 
-        triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
+        triggerFailingCheckpoint(jobID, TestException.class, failOnCloseRef, 
miniCluster);
 
         miniCluster.triggerCheckpoint(jobID).get();
     }
@@ -161,24 +163,37 @@ public class UnalignedCheckpointFailureHandlingITCase {
                 .sinkTo(new DiscardingSink<>());
     }
 
+    /**
+     * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link
+     * CheckpointStateOutputStream#close()} which should get called by the channel state writer
+     * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. In
+     * some cases on writing checkpoints, only {@link CheckpointStateOutputStream#close()} will be
+     * called, so {@code failOnCloseRef} has to be checked here.
+     */
     private void triggerFailingCheckpoint(
-            JobID jobID, Class<TestException> expectedException, MiniCluster 
miniCluster)
+            JobID jobID,
+            Class<TestException> expectedException,
+            SharedReference<AtomicBoolean> failOnCloseRef,
+            MiniCluster miniCluster)
             throws InterruptedException, ExecutionException {
-        while (true) {
-            Optional<Throwable> cpFailure =
+        boolean foundCheckpointFailure = false;
+        do {
+            Optional<Throwable> cpFailureOpt =
                     miniCluster
                             .triggerCheckpoint(jobID)
                             .thenApply(ign -> Optional.empty())
                             .handle((ign, err) -> Optional.ofNullable(err))
                             .get();
-            if (!cpFailure.isPresent()) {
-                Thread.sleep(50); // trigger again - in case of no channel 
data was written
-            } else if (isCausedBy(cpFailure.get(), expectedException)) {
-                return;
-            } else {
-                rethrow(cpFailure.get());
+
+            if (cpFailureOpt.isPresent()) {
+                Throwable cpFailure = cpFailureOpt.get();
+                if (isCausedBy(cpFailure, expectedException)) {
+                    foundCheckpointFailure = true;
+                } else {
+                    rethrow(cpFailure);
+                }
             }
-        }
+        } while (!foundCheckpointFailure || failOnCloseRef.get().get());
     }
 
     private boolean isCausedBy(Throwable t, Class<TestException> 
expectedException) {

Reply via email to