RexXiong commented on code in PR #2532:
URL: https://github.com/apache/celeborn/pull/2532#discussion_r1623738476


##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -151,15 +170,22 @@ class ChangePartitionManager(
       oldPartition,
       cause)
 
-    requests.synchronized {
-      if (requests.containsKey(partitionId)) {
-        requests.get(partitionId).add(changePartition)
-        logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
-          s"$partitionId-$oldEpoch exists, register context.")
-        return
+    val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
+    locksForShuffle(partitionId % locksForShuffle.length).synchronized {
+      var newEntry = false
+      val set = requests.computeIfAbsent(
+        partitionId,
+        new java.util.function.Function[Integer, util.Set[ChangePartitionRequest]] {
+          override def apply(t: Integer): util.Set[ChangePartitionRequest] = {
+            newEntry = true
+            new util.HashSet[ChangePartitionRequest]()
+          }
+        })
+
+      if (newEntry) {

Review Comment:
   Thanks @wangshengjie123. I think the root cause of [CELEBORN-1388](https://issues.apache.org/jira/browse/CELEBORN-1388) is a partition request that arrives just after the pending requests for the same partition id have been removed, because ChangePartitionManager has finished handling the revive for that partition id. The late request then shows up as a new entry in `changePartitionRequests`, so the same partition is handled multiple times. IMO we can simply remove the `if (newEntry)` branch and force every request to check whether a newer partition location already exists.
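To make the race and the suggested fix concrete, here is a minimal Scala sketch under simplifying assumptions. It is not Celeborn's implementation: the class `ReviveSketch`, its methods `handle` and `finish`, and the per-partition `latestEpoch` map are hypothetical, and each request is reduced to its `oldEpoch`. It keeps the lock-striping idea from the diff (`partitionId % locks.length`) and checks for a newer location on every request instead of only when a request creates a new entry.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical, simplified model of the dedup logic discussed in this review.
// ReviveSketch, handle, finish and latestEpoch are illustrative names only,
// NOT Celeborn's API; a request is reduced to the epoch it wants to replace.
final class ReviveSketch(numLocks: Int = 8) {
  // Lock striping: partitionId selects one of numLocks lock objects.
  private val locks = Array.fill(numLocks)(new Object)
  // Pending requests per partitionId while a revive is in flight.
  private val requests = new ConcurrentHashMap[Int, mutable.Set[Int]]()
  // Latest epoch already allocated per partitionId (assumed bookkeeping).
  private val latestEpoch = new ConcurrentHashMap[Int, Int]()
  // Counts how many new locations were actually allocated.
  val allocations = new AtomicInteger(0)

  /** Returns true if the caller should start a revive for this partition. */
  def handle(partitionId: Int, oldEpoch: Int): Boolean =
    locks(partitionId % locks.length).synchronized {
      // Checked on EVERY request, not only when a new entry is created.
      val latest = latestEpoch.getOrDefault(partitionId, -1)
      if (latest > oldEpoch) {
        false // a newer location already exists: reply with it, no revive
      } else {
        val pending = requests.get(partitionId)
        if (pending != null) {
          pending += oldEpoch // revive already in flight: just register
          false
        } else {
          requests.put(partitionId, mutable.Set(oldEpoch))
          true // first request for this epoch: start a revive
        }
      }
    }

  /** Revive finished: publish the new epoch, then drop the pending set. */
  def finish(partitionId: Int, newEpoch: Int): Unit =
    locks(partitionId % locks.length).synchronized {
      latestEpoch.put(partitionId, newEpoch)
      requests.remove(partitionId)
      allocations.incrementAndGet()
    }
}
```

Replaying the CELEBORN-1388 sequence: the first `handle(3, 0)` starts a revive, a duplicate `handle(3, 0)` is only registered, and after `finish(3, 1)` removes the pending set, a late `handle(3, 0)` sees epoch 1 and does not revive again, so `allocations` stays at 1.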


