This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abfc3012ba43b6abd6bdd40d386cfc1d84454047
Author: Rui Fan <[email protected]>
AuthorDate: Thu Feb 26 14:48:38 2026 +0100

    [FLINK-39140][test] Allow multiple rescales in Unaligned Checkpoint ITCases 
to perform checkpointing during recovery
---
 .../checkpointing/UnalignedCheckpointITCase.java   |  10 +-
 .../UnalignedCheckpointRescaleITCase.java          |  76 ++++++++++-----
 ...dCheckpointRescaleWithMixedExchangesITCase.java |  26 +++++-
 .../checkpointing/UnalignedCheckpointTestBase.java | 103 ++++++++++++++-------
 4 files changed, 156 insertions(+), 59 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 86349b8be9b..2a5eb2bd8f7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -277,7 +277,15 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
 
     @Test
     public void execute() throws Exception {
-        execute(settings);
+        // Phase 1: Run with WAIT_FOR_CHECKPOINT_AND_CANCEL to produce a 
checkpoint
+        settings.setCheckpointGenerationMode(
+                CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL);
+        String checkpointPath = super.execute(settings);
+
+        // Phase 2: Restore from the checkpoint and run normally
+        settings.setCheckpointGenerationMode(CheckpointGenerationMode.NONE);
+        settings.setRestoreCheckpoint(checkpointPath);
+        super.execute(settings);
     }
 
     protected void checkCounters(JobExecutionResult result) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
index f8332472741..73de3838117 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
@@ -58,6 +58,7 @@ import org.junit.runners.Parameterized;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -93,7 +94,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                 0,
                                 sourceSleepMillis,
                                 val -> true);
-                addFailingSink(source, minCheckpoints, slotSharing);
+                addFailingSink(source, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
 
@@ -132,7 +133,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                                     slotSharing ? "default" : 
("min" + inputIndex));
                 }
 
-                addFailingSink(combinedSource, minCheckpoints, slotSharing);
+                addFailingSink(combinedSource, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
 
@@ -174,7 +175,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                 .process(new TestKeyedCoProcessFunction())
                                 .setParallelism(parallelism);
 
-                addFailingSink(connected, minCheckpoints, slotSharing);
+                addFailingSink(connected, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
 
@@ -204,7 +205,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                     combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
                 }
 
-                addFailingSink(combinedSource, minCheckpoints, slotSharing);
+                addFailingSink(combinedSource, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
 
@@ -254,7 +255,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                 .process(new TestBroadcastProcessFunction())
                                 .setParallelism(2 * parallelism);
 
-                addFailingSink(joined, minCheckpoints, slotSharing);
+                addFailingSink(joined, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
 
@@ -330,7 +331,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                 .process(new 
TestKeyedBroadcastProcessFunction())
                                 .setParallelism(parallelism + 2);
 
-                addFailingSink(joined, minCheckpoints, slotSharing);
+                addFailingSink(joined, minCheckpoints, slotSharing, 
expectedRestarts);
             }
         },
         CUSTOM_PARTITIONER {
@@ -367,7 +368,7 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
                                 })
                         .name("long-to-string-map")
                         .uid("long-to-string-map")
-                        .map(getFailingMapper(minCheckpoints))
+                        .map(getFailingMapper(minCheckpoints, 
expectedRestarts))
                         .name("failing-map")
                         .uid("failing-map")
                         .setParallelism(parallelism)
