wangshengjie123 commented on code in PR #2532:
URL: https://github.com/apache/celeborn/pull/2532#discussion_r1629434692


##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -151,15 +170,22 @@ class ChangePartitionManager(
       oldPartition,
       cause)
 
-    requests.synchronized {
-      if (requests.containsKey(partitionId)) {
-        requests.get(partitionId).add(changePartition)
-        logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
-          s"$partitionId-$oldEpoch exists, register context.")
-        return
+    val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
+    locksForShuffle(partitionId % locksForShuffle.length).synchronized {
+      var newEntry = false
+      val set = requests.computeIfAbsent(
+        partitionId,
+        new java.util.function.Function[Integer, util.Set[ChangePartitionRequest]] {
+          override def apply(t: Integer): util.Set[ChangePartitionRequest] = {
+            newEntry = true
+            new util.HashSet[ChangePartitionRequest]()
+          }
+        })
+
+      if (newEntry) {

Review Comment:
   @RexXiong Thanks for the review, and sorry for the late reply. There is a logic bug if we just remove the `if (newEntry)` branch.
   For example:
   - A revive request for shuffle 0, partition 0, epoch 0 comes in and the revive succeeds. The epoch goes from 0 to 1, and `changePartitionRequests` no longer contains a revive request for shuffle 0, partition 0.
   - Another revive request for shuffle 0, partition 0 comes in. `changePartitionRequests` will now contain an empty request set, and the batch revive thread will just add the partitionId to `inBatchPartitions` without handling any revive request.
   - None of the revive requests for shuffle 0, partition 0, epoch 1 will be replied to, and they all time out.
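The sequence above can be reproduced with a small, single-threaded toy model. Everything in it (`ToyReviveFlow`, `ReviveReq`, `register`, `batchTick`, the per-partition epoch counter) is a hypothetical simplification, not Celeborn's `ChangePartitionManager`. In particular, it assumes the batch thread releases a partition from its in-batch set only after answering that partition's requests, which is how the stuck state is described above.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.{HashSet => JHashSet, Set => JSet}
import scala.collection.mutable

// Hypothetical toy model of the revive flow; NOT Celeborn's actual code.
final case class ReviveReq(requestId: Int, partitionId: Int, oldEpoch: Int)

class ToyReviveFlow {
  // partitionId -> pending requests (stands in for changePartitionRequests)
  val requests = new ConcurrentHashMap[Integer, JSet[ReviveReq]]()
  // partitions claimed by the batch thread (stands in for inBatchPartitions)
  val inBatch = mutable.Set.empty[Int]
  // current epoch per partition, bumped by each successful revive
  val latestEpoch = mutable.Map.empty[Int, Int].withDefaultValue(0)
  val replied = mutable.ListBuffer.empty[ReviveReq]

  // Variant without the `if (newEntry)` guard: every request is checked
  // against the latest epoch, but only after computeIfAbsent has run.
  def register(r: ReviveReq): Unit = synchronized {
    val set = requests.computeIfAbsent(
      r.partitionId,
      new java.util.function.Function[Integer, JSet[ReviveReq]] {
        override def apply(t: Integer): JSet[ReviveReq] = new JHashSet[ReviveReq]()
      })
    if (latestEpoch(r.partitionId) > r.oldEpoch) {
      // A newer location exists: reply at once, but the freshly
      // created (empty) set is left behind in `requests`.
      replied += r
    } else {
      set.add(r)
    }
  }

  // One pass of the toy batch thread. A claimed partition is released only
  // after its requests are answered, so an empty set keeps it claimed forever.
  def batchTick(): Unit = synchronized {
    val it = requests.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      val pid = e.getKey.intValue
      if (!inBatch.contains(pid)) {
        inBatch += pid
        if (!e.getValue.isEmpty) {
          latestEpoch(pid) = latestEpoch(pid) + 1 // "allocate" a new location
          val rit = e.getValue.iterator()
          while (rit.hasNext) replied += rit.next()
          it.remove()
          inBatch -= pid
        }
      }
    }
  }
}

val flow = new ToyReviveFlow
flow.register(ReviveReq(1, 0, 0)); flow.batchTick() // epoch 0 -> 1, answered
flow.register(ReviveReq(2, 0, 0)); flow.batchTick() // answered directly; empty set gets claimed
flow.register(ReviveReq(3, 0, 1)); flow.batchTick() // partition 0 still claimed: skipped
println(flow.replied.map(_.requestId)) // ListBuffer(1, 2): request 3 is never answered
```

In this model the epoch-1 request stays queued behind a partition that the batch thread has claimed but never releases, which matches the timeout in the last bullet.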
   
   If we want to check every request against the latest location, we also need to change the logic of:
   ```
   val set = requests.computeIfAbsent(
     partitionId,
     new java.util.function.Function[Integer, util.Set[ChangePartitionRequest]] {
       override def apply(t: Integer): util.Set[ChangePartitionRequest] = {
         newEntry = true
         new util.HashSet[ChangePartitionRequest]()
       }
     })
   ```
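One possible shape for that change is to run the latest-location check before `computeIfAbsent`, so a request that can be answered immediately never creates an entry. This is a sketch under stated assumptions: `StripedRegistry`, `ChangeReq`, the `Outcome` values and the `latestEpoch` lookup are hypothetical, the lookup is assumed to be consistent under the same stripe lock, and it does not address the `handleRequestPartitions` side.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.{HashSet => JHashSet, Set => JSet}

// Hypothetical names throughout; a sketch, not the PR's implementation.
final case class ChangeReq(requestId: Int, partitionId: Int, oldEpoch: Int)

sealed trait Outcome
case object RepliedDirectly extends Outcome // a newer location already exists
case object NewEntry extends Outcome        // first queued request: batch must handle it
case object Joined extends Outcome          // piggybacks on an in-flight revive

class StripedRegistry(numLocks: Int, latestEpoch: Int => Int) {
  // Lock striping as in the diff: partitionId % locks.length picks the lock.
  private val locks = Array.fill(numLocks)(new Object)
  val requests = new ConcurrentHashMap[Integer, JSet[ChangeReq]]()

  def register(r: ChangeReq): Outcome =
    locks(r.partitionId % locks.length).synchronized {
      // Check the latest epoch BEFORE computeIfAbsent, so a request that can
      // be answered right away never leaves an empty set behind.
      if (latestEpoch(r.partitionId) > r.oldEpoch) RepliedDirectly
      else {
        var newEntry = false
        val set = requests.computeIfAbsent(
          r.partitionId,
          new java.util.function.Function[Integer, JSet[ChangeReq]] {
            // Invoked only when no set exists yet for this partition.
            override def apply(t: Integer): JSet[ChangeReq] = {
              newEntry = true
              new JHashSet[ChangeReq]()
            }
          })
        set.add(r)
        if (newEntry) NewEntry else Joined
      }
    }
}

// Partition 0 has already been revived to epoch 1.
val epochs = Map(0 -> 1).withDefaultValue(0)
val registry = new StripedRegistry(8, pid => epochs(pid))
val stale = registry.register(ChangeReq(2, 0, 0))  // RepliedDirectly
val entryAfterStale = registry.requests.containsKey(Integer.valueOf(0)) // false
val first = registry.register(ChangeReq(3, 0, 1))  // NewEntry
val second = registry.register(ChangeReq(4, 0, 1)) // Joined
```

Keeping the `newEntry` flag still tells the caller whether it is the first request for the partition, while the stale request no longer leaves an empty set for the batch thread to claim.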
   Besides, we may also need to check the request in `handleRequestPartitions` under the partition lock. I will check the code and test it.



Review Comment:
   Removing only the `if` branch was already tested before my last reply: it causes revive timeouts. The revive for epoch 0 succeeds, but epoch 1 can never be revived.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
