This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f589a532b1 [FLINK-38613][tests] Force number of splits in 
UnalignedCheckpointRescaleWithMixedExchangesITCase
0f589a532b1 is described below

commit 0f589a532b110e5da0012c90812fc7c450bbd55a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 18 19:21:31 2026 +0100

    [FLINK-38613][tests] Force number of splits in 
UnalignedCheckpointRescaleWithMixedExchangesITCase
    
    Motivation: avoid sub-tasks finishing prematurely and thereby violating the 
test's assumption that all sub-tasks are always running
---
 .../datagen/source/DataGeneratorSource.java        | 38 ++++++++++-
 ...dCheckpointRescaleWithMixedExchangesITCase.java | 79 +++++++---------------
 2 files changed, 59 insertions(+), 58 deletions(-)

diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
index 08ab05bc647..1145ba858f6 100644
--- 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -142,16 +142,50 @@ public class DataGeneratorSource<OUT>
                 rateLimiterStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
     }
 
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param numberSource The number source.
+     * @param rateLimiterStrategy The strategy for rate limiting.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            NumberSequenceSource numberSource,
+            RateLimiterStrategy rateLimiterStrategy,
+            TypeInformation<OUT> typeInfo) {
+        this(
+                new GeneratorSourceReaderFactory<>(generatorFunction, 
rateLimiterStrategy),
+                generatorFunction,
+                typeInfo,
+                numberSource);
+        ClosureCleaner.clean(
+                rateLimiterStrategy, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+    }
+
     DataGeneratorSource(
             SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
             GeneratorFunction<Long, OUT> generatorFunction,
             long count,
             TypeInformation<OUT> typeInfo) {
+        this(
+                sourceReaderFactory,
+                generatorFunction,
+                typeInfo,
+                // a noop source (0 elements) is used in Table tests
+                new NumberSequenceSource(0, count > 0 ? count - 1 : 0));
+    }
+
+    DataGeneratorSource(
+            SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
+            GeneratorFunction<Long, OUT> generatorFunction,
+            TypeInformation<OUT> typeInfo,
+            NumberSequenceSource numberSource) {
         this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
         this.generatorFunction = checkNotNull(generatorFunction);
         this.typeInfo = checkNotNull(typeInfo);
-        long to = count > 0 ? count - 1 : 0; // a noop source (0 elements) is 
used in Table tests
-        this.numberSource = new NumberSequenceSource(0, to);
+        this.numberSource = numberSource;
         ClosureCleaner.clean(
                 generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
         ClosureCleaner.clean(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
index d11f787aef6..9bf0e4d8df9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.CheckpointingOptions;
@@ -171,16 +172,10 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
     }
 
     private static JobClient createMultiOutputDAG(StreamExecutionEnvironment 
env) throws Exception {
-        DataGeneratorSource<Long> source =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
 
         int sourceParallelism = getRandomParallelism();
         DataStream<Long> sourceStream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data 
Generator")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Data Generator")
                         .setParallelism(sourceParallelism);
 
         sourceStream
@@ -208,25 +203,13 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
     private static JobClient createMultiInputDAG(StreamExecutionEnvironment 
env) throws Exception {
         int source1Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source1 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
         DataStream<Long> sourceStream1 =
-                env.fromSource(source1, WatermarkStrategy.noWatermarks(), 
"Source 1")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source2 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
         DataStream<Long> sourceStream2 =
-                env.fromSource(source2, WatermarkStrategy.noWatermarks(), 
"Source 2")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism);
 
         // Keep the same parallelism to ensure the ForwardPartitioner will be 
used.
@@ -245,16 +228,10 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
     private static JobClient 
createRescalePartitionerDAG(StreamExecutionEnvironment env)
             throws Exception {
-        DataGeneratorSource<Long> source =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
 
         int sourceParallelism = getRandomParallelism();
         DataStream<Long> sourceStream =
-                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data 
Generator")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Data Generator")
                         .setParallelism(sourceParallelism);
 
         sourceStream
@@ -284,25 +261,13 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
             throws Exception {
         // Multi-input part
         int source1Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source1 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
         DataStream<Long> sourceStream1 =
-                env.fromSource(source1, WatermarkStrategy.noWatermarks(), 
"Source 1")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source2 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
         DataStream<Long> sourceStream2 =
-                env.fromSource(source2, WatermarkStrategy.noWatermarks(), 
"Source 2")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism);
 
         // Keep the same parallelism to ensure the ForwardPartitioner will be 
used.
@@ -349,27 +314,15 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
     private static JobClient 
createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
             throws Exception {
         int source1Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source1 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
         DataStream<Long> sourceStream1 =
-                env.fromSource(source1, WatermarkStrategy.noWatermarks(), 
"Source 1")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
-        DataGeneratorSource<Long> source2 =
-                new DataGeneratorSource<>(
-                        index -> index,
-                        Long.MAX_VALUE,
-                        RateLimiterStrategy.perSecond(5000),
-                        Types.LONG);
 
         // Filter all records to simulate empty state exchange
         DataStream<Long> sourceStream2 =
-                env.fromSource(source2, WatermarkStrategy.noWatermarks(), 
"Source 2")
+                env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism)
                         .filter(value -> false)
                         .setParallelism(source2Parallelism);
@@ -392,6 +345,20 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
         return RANDOM.nextInt(MAX_SLOTS) + 1;
     }
 
+    private static DataGeneratorSource<Long> createSource() {
+        return new DataGeneratorSource<>(
+                index -> index,
+                new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
+                    @Override
+                    protected List<NumberSequenceSplit> splitNumberRange(
+                            long from, long to, int numSplitsIgnored) {
+                        return super.splitNumberRange(from, to, MAX_SLOTS);
+                    }
+                },
+                RateLimiterStrategy.perSecond(5000),
+                Types.LONG) {};
+    }
+
     /** A simple CoMapFunction that sleeps for 1ms for each element. */
     private static class SleepingCoMap implements CoMapFunction<Long, Long, 
Long> {
         @Override

Reply via email to