This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 247b070943d [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.
247b070943d is described below

commit 247b070943d54dfb749bc97f8d93a87d6db8cd05
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Mar 28 14:53:52 2025 +0800

    [FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in AllToAllBlockingResultInfo when submitting a job graph.

    This issue has minimal impact on users because RestClusterClient is annotated with @Internal, and users can only submit stream graphs.
---
 .../adaptivebatch/AllToAllBlockingResultInfo.java  |  1 +
 .../scheduling/AdaptiveBatchSchedulerITCase.java   | 35 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
index 4d54646cdb0..de390a42783 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AllToAllBlockingResultInfo.java
@@ -74,6 +74,7 @@ public class AllToAllBlockingResultInfo extends AbstractBlockingResultInfo {
             Map<Integer, long[]> subpartitionBytesByPartitionIndex) {
         super(resultId, numOfPartitions, numOfSubpartitions, subpartitionBytesByPartitionIndex);
         this.singleSubpartitionContainsAllData = singleSubpartitionContainsAllData;
+        this.isBroadcast = singleSubpartitionContainsAllData;
     }
 
     @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
index c7f9b3c97cc..adad853102e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
 import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
 import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
@@ -43,6 +44,7 @@ import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
 import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.test.runtime.JobGraphRunningUtil;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -88,6 +90,39 @@ class AdaptiveBatchSchedulerITCase {
         testSchedulingBase(false);
     }
 
+    @Test
+    void testSubmitJobGraphWithBroadcastEdge() throws Exception {
+        final Configuration configuration = createConfiguration();
+        // make sure the map operator has two sub-tasks
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 2);
+        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 2);
+
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.createLocalEnvironment(configuration);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
+                .setParallelism(1)
+                .broadcast()
+                .map(new NumberCounter());
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        JobGraphRunningUtil.execute(jobGraph, configuration, 1, 2);
+
+        Map<Long, Long> numberCountResultMap =
+                numberCountResults.stream()
+                        .flatMap(map -> map.entrySet().stream())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, Map.Entry::getValue, Long::sum));
+
+        Map<Long, Long> expectedResult =
+                LongStream.range(0, NUMBERS_TO_PRODUCE)
+                        .boxed()
+                        .collect(Collectors.toMap(Function.identity(), i -> 2L));
+        assertThat(numberCountResultMap).isEqualTo(expectedResult);
+    }
+
     @Test
     void testSchedulingWithDynamicSourceParallelismInference() throws Exception {
         testSchedulingBase(true);