This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d162dcf6 [#2631] fix(server): Potential data loss due to the shuffle 
result report retry (#2632)
1d162dcf6 is described below

commit 1d162dcf65b247d298fb35d1b64e1c392f40f285
Author: yl09099 <[email protected]>
AuthorDate: Fri Sep 26 17:34:32 2025 +0800

    [#2631] fix(server): Potential data loss due to the shuffle result report 
retry (#2632)
    
    ### What changes were proposed in this pull request?
    
    Fix the data loss caused by Spark task retries and block metadata report 
retries.
    
    ### Why are the changes needed?
    
    Fix: #2631
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../apache/uniffle/server/ShuffleTaskManager.java  |  4 +--
 .../server/block/DefaultShuffleBlockIdManager.java | 35 ++++++++++++++++++----
 .../block/PartitionedShuffleBlockIdManager.java    | 31 ++++++++++++++++---
 .../server/block/ShuffleBlockIdManager.java        |  7 ++++-
 4 files changed, 65 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 08db55836..f2c120464 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -908,9 +908,9 @@ public class ShuffleTaskManager {
 
       ShuffleBlockIdManager manager = 
shuffleTaskInfo.getShuffleBlockIdManager();
       if (manager != null) {
-        manager.removeBlockIdByAppId(appId);
+        manager.remove(appId);
       }
-      shuffleBlockIdManager.removeBlockIdByAppId(appId);
+      shuffleBlockIdManager.remove(appId);
       shuffleBufferManager.removeBuffer(appId);
       shuffleFlushManager.removeResources(appId);
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
 
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
index 3ddd4e81a..b0ba17368 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
@@ -52,9 +54,12 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
   // configuration
   // appId -> shuffleId -> hashId -> blockIds
   private Map<String, Map<Integer, Roaring64NavigableMap[]>> 
partitionsToBlockIds;
+  // appId -- shuffleId_bitmapIndex -- Lock
+  private final Map<String, Map<String, ReadWriteLock>> bitmapLocks;
 
   public DefaultShuffleBlockIdManager() {
     this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
+    this.bitmapLocks = JavaUtils.newConcurrentMap();
   }
 
   @VisibleForTesting
@@ -119,9 +124,12 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
     int totalUpdatedBlockCount = 0;
     for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
       Integer partitionId = entry.getKey();
-      Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
+      int bitmapIndex = partitionId % bitmapNum;
+      Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
       int updatedBlockCount = 0;
-      synchronized (bitmap) {
+      ReadWriteLock lock = getLockForBitmap(appId, shuffleId, bitmapIndex);
+      lock.writeLock().lock();
+      try {
         for (long blockId : entry.getValue()) {
           if (!bitmap.contains(blockId)) {
             bitmap.addLong(blockId);
@@ -129,6 +137,8 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
             totalUpdatedBlockCount++;
           }
         }
+      } finally {
+        lock.writeLock().unlock();
       }
       taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
     }
@@ -166,9 +176,16 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
     }
     Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
     for (Map.Entry<Integer, Set<Integer>> entry : 
bitmapIndexToPartitions.entrySet()) {
+      Integer bitmapIndex = entry.getKey();
       Set<Integer> requestPartitions = entry.getValue();
-      Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
-      getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+      Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
+      ReadWriteLock lock = getLockForBitmap(appId, shuffleId, bitmapIndex);
+      lock.readLock().lock();
+      try {
+        getBlockIdsByPartitionId(requestPartitions, bitmap, res, 
blockIdLayout);
+      } finally {
+        lock.readLock().unlock();
+      }
     }
     if (res.getLongCardinality() != expectedBlockNumber) {
       throw new RssException(
@@ -194,8 +211,9 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
   }
 
   @Override
-  public void removeBlockIdByAppId(String appId) {
+  public void remove(String appId) {
     partitionsToBlockIds.remove(appId);
+    bitmapLocks.remove(appId);
   }
 
   @Override
@@ -229,4 +247,11 @@ public class DefaultShuffleBlockIdManager implements 
ShuffleBlockIdManager {
   public long getBitmapNum(String appId, int shuffleId) {
     return partitionsToBlockIds.get(appId).get(shuffleId).length;
   }
+
+  private ReadWriteLock getLockForBitmap(String appId, int shuffleId, int 
bitmapArrLocation) {
+    Map<String, ReadWriteLock> innerMap =
+        bitmapLocks.computeIfAbsent(appId, k -> JavaUtils.newConcurrentMap());
+    String key = shuffleId + "_" + bitmapArrLocation;
+    return innerMap.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
 
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
index f31ff8fc1..c7d9f2657 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -40,9 +42,12 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
 
   // appId -> shuffleId -> partitionId -> blockIds
   private Map<String, Map<Integer, Map<Integer, Roaring64NavigableMap>>> 
partitionsToBlockIds;
+  // appId -- shuffleId_partition -- Lock
+  private final Map<String, Map<String, ReadWriteLock>> bitmapLocks;
 
   public PartitionedShuffleBlockIdManager() {
     this.partitionsToBlockIds = new ConcurrentHashMap<>();
+    this.bitmapLocks = JavaUtils.newConcurrentMap();
   }
 
   public void registerAppId(String appId) {
@@ -70,7 +75,9 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
       partitions.computeIfAbsent(partitionId, k -> 
Roaring64NavigableMap.bitmapOf());
       Roaring64NavigableMap bitmap = partitions.get(partitionId);
       int updatedBlockCount = 0;
-      synchronized (bitmap) {
+      ReadWriteLock lock = getLockForBitmap(appId, shuffleId, partitionId);
+      lock.writeLock().lock();
+      try {
         for (long blockId : entry.getValue()) {
           if (!bitmap.contains(blockId)) {
             bitmap.addLong(blockId);
@@ -78,6 +85,8 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
             totalUpdatedBlockCount++;
           }
         }
+      } finally {
+        lock.writeLock().unlock();
       }
       taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
     }
@@ -105,8 +114,14 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
     Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
     for (int partitionId : partitions) {
       expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
-      Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
-      res.or(bitmap);
+      ReadWriteLock lockForBitmap = getLockForBitmap(appId, shuffleId, 
partitionId);
+      lockForBitmap.readLock().lock();
+      try {
+        Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
+        res.or(bitmap);
+      } finally {
+        lockForBitmap.readLock().unlock();
+      }
     }
 
     if (res.getLongCardinality() != expectedBlockNumber) {
@@ -123,8 +138,9 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
   }
 
   @Override
-  public void removeBlockIdByAppId(String appId) {
+  public void remove(String appId) {
     partitionsToBlockIds.remove(appId);
+    bitmapLocks.remove(appId);
   }
 
   @Override
@@ -167,4 +183,11 @@ public class PartitionedShuffleBlockIdManager implements 
ShuffleBlockIdManager {
   public long getBitmapNum(String appId, int shuffleId) {
     return partitionsToBlockIds.get(appId).get(shuffleId).size();
   }
+
+  private ReadWriteLock getLockForBitmap(String appId, int shuffleId, int 
partititionId) {
+    Map<String, ReadWriteLock> innerMap =
+        bitmapLocks.computeIfAbsent(appId, k -> JavaUtils.newConcurrentMap());
+    String key = shuffleId + "_" + partititionId;
+    return innerMap.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
 
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
index 130be9afb..13fefd3e4 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
@@ -46,7 +46,12 @@ public interface ShuffleBlockIdManager {
 
   void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds);
 
-  void removeBlockIdByAppId(String appId);
+  /**
+   * Removes the per-application state (block IDs and their bitmap locks) held for the given app.
+   *
+   * @param appId the ID of the application whose state is removed
+   */
+  void remove(String appId);
 
   long getTotalBlockCount();
 

Reply via email to