This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81a1de166d1cec9a4e326140e60d7cd122468f92 Author: noorall <863485...@qq.com> AuthorDate: Wed Jan 8 19:35:03 2025 +0800 [hotfix][runtime] ConsumedSubpartitionContext#getConsumedShuffleDescriptorRanges should return a non-overlapping result --- .../flink/runtime/deployment/ConsumedSubpartitionContext.java | 11 ++++++----- .../runtime/deployment/ConsumedSubpartitionContextTest.java | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java index e3270d44e9c..602206eedb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.deployment; import org.apache.flink.runtime.executiongraph.IndexRange; -import org.apache.flink.runtime.executiongraph.IndexRangeUtil; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; @@ -32,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import static org.apache.flink.runtime.executiongraph.IndexRangeUtil.mergeIndexRanges; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -78,8 +78,10 @@ class ConsumedSubpartitionContext implements Serializable { } public Collection<IndexRange> getConsumedShuffleDescriptorRanges() { + // The original consumed shuffle descriptors may have overlaps; we need to deduplicate them + // by merging. 
return Collections.unmodifiableCollection( - consumedShuffleDescriptorToSubpartitionRangeMap.keySet()); + mergeIndexRanges(consumedShuffleDescriptorToSubpartitionRangeMap.keySet())); } public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) { @@ -96,7 +98,7 @@ class ConsumedSubpartitionContext implements Serializable { } } List<IndexRange> mergedConsumedSubpartitionRanges = - IndexRangeUtil.mergeIndexRanges(consumedSubpartitionRanges); + mergeIndexRanges(consumedSubpartitionRanges); checkState( mergedConsumedSubpartitionRanges.size() == 1, "Illegal consumed subpartition range for shuffle descriptor index " @@ -159,8 +161,7 @@ class ConsumedSubpartitionContext implements Serializable { // merging. int numConsumedShuffleDescriptors = 0; List<IndexRange> mergedConsumedShuffleDescriptor = - IndexRangeUtil.mergeIndexRanges( - consumedShuffleDescriptorToSubpartitionRangeMap.keySet()); + mergeIndexRanges(consumedShuffleDescriptorToSubpartitionRangeMap.keySet()); for (IndexRange range : mergedConsumedShuffleDescriptor) { numConsumedShuffleDescriptors += range.size(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java index 7e683fcdfc5..d3e694fdf67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java @@ -54,7 +54,7 @@ class ConsumedSubpartitionContextTest { Collection<IndexRange> shuffleDescriptorRanges = context.getConsumedShuffleDescriptorRanges(); - assertThat(shuffleDescriptorRanges).hasSize(2); + assertThat(shuffleDescriptorRanges).hasSize(1); assertThat(context.getConsumedSubpartitionRange(0)).isEqualTo(new IndexRange(0, 2)); assertThat(context.getConsumedSubpartitionRange(1)).isEqualTo(new IndexRange(0, 2)); @@ -106,7 
+106,7 @@ class ConsumedSubpartitionContextTest { Collection<IndexRange> shuffleDescriptorRanges = context.getConsumedShuffleDescriptorRanges(); - assertThat(shuffleDescriptorRanges).hasSize(2); + assertThat(shuffleDescriptorRanges).hasSize(1); assertThat(context.getConsumedSubpartitionRange(0)).isEqualTo(new IndexRange(1, 2)); assertThat(context.getConsumedSubpartitionRange(1)).isEqualTo(new IndexRange(1, 2));