mridulm commented on code in PR #2070:
URL:
https://github.com/apache/incubator-celeborn/pull/2070#discussion_r1382356410
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -363,19 +378,50 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
}
}
+ def processResponseWithCache(
+ context: RpcCallContext,
+ partitionLocations: Array[PartitionLocation] = Array.empty): Unit = {
+ if (context.isInstanceOf[LocalNettyRpcCallContext]) {
+ context.reply(RegisterShuffleResponse(
+ StatusCode.SUCCESS,
+ getInitialLocs(partitionLocations)))
+ } else {
+ val cachedMsg = getRegisterShuffleResponseRpcCache.get(
+ shuffleId,
+ new Callable[ByteBuffer]() {
+ override def call(): ByteBuffer = {
+ val returnedMsg =
+ RegisterShuffleResponse(StatusCode.SUCCESS,
getInitialLocs(partitionLocations))
+
context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+ }
+ })
+
context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+ }
+ }
+
+ def getInitialLocs(partitionLocations: Array[PartitionLocation]):
Array[PartitionLocation] = {
+ var initialLocs: Array[PartitionLocation] = partitionLocations
+ if (initialLocs.nonEmpty) {
+ initialLocs = workerSnapshots(shuffleId)
+ .values()
+ .asScala
+ .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
+ .toArray
+ }
+ initialLocs
+ }
+
// Reply to all RegisterShuffle request for current shuffle id.
- def reply(response: PbRegisterShuffleResponse): Unit = {
+ def reply(
+ response: PbRegisterShuffleResponse = null,
Review Comment:
We are trying to make `response` optional here. Given this (for example,
`reply()` is now a confusing but valid call), we can do two things:
a) Rename to `replyRegisterShuffle`,
b) Change `response: PbRegisterShuffleResponse` to `failureResponse:
Option[PbRegisterShuffleResponse]`, and replace all `null` checks in the method
with `isEmpty`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]