This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bbe25e6c [#2675] test(spark)(followup): Add tests for 
Roaring64NavigableMap optimization in checkSentBlockCount (#2692)
5bbe25e6c is described below

commit 5bbe25e6cb643beaa9efe409f0e3a37ca5d86109
Author: zhan7236 <[email protected]>
AuthorDate: Tue Dec 2 17:39:24 2025 +0800

    [#2675] test(spark)(followup): Add tests for Roaring64NavigableMap 
optimization in checkSentBlockCount (#2692)
    
    ### What changes were proposed in this pull request?
    
    Add unit tests to verify the `Roaring64NavigableMap` optimization 
introduced in PR #2687 for filtering duplicate blockIds from multiple replicas 
in `RssShuffleWriter#checkSentBlockCount`.
    
    ### Why are the changes needed?
    
    As suggested by the reviewer in PR #2687, tests should be added to cover 
the `Roaring64NavigableMap` optimization change.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests added:
    - `testRoaring64NavigableMapDeduplication`: Verifies correct deduplication 
of blockIds across multiple servers (replicas)
    - `testRoaring64NavigableMapWithLargeBlockIds`: Verifies behavior with a 
large number of consecutive blockIds
    
    Test results:
    - Spark3 `RssShuffleWriterTest`: 9 tests passed (including 2 new tests)
    - Spark2 `RssShuffleWriterTest`: 5 tests passed (including 2 new tests)
    
    Related PR: #2687
---
 .../spark/shuffle/writer/RssShuffleWriterTest.java | 73 ++++++++++++++++++++++
 .../spark/shuffle/writer/RssShuffleWriterTest.java | 73 ++++++++++++++++++++++
 2 files changed, 146 insertions(+)

diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 331697b85..90b4023fa 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -47,6 +47,7 @@ import org.apache.spark.shuffle.RssShuffleManager;
 import org.apache.spark.shuffle.RssSparkConfig;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -509,4 +510,76 @@ public class RssShuffleWriterTest {
     }
     return shuffleBlockInfoList;
   }
+
+  /**
+   * Test that Roaring64NavigableMap correctly filters duplicate blockIds from 
multiple replicas.
+   * This tests the optimization introduced in issue #2675.
+   */
+  @Test
+  public void testRoaring64NavigableMapDeduplication() {
+    // Simulate serverToPartitionToBlockIds with duplicate blockIds across 
multiple servers
+    // (representing replicas)
+    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> 
serverToPartitionToBlockIds = Maps.newHashMap();
+
+    final ShuffleServerInfo server1 = new ShuffleServerInfo("id1", "host1", 
100);
+    final ShuffleServerInfo server2 = new ShuffleServerInfo("id2", "host2", 
100);
+    final ShuffleServerInfo server3 = new ShuffleServerInfo("id3", "host3", 
100);
+
+    // Server1 has blockIds: 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server1Partitions = Maps.newHashMap();
+    server1Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server1, server1Partitions);
+
+    // Server2 has same blockIds (replica): 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server2Partitions = Maps.newHashMap();
+    server2Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server2, server2Partitions);
+
+    // Server3 has blockIds: 4, 5 for partition 1
+    Map<Integer, Set<Long>> server3Partitions = Maps.newHashMap();
+    server3Partitions.put(1, Sets.newHashSet(4L, 5L));
+    serverToPartitionToBlockIds.put(server3, server3Partitions);
+
+    // Using Roaring64NavigableMap to filter duplicates (as implemented in 
checkSentBlockCount)
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x -> 
x.forEach(blockIdBitmap::addLong));
+    }
+
+    // Expected unique blockIds: 1, 2, 3, 4, 5 = 5 unique blocks
+    // Without deduplication: 1,2,3 + 1,2,3 + 4,5 = 8 blocks
+    assertEquals(5, blockIdBitmap.getLongCardinality());
+
+    // Verify the bitmap contains all expected blockIds
+    assertTrue(blockIdBitmap.contains(1L));
+    assertTrue(blockIdBitmap.contains(2L));
+    assertTrue(blockIdBitmap.contains(3L));
+    assertTrue(blockIdBitmap.contains(4L));
+    assertTrue(blockIdBitmap.contains(5L));
+  }
+
+  /**
+   * Test Roaring64NavigableMap with a large number of consecutive blockIds. 
This verifies the memory
+   * efficiency benefit of using Roaring64NavigableMap.
+   */
+  @Test
+  public void testRoaring64NavigableMapWithLargeBlockIds() {
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+    // Add a large range of consecutive blockIds (simulating real shuffle 
scenario)
+    int numBlocks = 100000;
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+
+    // Add same blockIds again (simulating replicas)
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    // Count should remain the same after adding duplicates
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+  }
 }
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 9e9e68db5..dd52aa915 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -54,6 +54,7 @@ import 
org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
 import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.client.api.ShuffleManagerClient;
 import org.apache.uniffle.client.api.ShuffleWriteClient;
