mridulm commented on code in PR #2070:
URL:
https://github.com/apache/incubator-celeborn/pull/2070#discussion_r1382356666
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -76,6 +79,16 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int,
PartitionLocation]]()
private val userIdentifier: UserIdentifier =
IdentityProvider.instantiate(conf).provide()
+ private val rpcCacheSize = conf.clientRpcCacheSize
+ private val rpcCacheConcurrencyLevel = conf.clientRpcCacheConcurrencyLevel
+ private val rpcCacheExpireTime = conf.clientRpcCacheExpireTime
+
+ private val getRegisterShuffleResponseRpcCache: Cache[Int, ByteBuffer] =
CacheBuilder.newBuilder()
+ .concurrencyLevel(rpcCacheConcurrencyLevel)
+ .expireAfterAccess(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
+ .maximumSize(rpcCacheSize)
+ .build().asInstanceOf[Cache[Int, ByteBuffer]]
Review Comment:
Shouldn't we invalidate the cache entry on mutations to `workerSnapshots`?
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -363,19 +378,50 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
}
}
+ def processResponseWithCache(
+ context: RpcCallContext,
+ partitionLocations: Array[PartitionLocation] = Array.empty): Unit = {
+ if (context.isInstanceOf[LocalNettyRpcCallContext]) {
+ context.reply(RegisterShuffleResponse(
+ StatusCode.SUCCESS,
+ getInitialLocs(partitionLocations)))
+ } else {
+ val cachedMsg = getRegisterShuffleResponseRpcCache.get(
+ shuffleId,
+ new Callable[ByteBuffer]() {
+ override def call(): ByteBuffer = {
+ val returnedMsg =
+ RegisterShuffleResponse(StatusCode.SUCCESS,
getInitialLocs(partitionLocations))
+
context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+ }
+ })
+
context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+ }
+ }
+
+ def getInitialLocs(partitionLocations: Array[PartitionLocation]):
Array[PartitionLocation] = {
+ var initialLocs: Array[PartitionLocation] = partitionLocations
+ if (initialLocs.nonEmpty) {
+ initialLocs = workerSnapshots(shuffleId)
+ .values()
+ .asScala
+ .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
+ .toArray
+ }
+ initialLocs
+ }
+
// Reply to all RegisterShuffle request for current shuffle id.
- def reply(response: PbRegisterShuffleResponse): Unit = {
+ def reply(
+ response: PbRegisterShuffleResponse = null,
Review Comment:
We are trying to make `response` optional here. Given this, we can do two
things:
a) Rename the method to `replyRegisterShuffle`,
b) Change `response: PbRegisterShuffleResponse` to `failureResponse:
Option[PbRegisterShuffleResponse]`, and replace all `null` checks in the method
with `isEmpty`.
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -363,19 +378,50 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
}
}
+ def processResponseWithCache(
+ context: RpcCallContext,
+ partitionLocations: Array[PartitionLocation] = Array.empty): Unit = {
+ if (context.isInstanceOf[LocalNettyRpcCallContext]) {
+ context.reply(RegisterShuffleResponse(
+ StatusCode.SUCCESS,
+ getInitialLocs(partitionLocations)))
+ } else {
+ val cachedMsg = getRegisterShuffleResponseRpcCache.get(
+ shuffleId,
+ new Callable[ByteBuffer]() {
+ override def call(): ByteBuffer = {
+ val returnedMsg =
+ RegisterShuffleResponse(StatusCode.SUCCESS,
getInitialLocs(partitionLocations))
+
context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
+ }
+ })
+
context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+ }
+ }
+
+ def getInitialLocs(partitionLocations: Array[PartitionLocation]):
Array[PartitionLocation] = {
+ var initialLocs: Array[PartitionLocation] = partitionLocations
+ if (initialLocs.nonEmpty) {
Review Comment:
Shouldn't this be `isEmpty`?
If yes, why aren't the current tests catching this? Can you add a test
to catch this?
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -348,10 +353,20 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
shuffleId: Int,
context: RpcCallContext,
partitionId: Int,
- partitionLocations: Array[PartitionLocation]): Unit = {
+ partitionLocations: Array[PartitionLocation] = Array.empty): Unit = {
+ var locations: Array[PartitionLocation] = partitionLocations
+ if (locations.isEmpty) {
+ locations = workerSnapshots(shuffleId)
+ .values()
+ .asScala
+ .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
+ .filter(p => p.getId == partitionId)
+ .toArray
+ }
Review Comment:
nit:
```suggestion
val locations = {
if (partitionLocations.nonEmpty) {
partitionLocations
} else {
workerSnapshots(shuffleId)
.values()
.asScala
.flatMap(_.getAllPrimaryLocationsWithMinEpoch())
.filter(p => p.getId == partitionId)
.toArray
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]