This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 02e36182d4a9fe6621a621b854404296a7f3ad0c Author: Rui Fan <[email protected]> AuthorDate: Mon Mar 2 19:58:37 2026 +0100 [FLINK-39140][test] Change record type from Long to String in UnalignedCheckpointRescaleWithMixedExchangesITCase Long records (8 bytes) allow thousands of records per buffer, causing excessive backpressure during aligned checkpoint phases (forward/rescale exchanges). Using 100-char random String records reduces the record count per buffer, shortening the time needed to drain backpressured buffers. --- ...dCheckpointRescaleWithMixedExchangesITCase.java | 56 ++++++++++++++-------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java index e216f8cbbbf..03aecb0f415 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java @@ -60,6 +60,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY; @@ -78,6 +79,12 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg private static final int MAX_SLOTS = NUM_TASK_MANAGERS * SLOTS_PER_TASK_MANAGER; private static final Random RANDOM = new Random(); + // Use 100-char String records instead of Long to increase per-record size. With Long (8 bytes), + // each buffer holds thousands of records, causing excessive backpressure during aligned + // checkpoint phases (forward/rescale exchanges). Larger String records reduce the record count + // per buffer, shortening the time needed to drain backpressured buffers. + private static final int RECORD_LENGTH = 100; + private static MiniClusterWithClientResource cluster; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -198,12 +205,12 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg private static JobClient createMultiOutputDAG(StreamExecutionEnvironment env) throws Exception { int sourceParallelism = getRandomParallelism(); - DataStream<Long> sourceStream = + DataStream<String> sourceStream = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator") .setParallelism(sourceParallelism); sourceStream - .keyBy((KeySelector<Long, Long>) value -> value) + .keyBy((KeySelector<String, String>) value -> value) .map( x -> { Thread.sleep(5); @@ -227,17 +234,17 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg private static JobClient createMultiInputDAG(StreamExecutionEnvironment env) throws Exception { int source1Parallelism = getRandomParallelism(); - DataStream<Long> sourceStream1 = + DataStream<String> sourceStream1 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1") .setParallelism(source1Parallelism); int source2Parallelism = getRandomParallelism(); - DataStream<Long> sourceStream2 = + DataStream<String> sourceStream2 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2") .setParallelism(source2Parallelism); // Keep the same parallelism to ensure the ForwardPartitioner will be used. - DataStream<Long> forwardedStream = + DataStream<String> forwardedStream = sourceStream2.map(x -> x).setParallelism(source2Parallelism); sourceStream1 @@ -254,12 +261,12 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg throws Exception { int sourceParallelism = getRandomParallelism(); - DataStream<Long> sourceStream = + DataStream<String> sourceStream = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Data Generator") .setParallelism(sourceParallelism); sourceStream - .keyBy((KeySelector<Long, Long>) value -> value) + .keyBy((KeySelector<String, String>) value -> value) .map( x -> { Thread.sleep(5); @@ -285,20 +292,20 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg throws Exception { // Multi-input part int source1Parallelism = getRandomParallelism(); - DataStream<Long> sourceStream1 = + DataStream<String> sourceStream1 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1") .setParallelism(source1Parallelism); int source2Parallelism = getRandomParallelism(); - DataStream<Long> sourceStream2 = + DataStream<String> sourceStream2 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2") .setParallelism(source2Parallelism); // Keep the same parallelism to ensure the ForwardPartitioner will be used. - DataStream<Long> forwardedStream = + DataStream<String> forwardedStream = sourceStream2.map(x -> x).setParallelism(source2Parallelism); - DataStream<Long> multiInputMap = + DataStream<String> multiInputMap = sourceStream1 .rebalance() .connect(forwardedStream.rebalance()) @@ -308,7 +315,7 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg // Multi-output part multiInputMap - .keyBy((KeySelector<Long, Long>) value -> value) + .keyBy((KeySelector<String, String>) value -> value) .map( x -> { Thread.sleep(5); @@ -338,14 +345,14 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg private static JobClient createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env) throws Exception { int source1Parallelism = getRandomParallelism(); - DataStream<Long> sourceStream1 = + DataStream<String> sourceStream1 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 1") .setParallelism(source1Parallelism); int source2Parallelism = getRandomParallelism(); // Filter all records to simulate empty state exchange - DataStream<Long> sourceStream2 = + DataStream<String> sourceStream2 = env.fromSource(createSource(), WatermarkStrategy.noWatermarks(), "Source 2") .setParallelism(source2Parallelism) .filter(value -> false) @@ -353,7 +360,7 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg sourceStream1 .union(sourceStream2) - .keyBy((KeySelector<Long, Long>) value -> value) + .keyBy((KeySelector<String, String>) value -> value) .map( x -> { Thread.sleep(5); @@ -369,9 +376,16 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg return RANDOM.nextInt(MAX_SLOTS) + 1; } - private static DataGeneratorSource<Long> createSource() { + private static DataGeneratorSource<String> createSource() { return new DataGeneratorSource<>( - index -> index, + index -> { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + StringBuilder sb = new StringBuilder(RECORD_LENGTH); + for (int i = 0; i < RECORD_LENGTH; i++) { + sb.append((char) ('a' + rnd.nextInt(26))); + } + return sb.toString(); + }, new NumberSequenceSource(0, Long.MAX_VALUE - 1) { @Override protected List<NumberSequenceSplit> splitNumberRange( @@ -380,19 +394,19 @@ public class UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg } }, RateLimiterStrategy.perSecond(5000), - Types.LONG) {}; + Types.STRING) {}; } /** A simple CoMapFunction that sleeps for 1ms for each element. */ - private static class SleepingCoMap implements CoMapFunction<Long, Long, Long> { + private static class SleepingCoMap<T> implements CoMapFunction<T, T, T> { @Override - public Long map1(Long value) throws Exception { + public T map1(T value) throws Exception { Thread.sleep(1); return value; } @Override - public Long map2(Long value) throws Exception { + public T map2(T value) throws Exception { Thread.sleep(1); return value; }
