This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b53c535c5 [#2675] improvement(spark): Optimize `checkSentBlockCount` 
by using Roaring64NavigableMap (#2687)
b53c535c5 is described below

commit b53c535c538b1f93b2516b1abf8b6ef92c784b5d
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 10:32:44 2025 +0800

    [#2675] improvement(spark): Optimize `checkSentBlockCount` by using 
Roaring64NavigableMap (#2687)
    
    ### What changes were proposed in this pull request?
    
    Replace `HashSet<Long>` with `Roaring64NavigableMap` in 
`RssShuffleWriter#checkSentBlockCount` method for both Spark2 and Spark3 
clients. This optimization uses a compressed bitmap data structure to filter 
duplicate blockIds from multiple replicas.
    
    Changes:
    - Added import for `org.roaringbitmap.longlong.Roaring64NavigableMap`
    - Replaced `Set<Long> blockIds = new HashSet<>()` with 
`Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf()`
    - Changed `blockIds.addAll(x)` to `x.forEach(blockIdBitmap::addLong)`
    - Changed `blockIds.size()` to `blockIdBitmap.getLongCardinality()`
    
    ### Why are the changes needed?
    
    `Roaring64NavigableMap` is a compressed bitmap data structure that is more 
memory-efficient than `HashSet<Long>`, especially when storing large numbers of 
blockIds (which are typically consecutive or near-consecutive long integers). 
This optimization can significantly reduce memory usage in large-scale shuffle 
scenarios.
    
    Fix: #2675
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - Compiled successfully with both `spark2` and `spark3` profiles
    - All existing unit tests pass:
      - `RssShuffleWriterTest` for Spark3: 7 tests passed
      - `RssShuffleWriterTest` for Spark2: 3 tests passed
    - Code style verified with `mvn spotless:check`
---
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java     | 7 ++++---
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java     | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1cd8113c0..fd7f72ccb 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -59,6 +59,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 import org.apache.spark.storage.BlockManagerId;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -335,11 +336,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
 
     // to filter the multiple replica's duplicate blockIds
-    Set<Long> blockIds = new HashSet<>();
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
-      partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+      partitionBlockIds.values().forEach(x -> 
x.forEach(blockIdBitmap::addLong));
     }
-    long serverTracked = blockIds.size();
+    long serverTracked = blockIdBitmap.getLongCardinality();
     if (expected != serverTracked || expected != bufferManagerTracked) {
       throw new RssSendFailedException(
           "Potential block loss may occur for task["
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 596746346..ae61cb1fe 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -66,6 +66,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
 import org.apache.spark.storage.BlockManagerId;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -448,11 +449,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
 
     // to filter the multiple replica's duplicate blockIds
-    Set<Long> blockIds = new HashSet<>();
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
     for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
-      partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+      partitionBlockIds.values().forEach(x -> 
x.forEach(blockIdBitmap::addLong));
     }
-    long serverTracked = blockIds.size();
+    long serverTracked = blockIdBitmap.getLongCardinality();
     if (expected != serverTracked || expected != bufferManagerTracked) {
       throw new RssSendFailedException(
           "Potential block loss may occur for task["

Reply via email to