This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 136511563 [#2705] fix(spark): Use read-write lock for
`MutableShuffleHandleInfo` to avoid global locking (#2706)
136511563 is described below
commit 13651156383ca0c29ab984d75c825c5a8c9145db
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Jan 8 19:20:03 2026 +0800
[#2705] fix(spark): Use read-write lock for `MutableShuffleHandleInfo` to
avoid global locking (#2706)
### What changes were proposed in this pull request?
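Add a read-write lock to MutableShuffleHandleInfo to avoid global locking. Previously, both `toProto` and the reassignment path in `RssShuffleManagerBase` synchronized on the handle instance, so concurrent read-only serializations blocked one another. With a read-write lock, `toProto` takes the read lock and can run in parallel, while reassignment takes the write lock and keeps exclusive access.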
### Why are the changes needed?
To fix a critical potential bug that shows up particularly during the startup of
large tasks in Spark jobs; #2667 only partially addressed the issue.
closes #2705
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../spark/shuffle/handle/MutableShuffleHandleInfo.java | 17 ++++++++++++++++-
.../uniffle/shuffle/manager/RssShuffleManagerBase.java | 5 ++++-
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 4460c39bc..4872dc171 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -72,6 +74,8 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
private AtomicBoolean isUpdated = new AtomicBoolean(false);
+ private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock();
+
public MutableShuffleHandleInfo(
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
@@ -128,6 +132,14 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
return isUpdated.get();
}
+ public Lock readLock() {
+ return readWriteLock.readLock();
+ }
+
+ public Lock writeLock() {
+ return readWriteLock.writeLock();
+ }
+
public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
return excludedServerToReplacements.get(faultyServerId);
}
@@ -317,7 +329,8 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
}
public static RssProtos.MutableShuffleHandleInfo
toProto(MutableShuffleHandleInfo handleInfo) {
- synchronized (handleInfo) {
+ handleInfo.readLock().lock();
+ try {
// value: (PartitionId, ReplicaIndex, SequenceIndex)
Map<ShuffleServerInfo, List<Triple<Integer, Integer, Integer>>>
serverToPartitions =
new HashMap<>();
@@ -385,6 +398,8 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
.addAllSplitPartitionId(handleInfo.excludedServerForPartitionToReplacements.keySet())
.build();
return handleProto;
+ } finally {
+ handleInfo.readLock().unlock();
}
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index fdd700b44..8b92eb632 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1083,7 +1083,8 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
throw new RssException(
"An unexpected error occurred: internalHandle is null, which should
not happen");
}
- synchronized (internalHandle) {
+ internalHandle.writeLock().lock();
+ try {
// If the reassignment servers for one partition exceeds the max
reassign server num,
// it should fast fail.
if (!partitionSplit) {
@@ -1185,6 +1186,8 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
postReassignTriggeredEvent(
partitionSplit ? reassignTriggeredOnPartitionSplit :
reassignTriggeredOnBlockSendFailure);
return internalHandle;
+ } finally {
+ internalHandle.writeLock().unlock();
}
}