This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fdf1a2e9dfc [FLINK-38967][test] Fix flaky UnalignedCheckpointFailureHandlingITCase
fdf1a2e9dfc is described below
commit fdf1a2e9dfca022a2eca1e677d03e6aadeace9a7
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Jan 28 12:54:26 2026 +0100
[FLINK-38967][test] Fix flaky UnalignedCheckpointFailureHandlingITCase
---
.../UnalignedCheckpointFailureHandlingITCase.java | 39 +++++++++++++++-------
1 file changed, 27 insertions(+), 12 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
index b86dfa67c10..1cb92405958 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java
@@ -97,8 +97,10 @@ public class UnalignedCheckpointFailureHandlingITCase {
@Test
public void testCheckpointSuccessAfterFailure() throws Exception {
+ SharedReference<AtomicBoolean> failOnCloseRef = sharedObjects.add(new
AtomicBoolean(true));
+
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- TestCheckpointStorageFactory.failOnCloseRef = sharedObjects.add(new
AtomicBoolean(true));
+ TestCheckpointStorageFactory.failOnCloseRef = failOnCloseRef;
TestCheckpointStorageFactory.tempFolderRef =
sharedObjects.add(temporaryFolder);
configure(
@@ -118,7 +120,7 @@ public class UnalignedCheckpointFailureHandlingITCase {
waitForJobStatus(jobClient, singletonList(RUNNING));
waitForAllTaskRunning(miniCluster, jobID, false);
- triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
+ triggerFailingCheckpoint(jobID, TestException.class, failOnCloseRef,
miniCluster);
miniCluster.triggerCheckpoint(jobID).get();
}
@@ -161,24 +163,37 @@ public class UnalignedCheckpointFailureHandlingITCase {
.sinkTo(new DiscardingSink<>());
}
+ /**
+ * Trigger checkpoints until the first failing checkpoint. The exception
should come from {@link
+ * CheckpointStateOutputStream#close()} which should get called by the
channel state writer
+ * after catching an exception in {@link
CheckpointStateOutputStream#closeAndGetHandle()}. In
+ * some cases on writing checkpoints, only {@link
CheckpointStateOutputStream#close()} will be
+ * called, so `failOnClose` has to be checked here.
+ */
private void triggerFailingCheckpoint(
- JobID jobID, Class<TestException> expectedException, MiniCluster
miniCluster)
+ JobID jobID,
+ Class<TestException> expectedException,
+ SharedReference<AtomicBoolean> failOnCloseRef,
+ MiniCluster miniCluster)
throws InterruptedException, ExecutionException {
- while (true) {
- Optional<Throwable> cpFailure =
+ boolean foundCheckpointFailure = false;
+ do {
+ Optional<Throwable> cpFailureOpt =
miniCluster
.triggerCheckpoint(jobID)
.thenApply(ign -> Optional.empty())
.handle((ign, err) -> Optional.ofNullable(err))
.get();
- if (!cpFailure.isPresent()) {
- Thread.sleep(50); // trigger again - in case of no channel
data was written
- } else if (isCausedBy(cpFailure.get(), expectedException)) {
- return;
- } else {
- rethrow(cpFailure.get());
+
+ if (cpFailureOpt.isPresent()) {
+ Throwable cpFailure = cpFailureOpt.get();
+ if (isCausedBy(cpFailure, expectedException)) {
+ foundCheckpointFailure = true;
+ } else {
+ rethrow(cpFailure);
+ }
}
- }
+ } while (!foundCheckpointFailure || failOnCloseRef.get().get());
}
private boolean isCausedBy(Throwable t, Class<TestException>
expectedException) {