This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 136511563 [#2705] fix(spark): Use read-write lock for 
`MutableShuffleHandleInfo` to avoid global locking (#2706)
136511563 is described below

commit 13651156383ca0c29ab984d75c825c5a8c9145db
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Jan 8 19:20:03 2026 +0800

    [#2705] fix(spark): Use read-write lock for `MutableShuffleHandleInfo` to 
avoid global locking (#2706)
    
    ### What changes were proposed in this pull request?
    
    Add a read-write lock to `MutableShuffleHandleInfo` to avoid global locking.
    
    ### Why are the changes needed?
    
    To fix a critical potential bug that occurs particularly during the startup of 
large tasks in Spark jobs; #2667 only partially addresses the issue.
    
    closes #2705
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No additional tests are needed.
---
 .../spark/shuffle/handle/MutableShuffleHandleInfo.java  | 17 ++++++++++++++++-
 .../uniffle/shuffle/manager/RssShuffleManagerBase.java  |  5 ++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 4460c39bc..4872dc171 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -72,6 +74,8 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
 
   private AtomicBoolean isUpdated = new AtomicBoolean(false);
 
+  private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock();
+
   public MutableShuffleHandleInfo(
       int shuffleId,
       Map<Integer, List<ShuffleServerInfo>> partitionToServers,
@@ -128,6 +132,14 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
     return isUpdated.get();
   }
 
+  public Lock readLock() {
+    return readWriteLock.readLock();
+  }
+
+  public Lock writeLock() {
+    return readWriteLock.writeLock();
+  }
+
   public Set<ShuffleServerInfo> getReplacements(String faultyServerId) {
     return excludedServerToReplacements.get(faultyServerId);
   }
@@ -317,7 +329,8 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
   }
 
   public static RssProtos.MutableShuffleHandleInfo 
toProto(MutableShuffleHandleInfo handleInfo) {
-    synchronized (handleInfo) {
+    handleInfo.readLock().lock();
+    try {
       // value: (PartitionId, ReplicaIndex, SequenceIndex)
       Map<ShuffleServerInfo, List<Triple<Integer, Integer, Integer>>> 
serverToPartitions =
           new HashMap<>();
@@ -385,6 +398,8 @@ public class MutableShuffleHandleInfo extends 
ShuffleHandleInfoBase {
               
.addAllSplitPartitionId(handleInfo.excludedServerForPartitionToReplacements.keySet())
               .build();
       return handleProto;
+    } finally {
+      handleInfo.readLock().unlock();
     }
   }
 
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index fdd700b44..8b92eb632 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1083,7 +1083,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       throw new RssException(
           "An unexpected error occurred: internalHandle is null, which should 
not happen");
     }
-    synchronized (internalHandle) {
+    internalHandle.writeLock().lock();
+    try {
       // If the reassignment servers for one partition exceeds the max 
reassign server num,
       // it should fast fail.
       if (!partitionSplit) {
@@ -1185,6 +1186,8 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
       postReassignTriggeredEvent(
           partitionSplit ? reassignTriggeredOnPartitionSplit : 
reassignTriggeredOnBlockSendFailure);
       return internalHandle;
+    } finally {
+      internalHandle.writeLock().unlock();
     }
   }
 

Reply via email to