RexXiong commented on code in PR #2532:
URL: https://github.com/apache/celeborn/pull/2532#discussion_r1679490352
##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -151,36 +170,22 @@ class ChangePartitionManager(
oldPartition,
cause)
- // If new slot for the partition has been allocated, reply and return.
- // Else register and allocate for it.
- getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
- context.reply(
- partitionId,
- StatusCode.SUCCESS,
- Some(latestLoc),
- lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
- logDebug(s"New partition found, old partition $partitionId-$oldEpoch return it." +
- s" shuffleId: $shuffleId $latestLoc")
- return
- }
-
- requests.synchronized {
+ val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
Review Comment:
With the introduction of fine-grained locking, contention on the request lock may
no longer be an issue, and [CELEBORN-1509](https://github.com/apache/celeborn/pull/2627)
might not be necessary. Consider this scenario: partition 0 has a latest epoch
of 4, and the revive requests arrive for epochs 3, 3, 3, 4, 3, 3, 3.
With [CELEBORN-1509](https://github.com/apache/celeborn/pull/2627), the
requests for epoch 3 will receive the latest epoch 4; even after updating the
request to epoch 4, the remaining requests will still return epoch 4.
Therefore, I believe it is better to check the unhandled partition IDs first.
Without [CELEBORN-1509](https://github.com/apache/celeborn/pull/2627), the
initial requests for epoch 3 will also receive epoch 4, but once epoch 4
is handled, the remaining requests will wait for a new partition epoch.
cc @waitinfuture @zhaostu4
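For context, the fine-grained locking this comment refers to is a striped-lock pattern: each shuffle gets an array of lock objects, and a partition's requests are serialized on the stripe selected by `partitionId % locksForShuffle.length`. The sketch below is illustrative only; `lockStripes` and `withPartitionLock` are hypothetical names, not identifiers from the PR.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object StripedLockDemo {
  // Hypothetical stand-ins for the PR's `locks` map and `locksRegisterFunc`.
  val lockStripes = 8
  val locks = new ConcurrentHashMap[Int, Array[AnyRef]]()
  val locksRegisterFunc: JFunction[Int, Array[AnyRef]] =
    new JFunction[Int, Array[AnyRef]] {
      override def apply(shuffleId: Int): Array[AnyRef] =
        Array.fill(lockStripes)(new AnyRef)
    }

  // Serialize work per (shuffleId, partition stripe) instead of one global lock:
  // requests for different stripes of the same shuffle proceed concurrently.
  def withPartitionLock[T](shuffleId: Int, partitionId: Int)(body: => T): T = {
    val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
    locksForShuffle(partitionId % locksForShuffle.length).synchronized {
      body
    }
  }

  def main(args: Array[String]): Unit = {
    val result = withPartitionLock(shuffleId = 1, partitionId = 10)(21 + 21)
    println(result)
  }
}
```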
##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -229,14 +234,16 @@ class ChangePartitionManager(
// remove together to reduce lock time
def replySuccess(locations: Array[PartitionLocation]): Unit = {
- requestsMap.synchronized {
- locations.map { location =>
+ val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
+ locations.map { location =>
+ locksForShuffle(location.getId % locksForShuffle.length).synchronized {
+ val ret = requestsMap.remove(location.getId)
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(location.getId)
}
// Here one partition id can be remove more than once,
// so need to filter null result before reply.
- location -> Option(requestsMap.remove(location.getId))
+ location -> Option(ret)
Review Comment:
IMO we don't need to introduce a variable for removedPartitionRequests? Just
keep it as before?
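For context, the `Option(requestsMap.remove(...))` idiom the diff's comment mentions works because `ConcurrentHashMap.remove` returns `null` when the key is already gone, so a duplicate removal collapses to `None` and can be filtered before replying. A minimal standalone sketch (`RemoveOnceDemo` is an illustrative name, not from the PR):

```scala
import java.util.concurrent.ConcurrentHashMap

object RemoveOnceDemo {
  val requestsMap = new ConcurrentHashMap[Int, String]()

  def removeOnce(partitionId: Int): Option[String] =
    // remove returns the mapped value the first time, null afterwards;
    // Option(...) turns that null into None so duplicates are filtered out.
    Option(requestsMap.remove(partitionId))

  def main(args: Array[String]): Unit = {
    requestsMap.put(7, "req-7")
    println(removeOnce(7)) // first removal yields the request
    println(removeOnce(7)) // second removal of the same id yields None
  }
}
```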
##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -250,12 +257,14 @@ class ChangePartitionManager(
// remove together to reduce lock time
def replyFailure(status: StatusCode): Unit = {
- requestsMap.synchronized {
- changePartitions.map { changePartition =>
+ changePartitions.map { changePartition =>
+ val locksForShuffle = locks.computeIfAbsent(shuffleId, locksRegisterFunc)
+ locksForShuffle(changePartition.partitionId % locksForShuffle.length).synchronized {
+ val r = requestsMap.remove(changePartition.partitionId)
if (batchHandleChangePartitionEnabled) {
inBatchPartitions.get(shuffleId).remove(changePartition.partitionId)
}
- Option(requestsMap.remove(changePartition.partitionId))
+ Option(r)
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]