This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9336b0853 [CELEBORN-1509] Reply response without holding a lock
9336b0853 is described below
commit 9336b085382484953fbfd3688a0c97c372a839d6
Author: zhangzhao.08 <[email protected]>
AuthorDate: Mon Jul 15 15:00:51 2024 +0800
[CELEBORN-1509] Reply response without holding a lock
### What changes were proposed in this pull request?
Avoid performing IO operations, such as sending the reply, while holding locks,
so that the dispatcher is not blocked.
### Why are the changes needed?
If a new location (the latest partition) has already been allocated for the
partition, the reply can be sent without holding the lock.
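The fast path can be illustrated with a small, self-contained Scala sketch. This is not Celeborn's code. `Location`, `ChangePartitionSketch`, `handleChange` and `onAllocated` are hypothetical names, and a `ConcurrentHashMap` stands in for whatever `getLatestPartition` actually reads. The sketch first looks for a newer location without any lock and replies immediately if one exists. Otherwise it re-checks and registers the request under the lock, but always sends replies only after the lock is released.

```scala
import scala.collection.mutable

// Hypothetical sketch: these names are illustrative, not Celeborn APIs.
final case class Location(partitionId: Int, epoch: Int)

final class ChangePartitionSketch {
  // Latest allocated location per partition; readable without any lock.
  private val latest = new java.util.concurrent.ConcurrentHashMap[Int, Location]()
  // Waiting reply callbacks per partition, guarded by pending.synchronized.
  private val pending = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Location => Unit]]

  private def newerThan(partitionId: Int, oldEpoch: Int): Option[Location] =
    Option(latest.get(partitionId)).filter(_.epoch > oldEpoch)

  def handleChange(partitionId: Int, oldEpoch: Int, reply: Location => Unit): Unit =
    newerThan(partitionId, oldEpoch) match {
      // Fast path: a newer slot already exists, so reply without the lock.
      case Some(loc) => reply(loc)
      case None =>
        // Slow path: re-check and register under the lock, but defer the reply.
        val found = pending.synchronized {
          val again = newerThan(partitionId, oldEpoch)
          if (again.isEmpty) {
            pending.getOrElseUpdate(partitionId, mutable.ArrayBuffer.empty[Location => Unit]) += reply
          }
          again
        }
        found.foreach(reply) // reply only after the lock has been released
    }

  // Publish the new slot, collect waiters under the lock, reply outside it.
  def onAllocated(loc: Location): Unit = {
    latest.put(loc.partitionId, loc)
    val waiters = pending.synchronized {
      pending.remove(loc.partitionId).map(_.toList).getOrElse(Nil)
    }
    waiters.foreach(_(loc))
  }
}
```

The re-check inside `pending.synchronized` matters. Without it, a slot published between the lock-free check and the registration would leave the request waiting with nobody to answer it. For example, a first `handleChange(1, 0, ...)` registers a waiter. `onAllocated(Location(1, 1))` then answers that waiter. A later `handleChange(1, 0, ...)` is answered directly on the fast path.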


### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Internal test.
Closes #2627 from zhaostu4/dispatcher_blocked.
Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/ChangePartitionManager.scala | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index 9cf1dca52..6909d5262 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -151,6 +151,19 @@ class ChangePartitionManager(
oldPartition,
cause)
+ // If new slot for the partition has been allocated, reply and return.
+ // Else register and allocate for it.
+ getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
+ context.reply(
+ partitionId,
+ StatusCode.SUCCESS,
+ Some(latestLoc),
+ lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
+ logDebug(s"New partition found, old partition $partitionId-$oldEpoch return it." +
+ s" shuffleId: $shuffleId $latestLoc")
+ return
+ }
+
requests.synchronized {
if (requests.containsKey(partitionId)) {
requests.get(partitionId).add(changePartition)