This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5bbe25e6c [#2675] test(spark)(followup): Add tests for
Roaring64NavigableMap optimization in checkSentBlockCount (#2692)
5bbe25e6c is described below
commit 5bbe25e6cb643beaa9efe409f0e3a37ca5d86109
Author: zhan7236 <[email protected]>
AuthorDate: Tue Dec 2 17:39:24 2025 +0800
[#2675] test(spark)(followup): Add tests for Roaring64NavigableMap
optimization in checkSentBlockCount (#2692)
### What changes were proposed in this pull request?
Add unit tests to verify the `Roaring64NavigableMap` optimization
introduced in PR #2687 for filtering duplicate blockIds from multiple replicas
in `RssShuffleWriter#checkSentBlockCount`.
### Why are the changes needed?
As suggested by the reviewer in PR #2687, tests should be added to cover
the `Roaring64NavigableMap` optimization change.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests added:
- `testRoaring64NavigableMapDeduplication`: Verifies correct deduplication
of blockIds across multiple servers (replicas)
- `testRoaring64NavigableMapWithLargeBlockIds`: Verifies that the cardinality
is unchanged when a large number of consecutive blockIds is added twice
Test results:
- Spark3 `RssShuffleWriterTest`: 9 tests passed (including 2 new tests)
- Spark2 `RssShuffleWriterTest`: 5 tests passed (including 2 new tests)
Related PR: #2687
---
.../spark/shuffle/writer/RssShuffleWriterTest.java | 73 ++++++++++++++++++++++
.../spark/shuffle/writer/RssShuffleWriterTest.java | 73 ++++++++++++++++++++++
2 files changed, 146 insertions(+)
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 331697b85..90b4023fa 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -47,6 +47,7 @@ import org.apache.spark.shuffle.RssShuffleManager;
import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -509,4 +510,76 @@ public class RssShuffleWriterTest {
}
return shuffleBlockInfoList;
}
+
+  /**
+   * Checks that merging every server's blockIds into one Roaring64NavigableMap counts each
+   * replicated id once (issue #2675). The bitmap is built inline, not via checkSentBlockCount.
+   */
+  @Test
+  public void testRoaring64NavigableMapDeduplication() {
+    // Simulate serverToPartitionToBlockIds with duplicate blockIds across
multiple servers
+    // (representing replicas)
+    Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds = Maps.newHashMap();
+
+    final ShuffleServerInfo server1 = new ShuffleServerInfo("id1", "host1",
100);
+    final ShuffleServerInfo server2 = new ShuffleServerInfo("id2", "host2",
100);
+    final ShuffleServerInfo server3 = new ShuffleServerInfo("id3", "host3",
100);
+
+    // Server1 has blockIds: 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server1Partitions = Maps.newHashMap();
+    server1Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server1, server1Partitions);
+
+    // Server2 has same blockIds (replica): 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server2Partitions = Maps.newHashMap();
+    server2Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server2, server2Partitions);
+
+    // Server3 has blockIds: 4, 5 for partition 1
+    Map<Integer, Set<Long>> server3Partitions = Maps.newHashMap();
+    server3Partitions.put(1, Sets.newHashSet(4L, 5L));
+    serverToPartitionToBlockIds.put(server3, server3Partitions);
+
+    // Using Roaring64NavigableMap to filter duplicates (as implemented in
checkSentBlockCount)
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x ->
x.forEach(blockIdBitmap::addLong));
+    }
+
+    // Expected unique blockIds: 1, 2, 3, 4, 5 = 5 unique blocks
+    // Without deduplication: 1,2,3 + 1,2,3 + 4,5 = 8 blocks
+    assertEquals(5, blockIdBitmap.getLongCardinality());
+
+    // Verify the bitmap contains all expected blockIds
+    assertTrue(blockIdBitmap.contains(1L));
+    assertTrue(blockIdBitmap.contains(2L));
+    assertTrue(blockIdBitmap.contains(3L));
+    assertTrue(blockIdBitmap.contains(4L));
+    assertTrue(blockIdBitmap.contains(5L));
+  }
+
+  /**
+   * Adds 100000 consecutive blockIds to a Roaring64NavigableMap twice and checks the cardinality
+   * stays at 100000 after the second pass. Memory usage is not measured by this test.
+   */
+  @Test
+  public void testRoaring64NavigableMapWithLargeBlockIds() {
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+    // Add a large range of consecutive blockIds (simulating real shuffle
scenario)
+    int numBlocks = 100000;
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+
+    // Add same blockIds again (simulating replicas)
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    // Count should remain the same after adding duplicates
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+  }
}
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 9e9e68db5..dd52aa915 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -54,6 +54,7 @@ import
org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -1064,4 +1065,76 @@ public class RssShuffleWriterTest {
}
return shuffleBlockInfoList;
}
+
+  /**
+   * Checks that merging every server's blockIds into one Roaring64NavigableMap counts each
+   * replicated id once (issue #2675). The bitmap is built inline, not via checkSentBlockCount.
+   */
+  @Test
+  public void testRoaring64NavigableMapDeduplication() {
+    // Simulate serverToPartitionToBlockIds with duplicate blockIds across
multiple servers
+    // (representing replicas)
+    Map<ShuffleServerInfo, Map<Integer, Set<Long>>>
serverToPartitionToBlockIds = Maps.newHashMap();
+
+    final ShuffleServerInfo server1 = new ShuffleServerInfo("id1", "host1",
100);
+    final ShuffleServerInfo server2 = new ShuffleServerInfo("id2", "host2",
100);
+    final ShuffleServerInfo server3 = new ShuffleServerInfo("id3", "host3",
100);
+
+    // Server1 has blockIds: 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server1Partitions = Maps.newHashMap();
+    server1Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server1, server1Partitions);
+
+    // Server2 has same blockIds (replica): 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server2Partitions = Maps.newHashMap();
+    server2Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server2, server2Partitions);
+
+    // Server3 has blockIds: 4, 5 for partition 1
+    Map<Integer, Set<Long>> server3Partitions = Maps.newHashMap();
+    server3Partitions.put(1, Sets.newHashSet(4L, 5L));
+    serverToPartitionToBlockIds.put(server3, server3Partitions);
+
+    // Using Roaring64NavigableMap to filter duplicates (as implemented in
checkSentBlockCount)
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    for (Map<Integer, Set<Long>> partitionBlockIds :
serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x ->
x.forEach(blockIdBitmap::addLong));
+    }
+
+    // Expected unique blockIds: 1, 2, 3, 4, 5 = 5 unique blocks
+    // Without deduplication: 1,2,3 + 1,2,3 + 4,5 = 8 blocks
+    assertEquals(5, blockIdBitmap.getLongCardinality());
+
+    // Verify the bitmap contains all expected blockIds
+    assertTrue(blockIdBitmap.contains(1L));
+    assertTrue(blockIdBitmap.contains(2L));
+    assertTrue(blockIdBitmap.contains(3L));
+    assertTrue(blockIdBitmap.contains(4L));
+    assertTrue(blockIdBitmap.contains(5L));
+  }
+
+  /**
+   * Adds 100000 consecutive blockIds to a Roaring64NavigableMap twice and checks the cardinality
+   * stays at 100000 after the second pass. Memory usage is not measured by this test.
+   */
+  @Test
+  public void testRoaring64NavigableMapWithLargeBlockIds() {
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+    // Add a large range of consecutive blockIds (simulating real shuffle
scenario)
+    int numBlocks = 100000;
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+
+    // Add same blockIds again (simulating replicas)
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    // Count should remain the same after adding duplicates
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+  }
}