This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0f589a532b1 [FLINK-38613][tests] Force number of splits in
UnalignedCheckpointRescaleWithMixedExchangesITCase
0f589a532b1 is described below
commit 0f589a532b110e5da0012c90812fc7c450bbd55a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 18 19:21:31 2026 +0100
[FLINK-38613][tests] Force number of splits in
UnalignedCheckpointRescaleWithMixedExchangesITCase
Motivation: avoid sub-tasks finishing prematurely and failing the test's
assumption that all sub-tasks are always running
---
.../datagen/source/DataGeneratorSource.java | 38 ++++++++++-
...dCheckpointRescaleWithMixedExchangesITCase.java | 79 +++++++---------------
2 files changed, 59 insertions(+), 58 deletions(-)
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
index 08ab05bc647..1145ba858f6 100644
---
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java
@@ -142,16 +142,50 @@ public class DataGeneratorSource<OUT>
rateLimiterStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
}
+ /**
+ * Instantiates a new {@code DataGeneratorSource}.
+ *
+ * @param generatorFunction The {@code GeneratorFunction} function.
+ * @param numberSource The number source.
+ * @param rateLimiterStrategy The strategy for rate limiting.
+ * @param typeInfo The type of the produced data points.
+ */
+ public DataGeneratorSource(
+ GeneratorFunction<Long, OUT> generatorFunction,
+ NumberSequenceSource numberSource,
+ RateLimiterStrategy rateLimiterStrategy,
+ TypeInformation<OUT> typeInfo) {
+ this(
+ new GeneratorSourceReaderFactory<>(generatorFunction,
rateLimiterStrategy),
+ generatorFunction,
+ typeInfo,
+ numberSource);
+ ClosureCleaner.clean(
+ rateLimiterStrategy,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ }
+
DataGeneratorSource(
SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
GeneratorFunction<Long, OUT> generatorFunction,
long count,
TypeInformation<OUT> typeInfo) {
+ this(
+ sourceReaderFactory,
+ generatorFunction,
+ typeInfo,
+ // a noop source (0 elements) is used in Table tests
+ new NumberSequenceSource(0, count > 0 ? count - 1 : 0));
+ }
+
+ DataGeneratorSource(
+ SourceReaderFactory<OUT, NumberSequenceSplit> sourceReaderFactory,
+ GeneratorFunction<Long, OUT> generatorFunction,
+ TypeInformation<OUT> typeInfo,
+ NumberSequenceSource numberSource) {
this.sourceReaderFactory = checkNotNull(sourceReaderFactory);
this.generatorFunction = checkNotNull(generatorFunction);
this.typeInfo = checkNotNull(typeInfo);
- long to = count > 0 ? count - 1 : 0; // a noop source (0 elements) is
used in Table tests
- this.numberSource = new NumberSequenceSource(0, to);
+ this.numberSource = numberSource;
ClosureCleaner.clean(
generatorFunction,
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
ClosureCleaner.clean(
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
index d11f787aef6..9bf0e4d8df9 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -171,16 +172,10 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
}
private static JobClient createMultiOutputDAG(StreamExecutionEnvironment
env) throws Exception {
- DataGeneratorSource<Long> source =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
int sourceParallelism = getRandomParallelism();
DataStream<Long> sourceStream =
- env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data
Generator")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Data Generator")
.setParallelism(sourceParallelism);
sourceStream
@@ -208,25 +203,13 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
private static JobClient createMultiInputDAG(StreamExecutionEnvironment
env) throws Exception {
int source1Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source1 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
DataStream<Long> sourceStream1 =
- env.fromSource(source1, WatermarkStrategy.noWatermarks(),
"Source 1")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);
int source2Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source2 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
DataStream<Long> sourceStream2 =
- env.fromSource(source2, WatermarkStrategy.noWatermarks(),
"Source 2")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism);
// Keep the same parallelism to ensure the ForwardPartitioner will be
used.
@@ -245,16 +228,10 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
private static JobClient
createRescalePartitionerDAG(StreamExecutionEnvironment env)
throws Exception {
- DataGeneratorSource<Long> source =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
int sourceParallelism = getRandomParallelism();
DataStream<Long> sourceStream =
- env.fromSource(source, WatermarkStrategy.noWatermarks(), "Data
Generator")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Data Generator")
.setParallelism(sourceParallelism);
sourceStream
@@ -284,25 +261,13 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
throws Exception {
// Multi-input part
int source1Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source1 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
DataStream<Long> sourceStream1 =
- env.fromSource(source1, WatermarkStrategy.noWatermarks(),
"Source 1")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);
int source2Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source2 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
DataStream<Long> sourceStream2 =
- env.fromSource(source2, WatermarkStrategy.noWatermarks(),
"Source 2")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism);
// Keep the same parallelism to ensure the ForwardPartitioner will be
used.
@@ -349,27 +314,15 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
private static JobClient
createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
throws Exception {
int source1Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source1 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
DataStream<Long> sourceStream1 =
- env.fromSource(source1, WatermarkStrategy.noWatermarks(),
"Source 1")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 1")
.setParallelism(source1Parallelism);
int source2Parallelism = getRandomParallelism();
- DataGeneratorSource<Long> source2 =
- new DataGeneratorSource<>(
- index -> index,
- Long.MAX_VALUE,
- RateLimiterStrategy.perSecond(5000),
- Types.LONG);
// Filter all records to simulate empty state exchange
DataStream<Long> sourceStream2 =
- env.fromSource(source2, WatermarkStrategy.noWatermarks(),
"Source 2")
+ env.fromSource(createSource(),
WatermarkStrategy.noWatermarks(), "Source 2")
.setParallelism(source2Parallelism)
.filter(value -> false)
.setParallelism(source2Parallelism);
@@ -392,6 +345,20 @@ public class
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
return RANDOM.nextInt(MAX_SLOTS) + 1;
}
+ private static DataGeneratorSource<Long> createSource() {
+ return new DataGeneratorSource<>(
+ index -> index,
+ new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
+ @Override
+ protected List<NumberSequenceSplit> splitNumberRange(
+ long from, long to, int numSplitsIgnored) {
+ return super.splitNumberRange(from, to, MAX_SLOTS);
+ }
+ },
+ RateLimiterStrategy.perSecond(5000),
+ Types.LONG) {};
+ }
+
/** A simple CoMapFunction that sleeps for 1ms for each element. */
private static class SleepingCoMap implements CoMapFunction<Long, Long,
Long> {
@Override