This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit abfc3012ba43b6abd6bdd40d386cfc1d84454047 Author: Rui Fan <[email protected]> AuthorDate: Thu Feb 26 14:48:38 2026 +0100 [FLINK-39140][test] Allow multiple rescales in Unaligned Checkpoint ITCases to perform checkpointing during recovery --- .../checkpointing/UnalignedCheckpointITCase.java | 10 +- .../UnalignedCheckpointRescaleITCase.java | 76 ++++++++++----- ...dCheckpointRescaleWithMixedExchangesITCase.java | 26 +++++- .../checkpointing/UnalignedCheckpointTestBase.java | 103 ++++++++++++++------- 4 files changed, 156 insertions(+), 59 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java index 86349b8be9b..2a5eb2bd8f7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java @@ -277,7 +277,15 @@ public class UnalignedCheckpointITCase extends UnalignedCheckpointTestBase { @Test public void execute() throws Exception { - execute(settings); + // Phase 1: Run with WAIT_FOR_CHECKPOINT_AND_CANCEL to produce a checkpoint + settings.setCheckpointGenerationMode( + CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL); + String checkpointPath = super.execute(settings); + + // Phase 2: Restore from the checkpoint and run normally + settings.setCheckpointGenerationMode(CheckpointGenerationMode.NONE); + settings.setRestoreCheckpoint(checkpointPath); + super.execute(settings); } protected void checkCounters(JobExecutionResult result) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java index f8332472741..73de3838117 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java @@ -58,6 +58,7 @@ import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.BitSet; import java.util.Collections; +import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks; import static org.apache.flink.util.Preconditions.checkState; @@ -93,7 +94,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas 0, sourceSleepMillis, val -> true); - addFailingSink(source, minCheckpoints, slotSharing); + addFailingSink(source, minCheckpoints, slotSharing, expectedRestarts); } }, @@ -132,7 +133,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas slotSharing ? "default" : ("min" + inputIndex)); } - addFailingSink(combinedSource, minCheckpoints, slotSharing); + addFailingSink(combinedSource, minCheckpoints, slotSharing, expectedRestarts); } }, @@ -174,7 +175,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas .process(new TestKeyedCoProcessFunction()) .setParallelism(parallelism); - addFailingSink(connected, minCheckpoints, slotSharing); + addFailingSink(connected, minCheckpoints, slotSharing, expectedRestarts); } }, @@ -204,7 +205,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas combinedSource = combinedSource == null ? source : combinedSource.union(source); } - addFailingSink(combinedSource, minCheckpoints, slotSharing); + addFailingSink(combinedSource, minCheckpoints, slotSharing, expectedRestarts); } }, @@ -254,7 +255,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas .process(new TestBroadcastProcessFunction()) .setParallelism(2 * parallelism); - addFailingSink(joined, minCheckpoints, slotSharing); + addFailingSink(joined, minCheckpoints, slotSharing, expectedRestarts); } }, @@ -330,7 +331,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas .process(new TestKeyedBroadcastProcessFunction()) .setParallelism(parallelism + 2); - addFailingSink(joined, minCheckpoints, slotSharing); + addFailingSink(joined, minCheckpoints, slotSharing, expectedRestarts); } }, CUSTOM_PARTITIONER { @@ -367,7 +368,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas }) .name("long-to-string-map") .uid("long-to-string-map") - .map(getFailingMapper(minCheckpoints)) + .map(getFailingMapper(minCheckpoints, expectedRestarts)) .name("failing-map") .uid("failing-map") .setParallelism(parallelism) @@ -385,10 +386,13 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas }; void addFailingSink( - DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) { + DataStream<Long> combinedSource, + long minCheckpoints, + boolean slotSharing, + int expectedRestarts) { combinedSource .shuffle() - .map(getFailingMapper(minCheckpoints)) + .map(getFailingMapper(minCheckpoints, expectedRestarts)) .name("failing-map") .uid("failing-map") .slotSharingGroup(slotSharing ? "default" : "failing-map") @@ -408,13 +412,21 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas /** * Creates a FailingMapper that only fails during snapshot operations. * - * <p>Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND - * runNumber == 0. After job failovers internally, runNumber becomes attemptNumber > 0, so - * failure condition is no longer satisfied. This ensures the mapper fails exactly once - * during initial run to trigger job failover, but never fails again after failing over and - * recovery from checkpoint. + * <p>When {@code expectedRestarts <= 0}, returns a no-op FailingMapper that never fails. + * This is used in phases where no failure is expected (e.g., checkpoint-and-cancel phase). + * + * <p>When {@code expectedRestarts > 0}, fails during snapshotState() when + * completedCheckpoints >= minCheckpoints/2 AND runNumber == 0. After job failovers + * internally, runNumber becomes attemptNumber > 0, so failure condition is no longer + * satisfied. This ensures the mapper fails exactly once during initial run to trigger job + * failover, but never fails again after recovery from checkpoint. */ - private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) { + private static <T> FailingMapper<T> getFailingMapper( + long minCheckpoints, int expectedRestarts) { + if (expectedRestarts <= 0) { + return new FailingMapper<>( + state -> false, state -> false, state -> false, state -> false); + } return new FailingMapper<>( state -> false, state -> @@ -629,25 +641,45 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas */ @Test public void shouldRescaleUnalignedCheckpoint() throws Exception { + // Phase 1: prescale - generate initial checkpoint (unchanged) final UnalignedSettings prescaleSettings = new UnalignedSettings(topology) .setParallelism(oldParallelism) .setExpectedFailures(1) .setSourceSleepMs(sourceSleepMs) .setExpectedFinalJobStatus(JobStatus.FAILED); - prescaleSettings.setGenerateCheckpoint(true); - final String checkpointDir = super.execute(prescaleSettings); - assertThat(checkpointDir) + prescaleSettings.setCheckpointGenerationMode(CheckpointGenerationMode.WAIT_FOR_JOB_RESULT); + final String checkpointDir1 = super.execute(prescaleSettings); + assertThat(checkpointDir1) .as("First job must generate a checkpoint for rescale test to be valid.") .isNotNull(); - // resume - final UnalignedSettings postscaleSettings = + + // Phase 2: postscale-checkpoint - recover from checkpoint1 and generate new checkpoint + // expectedFailures defaults to 0, so expectedRestarts passed to create() is also 0, + // which causes getFailingMapper to return a no-op mapper that never fails. + final UnalignedSettings phase2Settings = new UnalignedSettings(topology) .setParallelism(newParallelism) + .setCheckpointGenerationMode( + CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL) + .setRestoreCheckpoint(checkpointDir1) + .setSourceSleepMs(sourceSleepMs); + final String checkpointDir2 = super.execute(phase2Settings); + assertThat(checkpointDir2) + .as("Phase 2 must generate a checkpoint for phase 3 to be valid.") + .isNotNull(); + + // Phase 3: recovery - recover from checkpoint2 and run to completion + // Randomly choose parallelism from oldParallelism or newParallelism + int phase3Parallelism = + ThreadLocalRandom.current().nextBoolean() ? oldParallelism : newParallelism; + final UnalignedSettings phase3Settings = + new UnalignedSettings(topology) + .setParallelism(phase3Parallelism) .setExpectedFailures(1) + .setRestoreCheckpoint(checkpointDir2) .setExpectedFinalJobStatus(JobStatus.FINISHED); - postscaleSettings.setRestoreCheckpoint(checkpointDir); - super.execute(postscaleSettings); + super.execute(phase3Settings); } protected void checkCounters(JobExecutionResult result) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java index 9bf0e4d8df9..fe6b065ce66 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java @@ -49,6 +49,8 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -68,6 +70,9 @@ import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrat @RunWith(Parameterized.class) public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogger { + private static final Logger LOG = + LoggerFactory.getLogger(UnalignedCheckpointRescaleWithMixedExchangesITCase.class); + private static final int NUM_TASK_MANAGERS = 1; private static final int SLOTS_PER_TASK_MANAGER = 10; private static final int MAX_SLOTS = NUM_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; @@ -121,19 +126,34 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg CommonTestUtils.waitForJobStatus(jobClient1, Collections.singletonList(JobStatus.RUNNING)); CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient1.getJobID(), false); - String checkpointPath = + String checkpointPath1 = CommonTestUtils.waitForCheckpointWithInflightBuffers( jobClient1.getJobID(), miniCluster); jobClient1.cancel().get(); + LOG.info("First checkpoint path: {}", checkpointPath1); // Step 2: Restore the job with a different parallelism JobClient jobClient2 = - executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath)); + executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath1)); CommonTestUtils.waitForJobStatus(jobClient2, Collections.singletonList(JobStatus.RUNNING)); CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient2.getJobID(), false); - CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient2.getJobID(), miniCluster); + String checkpointPath2 = + CommonTestUtils.waitForCheckpointWithInflightBuffers( + jobClient2.getJobID(), miniCluster); jobClient2.cancel().get(); + LOG.info("Second checkpoint path: {}", checkpointPath2); + + // Step 3: Restore from Step 2's checkpoint with random parallelism. This validates + // that a checkpoint produced after recovery can be used for another recovery. + JobClient jobClient3 = + executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath2)); + + CommonTestUtils.waitForJobStatus(jobClient3, Collections.singletonList(JobStatus.RUNNING)); + CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient3.getJobID(), false); + // Wait for at least one checkpoint to verify the recovery was successful + CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient3.getJobID(), miniCluster); + jobClient3.cancel().get(); } private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String recoveryPath) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index cd3e1e71b40..5478dfda1ac 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -53,6 +53,7 @@ import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.shuffle.ShuffleServiceOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -98,6 +99,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -178,49 +180,62 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { .setNumberSlotsPerTaskManager(slotsPerTM) .build()); miniCluster.before(); - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(conf); - settings.configure(env); + + final CheckpointGenerationMode mode = settings.checkpointGenerationMode; JobID jobID = null; try { - // print the test parameters to help debugging when the case is stuck System.out.println( "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); + final CompletableFuture<JobSubmissionResult> result = miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph()); - jobID = result.get().getJobID(); - checkCounters( - miniCluster - .getMiniCluster() - .requestJobResult(jobID) - .get() - .toJobExecutionResult(getClass().getClassLoader())); - if (settings.expectedFinalJobStatus != null) { - assertThat(miniCluster.getMiniCluster().getJobStatus(jobID)) - .succeedsWithin(Duration.ofMinutes(1)) - .isEqualTo(settings.expectedFinalJobStatus); + + if (mode == CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL) { + CommonTestUtils.waitForAllTaskRunning(miniCluster.getMiniCluster(), jobID, false); + CommonTestUtils.waitForNewCheckpoint(jobID, miniCluster.getMiniCluster()); + miniCluster.getMiniCluster().cancelJob(jobID).get(); + final JobID cancelledJobID = jobID; + CommonTestUtils.waitUntilCondition( + () -> + miniCluster.getMiniCluster().getJobStatus(cancelledJobID).get() + == JobStatus.CANCELED); + } else { + checkCounters( + miniCluster + .getMiniCluster() + .requestJobResult(jobID) + .get() + .toJobExecutionResult(getClass().getClassLoader())); + if (settings.expectedFinalJobStatus != null) { + assertThat(miniCluster.getMiniCluster().getJobStatus(jobID)) + .succeedsWithin(Duration.ofMinutes(1)) + .isEqualTo(settings.expectedFinalJobStatus); + } } + System.out.println( "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + "."); - if (settings.generateCheckpoint) { - return CommonTestUtils.getLatestCompletedCheckpointPath( - jobID, miniCluster.getMiniCluster()) - .orElseGet(() -> Fail.fail("Could not generate checkpoint")); - } + return retrieveCheckpointIfNeeded(jobID, miniCluster, mode); } catch (Exception e) { if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) { throw e; } - if (settings.generateCheckpoint) { - return CommonTestUtils.getLatestCompletedCheckpointPath( - jobID, miniCluster.getMiniCluster()) - .orElseGet(() -> Fail.fail("Could not generate checkpoint")); - } + return retrieveCheckpointIfNeeded(jobID, miniCluster, mode); } finally { miniCluster.after(); } - return null; + } + + @Nullable + private String retrieveCheckpointIfNeeded( + JobID jobID, MiniClusterWithClientResource miniCluster, CheckpointGenerationMode mode) + throws ExecutionException, InterruptedException, FlinkJobNotFoundException { + if (mode == CheckpointGenerationMode.NONE) { + return null; + } + return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, miniCluster.getMiniCluster()) + .orElseGet(() -> Fail.fail("Could not generate checkpoint")); } private StreamGraph getStreamGraph(UnalignedSettings settings, Configuration conf) { @@ -688,12 +703,29 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { } } + /** + * Defines how the test generates and retrieves a checkpoint. + * + * <ul> + * <li>{@link #NONE} - No checkpoint generation; job runs normally. + * <li>{@link #WAIT_FOR_JOB_RESULT} - Job runs to completion (or expected failure), then + * retrieves the latest checkpoint path. + * <li>{@link #WAIT_FOR_CHECKPOINT_AND_CANCEL} - Waits for the first checkpoint to complete, + * then cancels the job and returns the checkpoint path. + * </ul> + */ + protected enum CheckpointGenerationMode { + NONE, + WAIT_FOR_JOB_RESULT, + WAIT_FOR_CHECKPOINT_AND_CANCEL + } + /** Builder-like interface for all relevant unaligned settings. */ protected static class UnalignedSettings { private int parallelism; private final int minCheckpoints = 10; @Nullable private String restoreCheckpoint; - private boolean generateCheckpoint = false; + private CheckpointGenerationMode checkpointGenerationMode = CheckpointGenerationMode.NONE; int expectedFailures = 0; int tolerableCheckpointFailures = 0; private final DagCreator dagCreator; @@ -719,8 +751,9 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { return this; } - public UnalignedSettings setGenerateCheckpoint(boolean generateCheckpoint) { - this.generateCheckpoint = generateCheckpoint; + public UnalignedSettings setCheckpointGenerationMode( + CheckpointGenerationMode checkpointGenerationMode) { + this.checkpointGenerationMode = checkpointGenerationMode; return this; } @@ -773,11 +806,15 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { .setTolerableCheckpointFailureNumber(tolerableCheckpointFailures); env.setParallelism(parallelism); RestartStrategyUtils.configureFixedDelayRestartStrategy( - env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L); + env, + checkpointGenerationMode == CheckpointGenerationMode.WAIT_FOR_JOB_RESULT + ? expectedFailures / 2 + : expectedFailures, + 100L); env.getCheckpointConfig().enableUnalignedCheckpoints(true); // for custom partitioner env.getCheckpointConfig().setForceUnalignedCheckpoints(true); - if (generateCheckpoint) { + if (checkpointGenerationMode != CheckpointGenerationMode.NONE) { env.getCheckpointConfig() .setExternalizedCheckpointRetention( ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION); @@ -818,8 +855,8 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { + minCheckpoints + ", restoreCheckpoint=" + restoreCheckpoint - + ", generateCheckpoint=" - + generateCheckpoint + + ", checkpointGenerationMode=" + + checkpointGenerationMode + ", expectedFailures=" + expectedFailures + ", dagCreator="
