This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0d3133b017f0ec3d6cd25f8fd95a8cfa181afa90
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Feb 17 23:10:50 2026 +0100

    [FLINK-34099][tests] Harden CheckpointIntervalDuringBacklogITCase
    
    There are a few issues with both the production code and the test.
    
    Test problems:
    1. The test is too short (20 records vs. a 100 ms checkpoint interval)
      and has a low chance of catching any bugs
    2. In fact, a checkpoint IS triggered during backlog,
      but triggering fails because the 2nd source is slow to start up
    3. The 2nd source is slow to start due to busy waiting
    
    After FLINK-38940, the test became flakier because CheckpointCoordinator
    now waits for all tasks to be running.
    It became even flakier with FLINK-38939, which schedules the checkpoint
    faster.
    
    This change adjusts the test so that it catches bugs more reliably.
---
 .../CheckpointIntervalDuringBacklogITCase.java     | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
index 29f926a4b5e..158e996f052 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointIntervalDuringBacklogITCase.java
@@ -33,15 +33,18 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.hybrid.HybridSource;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.NumberSequenceSourceWithWaitForCheckpoint;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -63,10 +66,23 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class CheckpointIntervalDuringBacklogITCase {
     private static final int NUM_SPLITS = 2;
-    private static final int NUM_RECORDS = 40;
+    private static final int NUM_RECORDS = 100; // the more records the higher 
chance to catch a bug
+    private static final int SLEEP_MS_PER_RECORD =
+            50; // same; avoid busy wait lest delays slot allocation
     private static final List<Long> EXPECTED_RESULT =
             LongStream.rangeClosed(0, NUM_RECORDS - 
1).boxed().collect(Collectors.toList());
 
+    @Rule
+    public MiniClusterWithClientResource cluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            // allocate more, independent resources to speed 
up both sources startup
+                            // to minimize the chances of hitting 
NOT_ALL_REQUIRED_TASKS_RUNNING
+                            // by the initial checkpoint
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(1)
+                            .build());
+
     @After
     public void tearDown() {
         CheckpointRecordingOperator.reset();
@@ -124,8 +140,14 @@ public class CheckpointIntervalDuringBacklogITCase {
 
     @Test
     public void testNoCheckpointDuringBacklog() throws Exception {
+        final int recordsBeforeSwitch = NUM_RECORDS / 2;
+        Duration expectedSwitchTime = Duration.ofMillis(recordsBeforeSwitch * 
SLEEP_MS_PER_RECORD);
+        // give as much time as possible to deploy both sources
+        // but less than the first source run time
+        Duration firstCheckpointTime = expectedSwitchTime.dividedBy(2);
+
         Configuration configuration = new Configuration();
-        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofMillis(100));
+        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
firstCheckpointTime);
         configuration.set(
                 CheckpointingOptions.CHECKPOINTING_INTERVAL_DURING_BACKLOG, 
Duration.ofMillis(0));
         final StreamExecutionEnvironment env =
@@ -133,8 +155,8 @@ public class CheckpointIntervalDuringBacklogITCase {
         env.setParallelism(1);
 
         Source<Long, ?, ?> source =
-                HybridSource.builder(new NumberSequenceSource(0, NUM_RECORDS / 
2 - 1))
-                        .addSource(new NumberSequenceSource(NUM_RECORDS / 2, 
NUM_RECORDS - 1))
+                HybridSource.builder(new NumberSequenceSource(0, 
recordsBeforeSwitch - 1))
+                        .addSource(new 
NumberSequenceSource(recordsBeforeSwitch, NUM_RECORDS - 1))
                         .build();
 
         runAndVerifyResult(env, source);
@@ -330,6 +352,13 @@ public class CheckpointIntervalDuringBacklogITCase {
         @Override
         public void processElement(StreamRecord<T> element) {
             numRecords++;
+            if (numRecords < NUM_RECORDS / 2) {
+                try {
+                    Thread.sleep(SLEEP_MS_PER_RECORD);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
             output.collect(element);
         }
 

Reply via email to