This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 18655e286 [CELEBORN-1879] Ignore invalid chunk range generated by splitSkewedPartitionLocations
18655e286 is described below

commit 18655e2869efc83d068c32a60e9879c4dcf5fe43
Author: wuziyi <[email protected]>
AuthorDate: Wed Feb 26 23:12:45 2025 +0800

    [CELEBORN-1879] Ignore invalid chunk range generated by splitSkewedPartitionLocations
    
    ### What changes were proposed in this pull request?
    
    Ignore invalid chunk ranges generated by `splitSkewedPartitionLocations`.
    
    ### Why are the changes needed?
    
    The current implementation of the skewed-partition split method
    (`splitSkewedPartitionLocations`) may generate a chunk range where
    `range.left` > `range.right`. Such a chunk range is invalid: it selects no
    chunks, yet it still causes an unnecessary initialization of a
    `PartitionReader`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    ut
    
    Closes #3117 from Z1Wu/feat/c1879.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../shuffle/celeborn/CelebornPartitionUtil.java    |  2 +-
 .../celeborn/CelebornPartitionUtilSuiteJ.java      | 23 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
 
b/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
index da0fcc772..110c0e6c1 100644
--- 
a/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
+++ 
b/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
@@ -85,7 +85,7 @@ public class CelebornPartitionUtil {
         if (currentOffset <= endOffset) {
           right = j - 1;
         }
-        if (left >= 0 && right >= 0) {
+        if (left >= 0 && right >= 0 && right >= left) {
           chunkRange.put(p.getUniqueId(), Pair.of(left, right));
         }
         j++;
diff --git 
a/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
 
b/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
index 989dd31a9..06c431bf8 100644
--- 
a/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
+++ 
b/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
@@ -27,6 +27,29 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.StorageInfo;
 
 public class CelebornPartitionUtilSuiteJ {
+  @Test
+  public void testSkewPartitionSplitIgnoreEmpty1() {
+    ArrayList<PartitionLocation> locations = new ArrayList<>();
+    PartitionLocation loc = genPartitionLocation(0, new Long[] {0L, 20L, 30L});
+    List<Long> offsets = loc.getStorageInfo().getChunkOffsets();
+    locations.add(loc);
+    int partitionNumber = 10;
+    Long sum = 0L;
+    for (int i = 0; i < partitionNumber; i++) {
+      Map<String, Pair<Integer, Integer>> res =
+          CelebornPartitionUtil.splitSkewedPartitionLocations(locations, 
partitionNumber, i);
+      if (!res.isEmpty()) {
+        int l = res.get(loc.getUniqueId()).getLeft();
+        int r = res.get(loc.getUniqueId()).getRight();
+        Assert.assertTrue(r >= l);
+        for (int j = l; j <= r; j++) {
+          sum += offsets.get(j + 1) - offsets.get(j);
+        }
+      }
+    }
+    Assert.assertEquals(offsets.get(offsets.size() - 1), sum);
+  }
+
   @Test
   public void testSkewPartitionSplit() {
 

Reply via email to