waitinfuture commented on code in PR #2070:
URL:
https://github.com/apache/incubator-celeborn/pull/2070#discussion_r1382380715
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -315,21 +328,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// If shuffle is registered, reply this shuffle's partition location and return.
// Else add this request to registeringShuffleRequest.
if (registeredShuffle.contains(shuffleId)) {
- val initialLocs = workerSnapshots(shuffleId)
- .values()
- .asScala
- .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
- .filter(p =>
- (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || (partitionType == PartitionType.MAP && p.getId == partitionId))
- .toArray
partitionType match {
case PartitionType.MAP => processMapTaskReply(
shuffleId,
context.context,
- partitionId,
- initialLocs)
+ partitionId)
case PartitionType.REDUCE =>
- context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, initialLocs))
+ processResponseWithCache(context.context)
Review Comment:
I don't think we need to extract `processResponseWithCache`; it makes the code
more complicated. IMO the logic is:
If `registeredShuffle.contains(shuffleId)` is true, try
`getRegisterShuffleResponseRpcCache`. On a cache miss, get the locations from
`workerSnapshots` like before:
```
val initialLocs = workerSnapshots(shuffleId)
.values()
.asScala
.flatMap(_.getAllPrimaryLocationsWithMinEpoch())
.filter(p => p.getEpoch == 0)
.toArray
```
Then serialize the response and put it into the cache. On both a cache miss
and a cache hit, call
`context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)`.
I believe the `context` here must be a `RemoteNettyRpcCallContext`.
In `reply`, we can just cache the serialized msg in a local variable and
call
`context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)`
for every context.
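To make the suggestion concrete, here is a rough, self-contained sketch of the flow I have in mind. It is not Celeborn code: `ResponseCallback`, `RegisterShuffleReplySketch`, `replyWithCache`, `replyAll`, and the string-based `serialize` are hypothetical stand-ins for the real callback, cache, and RPC serializer, and locations are modeled as plain strings.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for the callback held by the remote RPC call context.
trait ResponseCallback {
  def onSuccess(msg: ByteBuffer): Unit
}

object RegisterShuffleReplySketch {
  // Hypothetical cache: shuffleId -> serialized register-shuffle response.
  private val responseCache = new ConcurrentHashMap[Int, ByteBuffer]()

  // Hypothetical serializer; real code would use the RPC environment's serializer.
  private def serialize(locations: Array[String]): ByteBuffer =
    ByteBuffer.wrap(locations.mkString(",").getBytes(StandardCharsets.UTF_8))

  // Cache hit: reuse the serialized bytes.
  // Cache miss: load the locations, serialize once, store, then reply.
  def replyWithCache(shuffleId: Int, callback: ResponseCallback)(
      loadLocations: () => Array[String]): Unit = {
    val cached = responseCache.computeIfAbsent(
      shuffleId,
      new JFunction[Int, ByteBuffer] {
        override def apply(id: Int): ByteBuffer = serialize(loadLocations())
      })
    // duplicate() shares the bytes but gives each reader its own position.
    callback.onSuccess(cached.duplicate())
  }

  // Serialize once into a local variable and reply to every pending callback.
  def replyAll(locations: Array[String], callbacks: Seq[ResponseCallback]): Unit = {
    val cachedMsg = serialize(locations)
    callbacks.foreach(_.onSuccess(cachedMsg.duplicate()))
  }
}
```

The point is only that the expensive steps (collecting locations and serializing) happen once per shuffle, while each caller still gets its own view of the same bytes. Whether the real cache should be keyed by `shuffleId` alone is an assumption of this sketch.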