waitinfuture commented on code in PR #2070:
URL:
https://github.com/apache/incubator-celeborn/pull/2070#discussion_r1384336311
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -386,7 +415,18 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
// otherwise will lost original exception message
context.reply(response)
}
- case PartitionType.REDUCE => context.reply(response)
+ case PartitionType.REDUCE =>
+ val rpcContext: RpcCallContext = context.context
+ if (rpcContext.isInstanceOf[LocalNettyRpcCallContext]) {
+ context.reply(response)
+ } else {
+ val serializedMsg =
Review Comment:
The `serializedMsg` should be computed once at the start of the method to avoid
re-serializing it on every request.
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -319,17 +332,33 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
.values()
.asScala
.flatMap(_.getAllPrimaryLocationsWithMinEpoch())
- .filter(p =>
- (partitionType == PartitionType.REDUCE && p.getEpoch == 0) ||
(partitionType == PartitionType.MAP && p.getId == partitionId))
.toArray
+ val rpcContext: RpcCallContext = context.context
partitionType match {
- case PartitionType.MAP => processMapTaskReply(
+ case PartitionType.MAP =>
+ processMapTaskReply(
shuffleId,
- context.context,
+ rpcContext,
partitionId,
- initialLocs)
+ initialLocs.filter(p => p.getId == partitionId))
case PartitionType.REDUCE =>
- context.reply(RegisterShuffleResponse(StatusCode.SUCCESS,
initialLocs))
+ if (rpcContext.isInstanceOf[LocalNettyRpcCallContext]) {
+ context.reply(RegisterShuffleResponse(
+ StatusCode.SUCCESS,
+ initialLocs.filter(p => p.getEpoch == 0)))
+ } else {
+ val cachedMsg = getRegisterShuffleResponseRpcCache.get(
+ shuffleId,
+ new Callable[ByteBuffer]() {
+ override def call(): ByteBuffer = {
+
rpcContext.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(
+ RegisterShuffleResponse(
+ StatusCode.SUCCESS,
+ initialLocs.filter(p => p.getEpoch == 0)))
Review Comment:
We don't need to compute `initialLocs` on a cache hit; move that computation inside the cache loader so it only runs on a miss.
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -76,6 +79,16 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int,
PartitionLocation]]()
private val userIdentifier: UserIdentifier =
IdentityProvider.instantiate(conf).provide()
+ private val rpcCacheSize = conf.clientRpcCacheSize
+ private val rpcCacheConcurrencyLevel = conf.clientRpcCacheConcurrencyLevel
+ private val rpcCacheExpireTime = conf.clientRpcCacheExpireTime
+
+ private val getRegisterShuffleResponseRpcCache: Cache[Int, ByteBuffer] =
CacheBuilder.newBuilder()
Review Comment:
nit: better to rename this to `registerShuffleResponseRpcCache`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]