@@ -385,10 +386,13 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
         };
 
         void addFailingSink(
-                DataStream<Long> combinedSource, long minCheckpoints, boolean 
slotSharing) {
+                DataStream<Long> combinedSource,
+                long minCheckpoints,
+                boolean slotSharing,
+                int expectedRestarts) {
             combinedSource
                     .shuffle()
-                    .map(getFailingMapper(minCheckpoints))
+                    .map(getFailingMapper(minCheckpoints, expectedRestarts))
                     .name("failing-map")
                     .uid("failing-map")
                     .slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -408,13 +412,21 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
         /**
          * Creates a FailingMapper that only fails during snapshot operations.
          *
-         * <p>Only fails during snapshotState() when completedCheckpoints >= 
minCheckpoints/2 AND
-         * runNumber == 0. After job failovers internally, runNumber becomes 
attemptNumber > 0, so
-         * failure condition is no longer satisfied. This ensures the mapper 
fails exactly once
-         * during initial run to trigger job failover, but never fails again 
after failing over and
-         * recovery from checkpoint.
+         * <p>When {@code expectedRestarts <= 0}, returns a no-op 
FailingMapper that never fails.
+         * This is used in phases where no failure is expected (e.g., 
checkpoint-and-cancel phase).
+         *
+         * <p>When {@code expectedRestarts > 0}, fails during snapshotState() 
when
+         * completedCheckpoints >= minCheckpoints/2 AND runNumber == 0. After 
the job fails over
+         * internally, runNumber becomes attemptNumber > 0, so the failure 
condition is no longer
+         * satisfied. This ensures the mapper fails exactly once during the 
initial run to trigger
+         * a job failover, but never fails again after recovery from a checkpoint.
          */
-        private static <T> FailingMapper<T> getFailingMapper(long 
minCheckpoints) {
+        private static <T> FailingMapper<T> getFailingMapper(
+                long minCheckpoints, int expectedRestarts) {
+            if (expectedRestarts <= 0) {
+                return new FailingMapper<>(
+                        state -> false, state -> false, state -> false, state 
-> false);
+            }
             return new FailingMapper<>(
                     state -> false,
                     state ->
@@ -629,25 +641,45 @@ public class UnalignedCheckpointRescaleITCase extends 
UnalignedCheckpointTestBas
      */
     @Test
     public void shouldRescaleUnalignedCheckpoint() throws Exception {
+        // Phase 1: prescale - generate the initial checkpoint at oldParallelism
         final UnalignedSettings prescaleSettings =
                 new UnalignedSettings(topology)
                         .setParallelism(oldParallelism)
                         .setExpectedFailures(1)
                         .setSourceSleepMs(sourceSleepMs)
                         .setExpectedFinalJobStatus(JobStatus.FAILED);
-        prescaleSettings.setGenerateCheckpoint(true);
-        final String checkpointDir = super.execute(prescaleSettings);
-        assertThat(checkpointDir)
+        
prescaleSettings.setCheckpointGenerationMode(CheckpointGenerationMode.WAIT_FOR_JOB_RESULT);
+        final String checkpointDir1 = super.execute(prescaleSettings);
+        assertThat(checkpointDir1)
                 .as("First job must generate a checkpoint for rescale test to 
be valid.")
                 .isNotNull();
-        // resume
-        final UnalignedSettings postscaleSettings =
+
+        // Phase 2: postscale-checkpoint - recover from checkpoint1 and 
generate new checkpoint
+        // expectedFailures defaults to 0, so expectedRestarts passed to 
create() is also 0,
+        // which causes getFailingMapper to return a no-op mapper that never 
fails.
+        final UnalignedSettings phase2Settings =
                 new UnalignedSettings(topology)
                         .setParallelism(newParallelism)
+                        .setCheckpointGenerationMode(
+                                
CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL)
+                        .setRestoreCheckpoint(checkpointDir1)
+                        .setSourceSleepMs(sourceSleepMs);
+        final String checkpointDir2 = super.execute(phase2Settings);
+        assertThat(checkpointDir2)
+                .as("Phase 2 must generate a checkpoint for phase 3 to be 
valid.")
+                .isNotNull();
+
+        // Phase 3: recovery - recover from checkpoint2 and run to completion
+        // Randomly choose parallelism from oldParallelism or newParallelism
+        int phase3Parallelism =
+                ThreadLocalRandom.current().nextBoolean() ? oldParallelism : 
newParallelism;
+        final UnalignedSettings phase3Settings =
+                new UnalignedSettings(topology)
+                        .setParallelism(phase3Parallelism)
                         .setExpectedFailures(1)
+                        .setRestoreCheckpoint(checkpointDir2)
                         .setExpectedFinalJobStatus(JobStatus.FINISHED);
-        postscaleSettings.setRestoreCheckpoint(checkpointDir);
-        super.execute(postscaleSettings);
+        super.execute(phase3Settings);
     }
 
     protected void checkCounters(JobExecutionResult result) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
index 9bf0e4d8df9..fe6b065ce66 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
@@ -49,6 +49,8 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -68,6 +70,9 @@ import static 
org.apache.flink.configuration.RestartStrategyOptions.RestartStrat
 @RunWith(Parameterized.class)
 public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends 
TestLogger {
 
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(UnalignedCheckpointRescaleWithMixedExchangesITCase.class);
+
     private static final int NUM_TASK_MANAGERS = 1;
     private static final int SLOTS_PER_TASK_MANAGER = 10;
     private static final int MAX_SLOTS = NUM_TASK_MANAGERS * 
SLOTS_PER_TASK_MANAGER;
@@ -121,19 +126,34 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
         CommonTestUtils.waitForJobStatus(jobClient1, 
Collections.singletonList(JobStatus.RUNNING));
         CommonTestUtils.waitForAllTaskRunning(miniCluster, 
jobClient1.getJobID(), false);
-        String checkpointPath =
+        String checkpointPath1 =
                 CommonTestUtils.waitForCheckpointWithInflightBuffers(
                         jobClient1.getJobID(), miniCluster);
         jobClient1.cancel().get();
+        LOG.info("First checkpoint path: {}", checkpointPath1);
 
         // Step 2: Restore the job with a different parallelism
         JobClient jobClient2 =
-                
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath));
+                
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath1));
 
         CommonTestUtils.waitForJobStatus(jobClient2, 
Collections.singletonList(JobStatus.RUNNING));
         CommonTestUtils.waitForAllTaskRunning(miniCluster, 
jobClient2.getJobID(), false);
-        
CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient2.getJobID(), 
miniCluster);
+        String checkpointPath2 =
+                CommonTestUtils.waitForCheckpointWithInflightBuffers(
+                        jobClient2.getJobID(), miniCluster);
         jobClient2.cancel().get();
+        LOG.info("Second checkpoint path: {}", checkpointPath2);
+
+        // Step 3: Restore from Step 2's checkpoint with random parallelism. 
This validates
+        // that a checkpoint produced after recovery can be used for another 
recovery.
+        JobClient jobClient3 =
+                
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath2));
+
+        CommonTestUtils.waitForJobStatus(jobClient3, 
Collections.singletonList(JobStatus.RUNNING));
+        CommonTestUtils.waitForAllTaskRunning(miniCluster, 
jobClient3.getJobID(), false);
+        // Wait for at least one checkpoint to verify the recovery was 
successful
+        
CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient3.getJobID(), 
miniCluster);
+        jobClient3.cancel().get();
     }
 
     private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable 
