This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7d683c9bf6 [FLINK-38403][tests] Stabilize UnalignedCheckpoint ITs
c7d683c9bf6 is described below

commit c7d683c9bf6118e06ac8c27674892ae19e0ad992
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Oct 14 14:33:08 2025 +0200

    [FLINK-38403][tests] Stabilize UnalignedCheckpoint ITs
    
    To produce a checkpoint with in-flight data, the UC ITs failed the job
    artificially and relied on a limited number of retries. Naturally, this
    approach is flaky when the actual test environment is flaky and the retry
    attempts are quickly used up by unplanned restarts.
    
    The solution is to stop fiddling around with the number of retries and
    instead mark the test exception as a NonRecoverableError to force a failure
    of the job independently of the retries.
    
    This commit also switches to a better way of retrieving the path of that
    checkpoint (through the JobManager API).
---
 .../checkpointing/UnalignedCheckpointTestBase.java | 23 +++++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 35d33d7af9b..d2de69d6ec3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
@@ -54,15 +55,16 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
@@ -184,24 +186,28 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             final CompletableFuture<JobSubmissionResult> result =
                     
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
 
+            final JobID jobID = result.get().getJobID();
             checkCounters(
                     miniCluster
                             .getMiniCluster()
-                            .requestJobResult(result.get().getJobID())
+                            .requestJobResult(jobID)
                             .get()
                             
.toJobExecutionResult(getClass().getClassLoader()));
             System.out.println(
                     "Finished " + getClass().getCanonicalName() + "#" + 
name.getMethodName() + ".");
+            if (settings.generateCheckpoint) {
+                return CommonTestUtils.getLatestCompletedCheckpointPath(
+                                jobID, miniCluster.getMiniCluster())
+                        .map(File::new)
+                        .orElseThrow(() -> new AssertionError("Could not 
generate checkpoint"));
+            }
         } catch (Exception e) {
-            if (!ExceptionUtils.findThrowable(e, 
TestException.class).isPresent()) {
+            if (ExceptionUtils.findThrowable(e, 
TestException.class).isEmpty()) {
                 throw e;
             }
         } finally {
             miniCluster.after();
         }
-        if (settings.generateCheckpoint) {
-            return TestUtils.getMostRecentCompletedCheckpoint(checkpointDir);
-        }
         return null;
     }
 
@@ -748,8 +754,6 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             env.getCheckpointConfig()
                     
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
             env.setParallelism(parallelism);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(
-                    env, generateCheckpoint ? expectedFailures / 2 : 
expectedFailures, 100L);
             env.getCheckpointConfig().enableUnalignedCheckpoints(true);
             // for custom partitioner
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -1128,6 +1132,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
         return value;
     }
 
+    @ThrowableAnnotation(ThrowableType.NonRecoverableError)
     static class TestException extends Exception {
         public TestException(String s) {
             super(s);

Reply via email to