This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b53c535c5 [#2675] improvement(spark): Optimize `checkSentBlockCount`
by using Roaring64NavigableMap (#2687)
b53c535c5 is described below
commit b53c535c538b1f93b2516b1abf8b6ef92c784b5d
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 10:32:44 2025 +0800
[#2675] improvement(spark): Optimize `checkSentBlockCount` by using
Roaring64NavigableMap (#2687)
### What changes were proposed in this pull request?
Replace `HashSet<Long>` with `Roaring64NavigableMap` in
`RssShuffleWriter#checkSentBlockCount` method for both Spark2 and Spark3
clients. This optimization uses a compressed bitmap data structure to filter
duplicate blockIds from multiple replicas.
Changes:
- Added import for `org.roaringbitmap.longlong.Roaring64NavigableMap`
- Replaced `Set<Long> blockIds = new HashSet<>()` with
`Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf()`
- Changed `blockIds.addAll(x)` to `x.forEach(blockIdBitmap::addLong)`
- Changed `blockIds.size()` to `blockIdBitmap.getLongCardinality()`
### Why are the changes needed?
`Roaring64NavigableMap` is a compressed bitmap data structure that is more
memory-efficient than `HashSet<Long>`, especially when storing large numbers of
blockIds (which are typically consecutive or near-consecutive long integers).
This optimization can significantly reduce memory usage in large-scale shuffle
scenarios.
Fixes: #2675
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Compiled successfully with both `spark2` and `spark3` profiles
- All existing unit tests pass:
- `RssShuffleWriterTest` for Spark3: 7 tests passed
- `RssShuffleWriterTest` for Spark2: 3 tests passed
- Code style verified with `mvn spotless:check`
---
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 7 ++++---
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 7 ++++---
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 1cd8113c0..fd7f72ccb 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -59,6 +59,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.storage.BlockManagerId;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -335,11 +336,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
// to filter the multiple replica's duplicate blockIds
- Set<Long> blockIds = new HashSet<>();
+ Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
- partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+ partitionBlockIds.values().forEach(x ->
x.forEach(blockIdBitmap::addLong));
}
- long serverTracked = blockIds.size();
+ long serverTracked = blockIdBitmap.getLongCardinality();
if (expected != serverTracked || expected != bufferManagerTracked) {
throw new RssSendFailedException(
"Potential block loss may occur for task["
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 596746346..ae61cb1fe 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -66,6 +66,7 @@ import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
import org.apache.spark.storage.BlockManagerId;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -448,11 +449,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
// to filter the multiple replica's duplicate blockIds
- Set<Long> blockIds = new HashSet<>();
+ Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
- partitionBlockIds.values().forEach(x -> blockIds.addAll(x));
+ partitionBlockIds.values().forEach(x ->
x.forEach(blockIdBitmap::addLong));
}
- long serverTracked = blockIds.size();
+ long serverTracked = blockIdBitmap.getLongCardinality();
if (expected != serverTracked || expected != bufferManagerTracked) {
throw new RssSendFailedException(
"Potential block loss may occur for task["