This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 81a1de166d1cec9a4e326140e60d7cd122468f92
Author: noorall <863485...@qq.com>
AuthorDate: Wed Jan 8 19:35:03 2025 +0800

    [hotfix][runtime] 
ConsumedSubpartitionContext#getConsumedShuffleDescriptorRanges should return a 
non-overlapping result
---
 .../flink/runtime/deployment/ConsumedSubpartitionContext.java | 11 ++++++-----
 .../runtime/deployment/ConsumedSubpartitionContextTest.java   |  4 ++--
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java
index e3270d44e9c..602206eedb3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContext.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.deployment;
 
 import org.apache.flink.runtime.executiongraph.IndexRange;
-import org.apache.flink.runtime.executiongraph.IndexRangeUtil;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 
@@ -32,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
+import static 
org.apache.flink.runtime.executiongraph.IndexRangeUtil.mergeIndexRanges;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -78,8 +78,10 @@ class ConsumedSubpartitionContext implements Serializable {
     }
 
     public Collection<IndexRange> getConsumedShuffleDescriptorRanges() {
+        // The original consumed shuffle descriptors may have overlaps, we 
need to deduplicate it
+        // by merging.
         return Collections.unmodifiableCollection(
-                consumedShuffleDescriptorToSubpartitionRangeMap.keySet());
+                
mergeIndexRanges(consumedShuffleDescriptorToSubpartitionRangeMap.keySet()));
     }
 
     public IndexRange getConsumedSubpartitionRange(int shuffleDescriptorIndex) 
{
@@ -96,7 +98,7 @@ class ConsumedSubpartitionContext implements Serializable {
             }
         }
         List<IndexRange> mergedConsumedSubpartitionRanges =
-                IndexRangeUtil.mergeIndexRanges(consumedSubpartitionRanges);
+                mergeIndexRanges(consumedSubpartitionRanges);
         checkState(
                 mergedConsumedSubpartitionRanges.size() == 1,
                 "Illegal consumed subpartition range for shuffle descriptor 
index "
@@ -159,8 +161,7 @@ class ConsumedSubpartitionContext implements Serializable {
         // merging.
         int numConsumedShuffleDescriptors = 0;
         List<IndexRange> mergedConsumedShuffleDescriptor =
-                IndexRangeUtil.mergeIndexRanges(
-                        
consumedShuffleDescriptorToSubpartitionRangeMap.keySet());
+                
mergeIndexRanges(consumedShuffleDescriptorToSubpartitionRangeMap.keySet());
         for (IndexRange range : mergedConsumedShuffleDescriptor) {
             numConsumedShuffleDescriptors += range.size();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java
index 7e683fcdfc5..d3e694fdf67 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ConsumedSubpartitionContextTest.java
@@ -54,7 +54,7 @@ class ConsumedSubpartitionContextTest {
 
         Collection<IndexRange> shuffleDescriptorRanges =
                 context.getConsumedShuffleDescriptorRanges();
-        assertThat(shuffleDescriptorRanges).hasSize(2);
+        assertThat(shuffleDescriptorRanges).hasSize(1);
 
         assertThat(context.getConsumedSubpartitionRange(0)).isEqualTo(new 
IndexRange(0, 2));
         assertThat(context.getConsumedSubpartitionRange(1)).isEqualTo(new 
IndexRange(0, 2));
@@ -106,7 +106,7 @@ class ConsumedSubpartitionContextTest {
 
         Collection<IndexRange> shuffleDescriptorRanges =
                 context.getConsumedShuffleDescriptorRanges();
-        assertThat(shuffleDescriptorRanges).hasSize(2);
+        assertThat(shuffleDescriptorRanges).hasSize(1);
 
         assertThat(context.getConsumedSubpartitionRange(0)).isEqualTo(new 
IndexRange(1, 2));
         assertThat(context.getConsumedSubpartitionRange(1)).isEqualTo(new 
IndexRange(1, 2));

Reply via email to