This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1d162dcf6 [#2631] fix(server): Potential data loss due to the shuffle
result report retry (#2632)
1d162dcf6 is described below
commit 1d162dcf65b247d298fb35d1b64e1c392f40f285
Author: yl09099 <[email protected]>
AuthorDate: Fri Sep 26 17:34:32 2025 +0800
[#2631] fix(server): Potential data loss due to the shuffle result report
retry (#2632)
### What changes were proposed in this pull request?
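Fix potential data loss when Spark task retries coincide with retried block
metadata (shuffle result) reports. The per-bitmap `synchronized` blocks are
replaced with per-bitmap read/write locks, so reads of block IDs now take the
read lock and no longer run concurrently with writes to the same bitmap.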
### Why are the changes needed?
Fix: #2631
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by existing unit tests.
---
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +--
.../server/block/DefaultShuffleBlockIdManager.java | 35 ++++++++++++++++++----
.../block/PartitionedShuffleBlockIdManager.java | 31 ++++++++++++++++---
.../server/block/ShuffleBlockIdManager.java | 7 ++++-
4 files changed, 65 insertions(+), 12 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 08db55836..f2c120464 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -908,9 +908,9 @@ public class ShuffleTaskManager {
ShuffleBlockIdManager manager =
shuffleTaskInfo.getShuffleBlockIdManager();
if (manager != null) {
- manager.removeBlockIdByAppId(appId);
+ manager.remove(appId);
}
- shuffleBlockIdManager.removeBlockIdByAppId(appId);
+ shuffleBlockIdManager.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
index 3ddd4e81a..b0ba17368 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/DefaultShuffleBlockIdManager.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
@@ -52,9 +54,12 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
// configuration
// appId -> shuffleId -> hashId -> blockIds
private Map<String, Map<Integer, Roaring64NavigableMap[]>>
partitionsToBlockIds;
+ // appId -- shuffleId_bitmapIndex -- Lock
+ private final Map<String, Map<String, ReadWriteLock>> bitmapLocks;
public DefaultShuffleBlockIdManager() {
this.partitionsToBlockIds = JavaUtils.newConcurrentMap();
+ this.bitmapLocks = JavaUtils.newConcurrentMap();
}
@VisibleForTesting
@@ -119,9 +124,12 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
int totalUpdatedBlockCount = 0;
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
- Roaring64NavigableMap bitmap = blockIds[partitionId % bitmapNum];
+ int bitmapIndex = partitionId % bitmapNum;
+ Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
int updatedBlockCount = 0;
- synchronized (bitmap) {
+ ReadWriteLock lock = getLockForBitmap(appId, shuffleId, bitmapIndex);
+ lock.writeLock().lock();
+ try {
for (long blockId : entry.getValue()) {
if (!bitmap.contains(blockId)) {
bitmap.addLong(blockId);
@@ -129,6 +137,8 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
totalUpdatedBlockCount++;
}
}
+ } finally {
+ lock.writeLock().unlock();
}
taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
}
@@ -166,9 +176,16 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
}
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
for (Map.Entry<Integer, Set<Integer>> entry :
bitmapIndexToPartitions.entrySet()) {
+ Integer bitmapIndex = entry.getKey();
Set<Integer> requestPartitions = entry.getValue();
- Roaring64NavigableMap bitmap = blockIds[entry.getKey()];
- getBlockIdsByPartitionId(requestPartitions, bitmap, res, blockIdLayout);
+ Roaring64NavigableMap bitmap = blockIds[bitmapIndex];
+ ReadWriteLock lock = getLockForBitmap(appId, shuffleId, bitmapIndex);
+ lock.readLock().lock();
+ try {
+ getBlockIdsByPartitionId(requestPartitions, bitmap, res,
blockIdLayout);
+ } finally {
+ lock.readLock().unlock();
+ }
}
if (res.getLongCardinality() != expectedBlockNumber) {
throw new RssException(
@@ -194,8 +211,9 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
}
@Override
- public void removeBlockIdByAppId(String appId) {
+ public void remove(String appId) {
partitionsToBlockIds.remove(appId);
+ bitmapLocks.remove(appId);
}
@Override
@@ -229,4 +247,11 @@ public class DefaultShuffleBlockIdManager implements
ShuffleBlockIdManager {
public long getBitmapNum(String appId, int shuffleId) {
return partitionsToBlockIds.get(appId).get(shuffleId).length;
}
+
+ private ReadWriteLock getLockForBitmap(String appId, int shuffleId, int
bitmapArrLocation) {
+ Map<String, ReadWriteLock> innerMap =
+ bitmapLocks.computeIfAbsent(appId, k -> JavaUtils.newConcurrentMap());
+ String key = shuffleId + "_" + bitmapArrLocation;
+ return innerMap.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
index f31ff8fc1..c7d9f2657 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/PartitionedShuffleBlockIdManager.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
@@ -40,9 +42,12 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
// appId -> shuffleId -> partitionId -> blockIds
private Map<String, Map<Integer, Map<Integer, Roaring64NavigableMap>>>
partitionsToBlockIds;
+ // appId -- shuffleId_partition -- Lock
+ private final Map<String, Map<String, ReadWriteLock>> bitmapLocks;
public PartitionedShuffleBlockIdManager() {
this.partitionsToBlockIds = new ConcurrentHashMap<>();
+ this.bitmapLocks = JavaUtils.newConcurrentMap();
}
public void registerAppId(String appId) {
@@ -70,7 +75,9 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
partitions.computeIfAbsent(partitionId, k ->
Roaring64NavigableMap.bitmapOf());
Roaring64NavigableMap bitmap = partitions.get(partitionId);
int updatedBlockCount = 0;
- synchronized (bitmap) {
+ ReadWriteLock lock = getLockForBitmap(appId, shuffleId, partitionId);
+ lock.writeLock().lock();
+ try {
for (long blockId : entry.getValue()) {
if (!bitmap.contains(blockId)) {
bitmap.addLong(blockId);
@@ -78,6 +85,8 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
totalUpdatedBlockCount++;
}
}
+ } finally {
+ lock.writeLock().unlock();
}
taskInfo.incBlockNumber(shuffleId, partitionId, updatedBlockCount);
}
@@ -105,8 +114,14 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
Roaring64NavigableMap res = Roaring64NavigableMap.bitmapOf();
for (int partitionId : partitions) {
expectedBlockNumber += taskInfo.getBlockNumber(shuffleId, partitionId);
- Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
- res.or(bitmap);
+ ReadWriteLock lockForBitmap = getLockForBitmap(appId, shuffleId,
partitionId);
+ lockForBitmap.readLock().lock();
+ try {
+ Roaring64NavigableMap bitmap = partitionToBlockId.get(partitionId);
+ res.or(bitmap);
+ } finally {
+ lockForBitmap.readLock().unlock();
+ }
}
if (res.getLongCardinality() != expectedBlockNumber) {
@@ -123,8 +138,9 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
}
@Override
- public void removeBlockIdByAppId(String appId) {
+ public void remove(String appId) {
partitionsToBlockIds.remove(appId);
+ bitmapLocks.remove(appId);
}
@Override
@@ -167,4 +183,11 @@ public class PartitionedShuffleBlockIdManager implements
ShuffleBlockIdManager {
public long getBitmapNum(String appId, int shuffleId) {
return partitionsToBlockIds.get(appId).get(shuffleId).size();
}
+
+ private ReadWriteLock getLockForBitmap(String appId, int shuffleId, int
partititionId) {
+ Map<String, ReadWriteLock> innerMap =
+ bitmapLocks.computeIfAbsent(appId, k -> JavaUtils.newConcurrentMap());
+ String key = shuffleId + "_" + partititionId;
+ return innerMap.computeIfAbsent(key, k -> new ReentrantReadWriteLock());
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
index 130be9afb..13fefd3e4 100644
---
a/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/block/ShuffleBlockIdManager.java
@@ -46,7 +46,12 @@ public interface ShuffleBlockIdManager {
void removeBlockIdByShuffleId(String appId, List<Integer> shuffleIds);
- void removeBlockIdByAppId(String appId);
+ /**
+ * Removes the state this manager holds for the given application.
+ *
+ * @param appId the ID of the application whose state is removed
+ */
+ void remove(String appId);
long getTotalBlockCount();