String recoveryPath)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index cd3e1e71b40..5478dfda1ac 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -53,6 +53,7 @@ import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.shuffle.ShuffleServiceOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -98,6 +99,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -178,49 +180,62 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                                 .setNumberSlotsPerTaskManager(slotsPerTM)
                                 .build());
         miniCluster.before();
-        final StreamExecutionEnvironment env =
-                StreamExecutionEnvironment.getExecutionEnvironment(conf);
-        settings.configure(env);
+
+        final CheckpointGenerationMode mode = 
settings.checkpointGenerationMode;
         JobID jobID = null;
         try {
-            // print the test parameters to help debugging when the case is 
stuck
             System.out.println(
                     "Starting " + getClass().getCanonicalName() + "#" + 
name.getMethodName() + ".");
+
             final CompletableFuture<JobSubmissionResult> result =
                     
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
-
             jobID = result.get().getJobID();
-            checkCounters(
-                    miniCluster
-                            .getMiniCluster()
-                            .requestJobResult(jobID)
-                            .get()
-                            
.toJobExecutionResult(getClass().getClassLoader()));
-            if (settings.expectedFinalJobStatus != null) {
-                assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
-                        .succeedsWithin(Duration.ofMinutes(1))
-                        .isEqualTo(settings.expectedFinalJobStatus);
+
+            if (mode == 
CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL) {
+                
CommonTestUtils.waitForAllTaskRunning(miniCluster.getMiniCluster(), jobID, 
false);
+                CommonTestUtils.waitForNewCheckpoint(jobID, 
miniCluster.getMiniCluster());
+                miniCluster.getMiniCluster().cancelJob(jobID).get();
+                final JobID cancelledJobID = jobID;
+                CommonTestUtils.waitUntilCondition(
+                        () ->
+                                
miniCluster.getMiniCluster().getJobStatus(cancelledJobID).get()
+                                        == JobStatus.CANCELED);
+            } else {
+                checkCounters(
+                        miniCluster
+                                .getMiniCluster()
+                                .requestJobResult(jobID)
+                                .get()
+                                
.toJobExecutionResult(getClass().getClassLoader()));
+                if (settings.expectedFinalJobStatus != null) {
+                    
assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
+                            .succeedsWithin(Duration.ofMinutes(1))
+                            .isEqualTo(settings.expectedFinalJobStatus);
+                }
             }
+
             System.out.println(
                     "Finished " + getClass().getCanonicalName() + "#" + 
name.getMethodName() + ".");
-            if (settings.generateCheckpoint) {
-                return CommonTestUtils.getLatestCompletedCheckpointPath(
-                                jobID, miniCluster.getMiniCluster())
-                        .orElseGet(() -> Fail.fail("Could not generate 
checkpoint"));
-            }
+            return retrieveCheckpointIfNeeded(jobID, miniCluster, mode);
         } catch (Exception e) {
             if (ExceptionUtils.findThrowable(e, 
TestException.class).isEmpty()) {
                 throw e;
             }
-            if (settings.generateCheckpoint) {
-                return CommonTestUtils.getLatestCompletedCheckpointPath(
-                                jobID, miniCluster.getMiniCluster())
-                        .orElseGet(() -> Fail.fail("Could not generate 
checkpoint"));
-            }
+            return retrieveCheckpointIfNeeded(jobID, miniCluster, mode);
         } finally {
             miniCluster.after();
         }
-        return null;
+    }
+
+    @Nullable
+    private String retrieveCheckpointIfNeeded(
+            JobID jobID, MiniClusterWithClientResource miniCluster, 
CheckpointGenerationMode mode)
+            throws ExecutionException, InterruptedException, 
FlinkJobNotFoundException {
+        if (mode == CheckpointGenerationMode.NONE) {
+            return null;
+        }
+        return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, 
miniCluster.getMiniCluster())
+                .orElseGet(() -> Fail.fail("Could not generate checkpoint"));
     }
 
     private StreamGraph getStreamGraph(UnalignedSettings settings, 
Configuration conf) {
@@ -688,12 +703,29 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
         }
     }
 
+    /**
+     * Defines how the test generates and retrieves a checkpoint.
+     *
+     * <ul>
+     *   <li>{@link #NONE} - No checkpoint generation; job runs normally.
+     *   <li>{@link #WAIT_FOR_JOB_RESULT} - Job runs to completion (or 
expected failure), then
+     *       retrieves the latest checkpoint path.
+     *   <li>{@link #WAIT_FOR_CHECKPOINT_AND_CANCEL} - Waits for a new 
checkpoint to complete,
+     *       then cancels the job and returns the checkpoint path.
+     * </ul>
+     */
+    protected enum CheckpointGenerationMode {
+        NONE,
+        WAIT_FOR_JOB_RESULT,
+        WAIT_FOR_CHECKPOINT_AND_CANCEL
+    }
+
     /** Builder-like interface for all relevant unaligned settings. */
     protected static class UnalignedSettings {
         private int parallelism;
         private final int minCheckpoints = 10;
         @Nullable private String restoreCheckpoint;
-        private boolean generateCheckpoint = false;
+        private CheckpointGenerationMode checkpointGenerationMode = 
CheckpointGenerationMode.NONE;
         int expectedFailures = 0;
         int tolerableCheckpointFailures = 0;
         private final DagCreator dagCreator;
@@ -719,8 +751,9 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             return this;
         }
 
-        public UnalignedSettings setGenerateCheckpoint(boolean 
generateCheckpoint) {
-            this.generateCheckpoint = generateCheckpoint;
+        public UnalignedSettings setCheckpointGenerationMode(
+                CheckpointGenerationMode checkpointGenerationMode) {
+            this.checkpointGenerationMode = checkpointGenerationMode;
             return this;
         }
 
@@ -773,11 +806,15 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                     
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
             env.setParallelism(parallelism);
             RestartStrategyUtils.configureFixedDelayRestartStrategy(
-                    env, generateCheckpoint ? expectedFailures / 2 : 
expectedFailures, 100L);
+                    env,
+                    checkpointGenerationMode == 
CheckpointGenerationMode.WAIT_FOR_JOB_RESULT
+                            ? expectedFailures / 2
+                            : expectedFailures,
+                    100L);
             env.getCheckpointConfig().enableUnalignedCheckpoints(true);
             // for custom partitioner
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
-            if (generateCheckpoint) {
+            if (checkpointGenerationMode != CheckpointGenerationMode.NONE) {
                 env.getCheckpointConfig()
                         .setExternalizedCheckpointRetention(
                                 
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
@@ -818,8 +855,8 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                     + minCheckpoints
                     + ", restoreCheckpoint="
                     + restoreCheckpoint
-                    + ", generateCheckpoint="
-                    + generateCheckpoint
+                    + ", checkpointGenerationMode="
+                    + checkpointGenerationMode
                     + ", expectedFailures="
                     + expectedFailures
                     + ", dagCreator="

Reply via email to