This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new d990dc888 [CELEBORN-1509] Reply response without holding a lock
d990dc888 is described below

commit d990dc888fb27628ecda0a33f71efd6c0fe1a8e8
Author: zhangzhao.08 <[email protected]>
AuthorDate: Mon Jul 15 15:00:51 2024 +0800

    [CELEBORN-1509] Reply response without holding a lock
    
    ### What changes were proposed in this pull request?
    
    Avoid performing IO operations while holding locks, so that the dispatcher is not blocked.
    
    ### Why are the changes needed?
    
    If a LatestPartition already exists (a new slot for the partition has already been allocated), the response can be sent without holding the lock.
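    
    The idea can be sketched in isolation as follows. This is a minimal illustration, not the Celeborn code: `Location`, `FastPathReplier`, `publish`, `handle`, and `pendingCount` are hypothetical names, and the `epoch > oldEpoch` check is an assumption about what "already has a newer location" means.
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.mutable
    
    // Hypothetical stand-in for a partition location; not Celeborn's class.
    final case class Location(partitionId: Int, epoch: Int)
    
    final class FastPathReplier {
      // Latest known location per partition; readable without any lock.
      private val latest = new ConcurrentHashMap[Int, Location]()
      // Requests still waiting for a new location; guarded by its own monitor.
      private val pending = mutable.Map.empty[Int, mutable.ArrayBuffer[Location => Unit]]
    
      def publish(loc: Location): Unit = latest.put(loc.partitionId, loc)
    
      def pendingCount(partitionId: Int): Int = pending.synchronized {
        pending.get(partitionId).map(_.size).getOrElse(0)
      }
    
      /** Returns true when the request was answered on the lock-free fast path. */
      def handle(partitionId: Int, oldEpoch: Int, reply: Location => Unit): Boolean =
        Option(latest.get(partitionId)).filter(_.epoch > oldEpoch) match {
          case Some(loc) =>
            // The reply may involve IO, so it runs before any lock is taken.
            reply(loc)
            true
          case None =>
            // Slow path: only the cheap bookkeeping happens under the lock.
            pending.synchronized {
              pending.getOrElseUpdate(partitionId, mutable.ArrayBuffer.empty) += reply
            }
            false
        }
    }
    ```
    
    In this sketch the lock only ever guards in-memory bookkeeping, so a slow reply cannot stall other callers waiting on the same monitor.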
    
![image](https://github.com/user-attachments/assets/d1632429-5f63-4358-8d13-a82643533679)
    
![image](https://github.com/user-attachments/assets/f9201828-28d4-44d2-91a9-64907384199f)
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    Internal test.
    
    Closes #2627 from zhaostu4/dispatcher_blocked.
    
    Authored-by: zhangzhao.08 <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 9336b085382484953fbfd3688a0c97c372a839d6)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/client/ChangePartitionManager.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index 9cf1dca52..6909d5262 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -151,6 +151,19 @@ class ChangePartitionManager(
       oldPartition,
       cause)
 
+    // If new slot for the partition has been allocated, reply and return.
+    // Else register and allocate for it.
+    getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
+      context.reply(
+        partitionId,
+        StatusCode.SUCCESS,
+        Some(latestLoc),
+        lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
+      logDebug(s"New partition found, old partition $partitionId-$oldEpoch return it." +
+        s" shuffleId: $shuffleId $latestLoc")
+      return
+    }
+
     requests.synchronized {
       if (requests.containsKey(partitionId)) {
         requests.get(partitionId).add(changePartition)
