This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 02e36182d4a9fe6621a621b854404296a7f3ad0c
Author: Rui Fan <[email protected]>
AuthorDate: Mon Mar 2 19:58:37 2026 +0100

    [FLINK-39140][test] Change record type from Long to String in UnalignedCheckpointRescaleWithMixedExchangesITCase
    
    Long records (8 bytes) allow thousands of records per buffer, causing
    excessive backpressure during aligned checkpoint phases (forward/rescale
    exchanges). Using 100-char random String records reduces the record count
    per buffer, shortening the time needed to drain backpressured buffers.
---
 ...dCheckpointRescaleWithMixedExchangesITCase.java | 56 ++++++++++++++--------
 1 file changed, 35 insertions(+), 21 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
index e216f8cbbbf..03aecb0f415 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleWithMixedExchangesITCase.java
@@ -60,6 +60,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.NO_RESTART_STRATEGY;
 
@@ -78,6 +79,12 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
     private static final int MAX_SLOTS = NUM_TASK_MANAGERS * 
SLOTS_PER_TASK_MANAGER;
     private static final Random RANDOM = new Random();
 
+    // Use 100-char String records instead of Long to increase per-record size. With Long (8 bytes),
+    // each buffer holds thousands of records, causing excessive backpressure during aligned
+    // checkpoint phases (forward/rescale exchanges). Larger String records reduce the record count
+    // per buffer, shortening the time needed to drain backpressured buffers.
+    private static final int RECORD_LENGTH = 100;
+
     private static MiniClusterWithClientResource cluster;
     @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -198,12 +205,12 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
     private static JobClient createMultiOutputDAG(StreamExecutionEnvironment 
env) throws Exception {
 
         int sourceParallelism = getRandomParallelism();
-        DataStream<Long> sourceStream =
+        DataStream<String> sourceStream =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Data Generator")
                         .setParallelism(sourceParallelism);
 
         sourceStream
-                .keyBy((KeySelector<Long, Long>) value -> value)
+                .keyBy((KeySelector<String, String>) value -> value)
                 .map(
                         x -> {
                             Thread.sleep(5);
@@ -227,17 +234,17 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
     private static JobClient createMultiInputDAG(StreamExecutionEnvironment 
env) throws Exception {
         int source1Parallelism = getRandomParallelism();
-        DataStream<Long> sourceStream1 =
+        DataStream<String> sourceStream1 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
-        DataStream<Long> sourceStream2 =
+        DataStream<String> sourceStream2 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism);
 
         // Keep the same parallelism to ensure the ForwardPartitioner will be 
used.
-        DataStream<Long> forwardedStream =
+        DataStream<String> forwardedStream =
                 sourceStream2.map(x -> x).setParallelism(source2Parallelism);
 
         sourceStream1
@@ -254,12 +261,12 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
             throws Exception {
 
         int sourceParallelism = getRandomParallelism();
-        DataStream<Long> sourceStream =
+        DataStream<String> sourceStream =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Data Generator")
                         .setParallelism(sourceParallelism);
 
         sourceStream
-                .keyBy((KeySelector<Long, Long>) value -> value)
+                .keyBy((KeySelector<String, String>) value -> value)
                 .map(
                         x -> {
                             Thread.sleep(5);
@@ -285,20 +292,20 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
             throws Exception {
         // Multi-input part
         int source1Parallelism = getRandomParallelism();
-        DataStream<Long> sourceStream1 =
+        DataStream<String> sourceStream1 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
-        DataStream<Long> sourceStream2 =
+        DataStream<String> sourceStream2 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism);
 
         // Keep the same parallelism to ensure the ForwardPartitioner will be 
used.
-        DataStream<Long> forwardedStream =
+        DataStream<String> forwardedStream =
                 sourceStream2.map(x -> x).setParallelism(source2Parallelism);
 
-        DataStream<Long> multiInputMap =
+        DataStream<String> multiInputMap =
                 sourceStream1
                         .rebalance()
                         .connect(forwardedStream.rebalance())
@@ -308,7 +315,7 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
         // Multi-output part
         multiInputMap
-                .keyBy((KeySelector<Long, Long>) value -> value)
+                .keyBy((KeySelector<String, String>) value -> value)
                 .map(
                         x -> {
                             Thread.sleep(5);
@@ -338,14 +345,14 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
     private static JobClient 
createPartEmptyHashExchangeDAG(StreamExecutionEnvironment env)
             throws Exception {
         int source1Parallelism = getRandomParallelism();
-        DataStream<Long> sourceStream1 =
+        DataStream<String> sourceStream1 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 1")
                         .setParallelism(source1Parallelism);
 
         int source2Parallelism = getRandomParallelism();
 
         // Filter all records to simulate empty state exchange
-        DataStream<Long> sourceStream2 =
+        DataStream<String> sourceStream2 =
                 env.fromSource(createSource(), 
WatermarkStrategy.noWatermarks(), "Source 2")
                         .setParallelism(source2Parallelism)
                         .filter(value -> false)
@@ -353,7 +360,7 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
 
         sourceStream1
                 .union(sourceStream2)
-                .keyBy((KeySelector<Long, Long>) value -> value)
+                .keyBy((KeySelector<String, String>) value -> value)
                 .map(
                         x -> {
                             Thread.sleep(5);
@@ -369,9 +376,16 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
         return RANDOM.nextInt(MAX_SLOTS) + 1;
     }
 
-    private static DataGeneratorSource<Long> createSource() {
+    private static DataGeneratorSource<String> createSource() {
         return new DataGeneratorSource<>(
-                index -> index,
+                index -> {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                    StringBuilder sb = new StringBuilder(RECORD_LENGTH);
+                    for (int i = 0; i < RECORD_LENGTH; i++) {
+                        sb.append((char) ('a' + rnd.nextInt(26)));
+                    }
+                    return sb.toString();
+                },
                 new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
                     @Override
                     protected List<NumberSequenceSplit> splitNumberRange(
@@ -380,19 +394,19 @@ public class 
UnalignedCheckpointRescaleWithMixedExchangesITCase extends TestLogg
                     }
                 },
                 RateLimiterStrategy.perSecond(5000),
-                Types.LONG) {};
+                Types.STRING) {};
     }
 
     /** A simple CoMapFunction that sleeps for 1ms for each element. */
-    private static class SleepingCoMap implements CoMapFunction<Long, Long, 
Long> {
+    private static class SleepingCoMap<T> implements CoMapFunction<T, T, T> {
         @Override
-        public Long map1(Long value) throws Exception {
+        public T map1(T value) throws Exception {
             Thread.sleep(1);
             return value;
         }
 
         @Override
-        public Long map2(Long value) throws Exception {
+        public T map2(T value) throws Exception {
             Thread.sleep(1);
             return value;
         }

Reply via email to