This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 247b070943d [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
247b070943d is described below

commit 247b070943d54dfb749bc97f8d93a87d6db8cd05
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Mar 28 14:53:52 2025 +0800

    [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
    
    This issue has minimal impact on users because RestClusterClient is annotated with @Internal, and users can only submit stream graphs.
---
 .../adaptivebatch/AllToAllBlockingResultInfo.java  |  1 +
 .../scheduling/AdaptiveBatchSchedulerITCase.java   | 35 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
index 4d54646cdb0..de390a42783 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
@@ -74,6 +74,7 @@ public class AllToAllBlockingResultInfo extends AbstractBlockingResultInfo {
             Map<Integer, long[]> subpartitionBytesByPartitionIndex) {
         super(resultId, numOfPartitions, numOfSubpartitions, 
subpartitionBytesByPartitionIndex);
         this.singleSubpartitionContainsAllData = 
singleSubpartitionContainsAllData;
+        this.isBroadcast = singleSubpartitionContainsAllData;
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
index c7f9b3c97cc..adad853102e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
 import 
org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
@@ -43,6 +44,7 @@ import 
org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
 import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.test.runtime.JobGraphRunningUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -88,6 +90,39 @@ class AdaptiveBatchSchedulerITCase {
         testSchedulingBase(false);
     }
 
+    @Test
+    void testSubmitJobGraphWithBroadcastEdge() throws Exception {
+        final Configuration configuration = createConfiguration();
+        // make sure the map operator has two sub-tasks
+        
configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM,
 2);
+        
configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
 2);
+
+        final StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.createLocalEnvironment(configuration);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                .setParallelism(1)
+                .broadcast()
+                .map(new NumberCounter());
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        JobGraphRunningUtil.execute(jobGraph, configuration, 1, 2);
+
+        Map<Long, Long> numberCountResultMap =
+                numberCountResults.stream()
+                        .flatMap(map -> map.entrySet().stream())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, 
Map.Entry::getValue, Long::sum));
+
+        Map<Long, Long> expectedResult =
+                LongStream.range(0, NUMBERS_TO_PRODUCE)
+                        .boxed()
+                        .collect(Collectors.toMap(Function.identity(), i -> 
2L));
+        assertThat(numberCountResultMap).isEqualTo(expectedResult);
+    }
+
     @Test
     void testSchedulingWithDynamicSourceParallelismInference() throws 
Exception {
         testSchedulingBase(true);

Reply via email to