@@ -1064,4 +1065,76 @@ public class RssShuffleWriterTest {
     }
     return shuffleBlockInfoList;
   }
+
+  /**
+   * Test that Roaring64NavigableMap correctly filters duplicate blockIds from 
multiple replicas.
+   * This tests the optimization introduced in issue #2675.
+   */
+  @Test
+  public void testRoaring64NavigableMapDeduplication() {
+    // Simulate serverToPartitionToBlockIds with duplicate blockIds across 
multiple servers
+    // (representing replicas)
+    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> 
serverToPartitionToBlockIds = Maps.newHashMap();
+
+    final ShuffleServerInfo server1 = new ShuffleServerInfo("id1", "host1", 
100);
+    final ShuffleServerInfo server2 = new ShuffleServerInfo("id2", "host2", 
100);
+    final ShuffleServerInfo server3 = new ShuffleServerInfo("id3", "host3", 
100);
+
+    // Server1 has blockIds: 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server1Partitions = Maps.newHashMap();
+    server1Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server1, server1Partitions);
+
+    // Server2 has same blockIds (replica): 1, 2, 3 for partition 0
+    Map<Integer, Set<Long>> server2Partitions = Maps.newHashMap();
+    server2Partitions.put(0, Sets.newHashSet(1L, 2L, 3L));
+    serverToPartitionToBlockIds.put(server2, server2Partitions);
+
+    // Server3 has blockIds: 4, 5 for partition 1
+    Map<Integer, Set<Long>> server3Partitions = Maps.newHashMap();
+    server3Partitions.put(1, Sets.newHashSet(4L, 5L));
+    serverToPartitionToBlockIds.put(server3, server3Partitions);
+
+    // Using Roaring64NavigableMap to filter duplicates (as implemented in 
checkSentBlockCount)
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    for (Map<Integer, Set<Long>> partitionBlockIds : 
serverToPartitionToBlockIds.values()) {
+      partitionBlockIds.values().forEach(x -> 
x.forEach(blockIdBitmap::addLong));
+    }
+
+    // Expected unique blockIds: 1, 2, 3, 4, 5 = 5 unique blocks
+    // Without deduplication: 1,2,3 + 1,2,3 + 4,5 = 8 blocks
+    assertEquals(5, blockIdBitmap.getLongCardinality());
+
+    // Verify the bitmap contains all expected blockIds
+    assertTrue(blockIdBitmap.contains(1L));
+    assertTrue(blockIdBitmap.contains(2L));
+    assertTrue(blockIdBitmap.contains(3L));
+    assertTrue(blockIdBitmap.contains(4L));
+    assertTrue(blockIdBitmap.contains(5L));
+  }
+
+  /**
+   * Test Roaring64NavigableMap with a large number of consecutive blockIds. 
This verifies the memory
+   * efficiency benefit of using Roaring64NavigableMap.
+   */
+  @Test
+  public void testRoaring64NavigableMapWithLargeBlockIds() {
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+    // Add a large range of consecutive blockIds (simulating real shuffle 
scenario)
+    int numBlocks = 100000;
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+
+    // Add same blockIds again (simulating replicas)
+    for (long i = 0; i < numBlocks; i++) {
+      blockIdBitmap.addLong(i);
+    }
+
+    // Count should remain the same after adding duplicates
+    assertEquals(numBlocks, blockIdBitmap.getLongCardinality());
+  }
 }

Reply via email to