This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0d3133b017f0ec3d6cd25f8fd95a8cfa181afa90 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Feb 17 23:10:50 2026 +0100 [FLINK-34099][tests] Harden CheckpointIntervalDuringBacklogITCase There are a few issues with both production code and test. Test problems: 1. The test is too short (20 records vs 100 ms checkpoint interval) and has a low chance of catching any bugs 2. In fact, a checkpoint IS triggered during backlog; but triggering fails because the 2nd source is slow to start up 3. The 2nd source is slow to start due to busy wait After FLINK-38940, it became more flaky because CheckpointCoordinator now waits for all tasks running. And even more with FLINK-38939, which schedules the checkpoint faster. This change adjusts the test so that it catches the bug more reliably. --- .../CheckpointIntervalDuringBacklogITCase.java | 37 +++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java index 29f926a4b5e..158e996f052 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java @@ -33,15 +33,18 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.hybrid.HybridSource; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.NumberSequenceSourceWithWaitForCheckpoint; import org.apache.flink.util.CloseableIterator; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import javax.annotation.Nullable; @@ -63,10 +66,23 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class CheckpointIntervalDuringBacklogITCase { private static final int NUM_SPLITS = 2; - private static final int NUM_RECORDS = 40; + private static final int NUM_RECORDS = 100; // the more records the higher chance to catch a bug + private static final int SLEEP_MS_PER_RECORD = + 50; // same; avoid busy wait lest delays slot allocation private static final List<Long> EXPECTED_RESULT = LongStream.rangeClosed(0, NUM_RECORDS - 1).boxed().collect(Collectors.toList()); + @Rule + public MiniClusterWithClientResource cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + // allocate more, independent resources to speed up both sources startup + // to minimize the chances of hitting NOT_ALL_REQUIRED_TASKS_RUNNING + // by the initial checkpoint + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(1) + .build()); + @After public void tearDown() { CheckpointRecordingOperator.reset(); @@ -124,8 +140,14 @@ public class CheckpointIntervalDuringBacklogITCase { @Test public void testNoCheckpointDuringBacklog() throws Exception { + final int recordsBeforeSwitch = NUM_RECORDS / 2; + Duration expectedSwitchTime = Duration.ofMillis(recordsBeforeSwitch * SLEEP_MS_PER_RECORD); + // give as much time as possible to deploy both sources + // but less than the first source run time + Duration firstCheckpointTime = expectedSwitchTime.dividedBy(2); + Configuration configuration = new Configuration(); - 
configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(100)); + configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, firstCheckpointTime); configuration.set( CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, Duration.ofMillis(0)); final StreamExecutionEnvironment env = @@ -133,8 +155,8 @@ public class CheckpointIntervalDuringBacklogITCase { env.setParallelism(1); Source<Long, ?, ?> source = - HybridSource.builder(new NumberSequenceSource(0, NUM_RECORDS / 2 - 1)) - .addSource(new NumberSequenceSource(NUM_RECORDS / 2, NUM_RECORDS - 1)) + HybridSource.builder(new NumberSequenceSource(0, recordsBeforeSwitch - 1)) + .addSource(new NumberSequenceSource(recordsBeforeSwitch, NUM_RECORDS - 1)) .build(); runAndVerifyResult(env, source); @@ -330,6 +352,13 @@ public class CheckpointIntervalDuringBacklogITCase { @Override public void processElement(StreamRecord<T> element) { numRecords++; + if (numRecords < NUM_RECORDS / 2) { + try { + Thread.sleep(SLEEP_MS_PER_RECORD); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } output.collect(element); }
