RexXiong commented on code in PR #2532:
URL: https://github.com/apache/celeborn/pull/2532#discussion_r1623738476
##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -151,15 +170,22 @@ class ChangePartitionManager(
oldPartition,
cause)
- requests.synchronized {
- if (requests.containsKey(partitionId)) {
- requests.get(partitionId).add(changePartition)
- logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
- s"$partitionId-$oldEpoch exists, register context.")
- return
+ val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
+ locksForShuffle(partitionId % locksForShuffle.length).synchronized {
+ var newEntry = false
+ val set = requests.computeIfAbsent(
+ partitionId,
+ new java.util.function.Function[Integer, util.Set[ChangePartitionRequest]] {
+ override def apply(t: Integer): util.Set[ChangePartitionRequest] = {
+ newEntry = true
+ new util.HashSet[ChangePartitionRequest]()
+ }
+ })
+
+ if (newEntry) {
Review Comment:
Thanks @wangshengjie123. I think the root cause of
[CELEBORN-1388](https://issues.apache.org/jira/browse/CELEBORN-1388) is that a
request for a partition arrives just as the pending requests for the same
partition id are removed, once ChangePartitionManager has handled the revive
for that partition id. The incoming request then looks like a new entry in
changePartitionRequests, so the partition ends up being handled multiple times.
IMO we can just remove the `if (newEntry)` branch, so that every request checks
whether a newer partition already exists.
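
To illustrate the suggestion, here is a minimal, self-contained sketch. It is
not the actual ChangePartitionManager code: `ReviveSketch`, `Location`,
`Request`, `latest`, `pending`, `revivesStarted` and `complete` are
hypothetical names made up for this example. The idea it shows is that the
"is there already a newer partition?" check runs for every request, not only
when the request set already existed.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-ins for the real Celeborn types.
final case class Location(partitionId: Int, epoch: Int)
final case class Request(partitionId: Int, oldEpoch: Int)

class ReviveSketch(lockCount: Int = 8) {
  // Newest known location per partition id (assumption for this sketch).
  private val latest = new ConcurrentHashMap[Int, Location]()
  // Requests waiting for an in-flight revive, keyed by partition id.
  private val pending = new ConcurrentHashMap[Int, java.util.Set[Request]]()
  // Striped locks, mirroring the per-shuffle lock array in the diff.
  private val locks = Array.fill(lockCount)(new Object)
  // Counts how many revives were actually started.
  @volatile var revivesStarted = 0

  /** Returns the newer location if one exists, otherwise queues the request. */
  def handle(req: Request): Option[Location] =
    locks(req.partitionId % locks.length).synchronized {
      // Always check first, whether or not the request set is new.
      val current = latest.get(req.partitionId)
      if (current != null && current.epoch > req.oldEpoch) {
        Some(current)
      } else {
        var set = pending.get(req.partitionId)
        if (set == null) {
          set = new java.util.HashSet[Request]()
          pending.put(req.partitionId, set)
          revivesStarted += 1 // first request for this partition starts a revive
        }
        set.add(req)
        None
      }
    }

  /** Called when a revive finishes: record the new location, drop waiters. */
  def complete(newLocation: Location): Unit =
    locks(newLocation.partitionId % locks.length).synchronized {
      latest.put(newLocation.partitionId, newLocation)
      pending.remove(newLocation.partitionId)
    }
}
```

With this shape, a request that arrives after `complete` has removed the
pending set is answered with the newer location and does not start a second
revive for the same partition.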