This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e4b940  [Improvement] Add RssUtils#cloneBitMap() (#103)
4e4b940 is described below

commit 4e4b9400940fa4ed53af370f7bb54d85861ddbcc
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Jul 29 16:49:55 2022 +0800

    [Improvement] Add RssUtils#cloneBitMap() (#103)
    
    ### What changes were proposed in this pull request?
    
    1. Add `RssUtils#cloneBitMap()`.
    2. Replace `deserializeBitMap(serializeBitMap(bitmap))` with
    `cloneBitMap(bitmap)`.
    
    ### Why are the changes needed?
    
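    1. Callers no longer need to handle `IOException`.
    2. More efficient: the bitmap is copied directly instead of going
    through a serialize/deserialize round trip.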
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test `RssUtilsTest#testCloneBitmap()`
---
 .../apache/uniffle/client/impl/ShuffleReadClientImpl.java   | 13 ++-----------
 .../main/java/org/apache/uniffle/common/util/RssUtils.java  |  6 ++++++
 .../java/org/apache/uniffle/common/util/RssUtilsTest.java   |  9 +++++++++
 .../org/apache/uniffle/test/SparkClientWithLocalTest.java   |  2 +-
 .../java/org/apache/uniffle/server/ShuffleTaskManager.java  |  7 ++-----
 5 files changed, 20 insertions(+), 17 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index fc8b4d3..5059f87 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.uniffle.client.impl;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Queue;
@@ -110,11 +109,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
     }
 
     // copy blockIdBitmap to track all pending blocks
-    try {
-      pendingBlockIds = 
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
-    } catch (IOException ioe) {
-      throw new RuntimeException("Can't create pending blockIds.", ioe);
-    }
+    pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);
 
     clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
   }
@@ -213,11 +208,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
   @Override
   public void checkProcessedBlockIds() {
     Roaring64NavigableMap cloneBitmap;
-    try {
-      cloneBitmap = 
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
-    } catch (IOException ioe) {
-      throw new RuntimeException("Can't validate processed blockIds.", ioe);
-    }
+    cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
     cloneBitmap.and(processedBlockIds);
     if (!blockIdBitmap.equals(cloneBitmap)) {
       throw new RssException("Blocks read inconsistent: expected " + 
blockIdBitmap.getLongCardinality()
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 2219aba..8b7f500 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -172,6 +172,12 @@ public class RssUtils {
     return bitmap;
   }
 
+  public static Roaring64NavigableMap cloneBitMap(Roaring64NavigableMap 
bitmap) {
+    Roaring64NavigableMap clone = Roaring64NavigableMap.bitmapOf();
+    clone.or(bitmap);
+    return clone;
+  }
+
   public static List<ShuffleDataSegment> transIndexDataToSegments(
       ShuffleIndexResult shuffleIndexResult, int readBufferSize) {
     if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 6d8292e..3c946eb 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.ShuffleIndexResult;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -88,6 +89,14 @@ public class RssUtilsTest {
     assertEquals(Roaring64NavigableMap.bitmapOf(), 
RssUtils.deserializeBitMap(new byte[]{}));
   }
 
+  @Test
+  public void testCloneBitmap() {
+    Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100, 
10000);
+    Roaring64NavigableMap bitmap2 = RssUtils.cloneBitMap(bitmap1);
+    assertNotSame(bitmap1, bitmap2);
+    assertEquals(bitmap1, bitmap2);
+  }
+
   @Test
   public void testShuffleIndexSegment() {
     ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index b0e68d2..4c4e190 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -318,7 +318,7 @@ public class SparkClientWithLocalTest extends 
ShuffleReadWriteBase {
     ShuffleReadClientImpl readClient;
 
     createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
-    Roaring64NavigableMap beforeAdded = 
RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
+    Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
     // write data by another task, read data again, the cache for index file 
should be updated
     blocks = createShuffleBlockList(
         0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 0876c51..3243c2d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -170,11 +170,9 @@ public class ShuffleTaskManager {
       if (System.currentTimeMillis() - start > commitTimeout) {
         throw new RuntimeException("Shuffle data commit timeout for " + 
commitTimeout + " ms");
       }
-      byte[] bitmapBytes;
       synchronized (cachedBlockIds) {
-        bitmapBytes = RssUtils.serializeBitMap(cachedBlockIds);
+        cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
       }
-      cloneBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
       long expectedCommitted = cloneBlockIds.getLongCardinality();
       shuffleBufferManager.commitShuffleTask(appId, shuffleId);
       Roaring64NavigableMap committedBlockIds;
@@ -183,9 +181,8 @@ public class ShuffleTaskManager {
       while (true) {
         committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, 
shuffleId);
         synchronized (committedBlockIds) {
-          bitmapBytes = RssUtils.serializeBitMap(committedBlockIds);
+          cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
         }
-        cloneCommittedBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
         cloneBlockIds.andNot(cloneCommittedBlockIds);
         if (cloneBlockIds.isEmpty()) {
           break;

Reply via email to