This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 18655e286 [CELEBORN-1879] Ignore invalid chunk range generated by
splitSkewedPartitionLocations
18655e286 is described below
commit 18655e2869efc83d068c32a60e9879c4dcf5fe43
Author: wuziyi <[email protected]>
AuthorDate: Wed Feb 26 23:12:45 2025 +0800
[CELEBORN-1879] Ignore invalid chunk range generated by
splitSkewedPartitionLocations
### What changes were proposed in this pull request?
Ignore invalid chunk ranges (those with `left > right`) generated by `splitSkewedPartitionLocations`.
### Why are the changes needed?
The current implementation of the skew partition split method
(`splitSkewedPartitionLocations`) may generate a chunk range where
`range.left > range.right`. Such a chunk range is invalid and causes an
unnecessary initialization of `PartitionReader`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test: added `testSkewPartitionSplitIgnoreEmpty1` to `CelebornPartitionUtilSuiteJ`.
Closes #3117 from Z1Wu/feat/c1879.
Authored-by: wuziyi <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../shuffle/celeborn/CelebornPartitionUtil.java | 2 +-
.../celeborn/CelebornPartitionUtilSuiteJ.java | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
b/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
index da0fcc772..110c0e6c1 100644
---
a/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
+++
b/client-spark/spark-3-4/src/main/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtil.java
@@ -85,7 +85,7 @@ public class CelebornPartitionUtil {
if (currentOffset <= endOffset) {
right = j - 1;
}
- if (left >= 0 && right >= 0) {
+ if (left >= 0 && right >= 0 && right >= left) {
chunkRange.put(p.getUniqueId(), Pair.of(left, right));
}
j++;
diff --git
a/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
b/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
index 989dd31a9..06c431bf8 100644
---
a/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
+++
b/client-spark/spark-3-4/src/test/java/org/apache/spark/shuffle/celeborn/CelebornPartitionUtilSuiteJ.java
@@ -27,6 +27,29 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
public class CelebornPartitionUtilSuiteJ {
+ @Test
+ public void testSkewPartitionSplitIgnoreEmpty1() {
+ ArrayList<PartitionLocation> locations = new ArrayList<>();
+ PartitionLocation loc = genPartitionLocation(0, new Long[] {0L, 20L, 30L});
+ List<Long> offsets = loc.getStorageInfo().getChunkOffsets();
+ locations.add(loc);
+ int partitionNumber = 10;
+ Long sum = 0L;
+ for (int i = 0; i < partitionNumber; i++) {
+ Map<String, Pair<Integer, Integer>> res =
+ CelebornPartitionUtil.splitSkewedPartitionLocations(locations,
partitionNumber, i);
+ if (!res.isEmpty()) {
+ int l = res.get(loc.getUniqueId()).getLeft();
+ int r = res.get(loc.getUniqueId()).getRight();
+ Assert.assertTrue(r >= l);
+ for (int j = l; j <= r; j++) {
+ sum += offsets.get(j + 1) - offsets.get(j);
+ }
+ }
+ }
+ Assert.assertEquals(offsets.get(offsets.size() - 1), sum);
+ }
+
@Test
public void testSkewPartitionSplit() {