This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new c5ba14296 [CELEBORN-1109] Cache RegisterShuffleResponse to improve the 
processing speed of LifecycleManager
c5ba14296 is described below

commit c5ba14296ee2c9939946f2b8c743a0a70ffe3c6a
Author: xiyu.zk <[email protected]>
AuthorDate: Tue Nov 7 18:05:22 2023 +0800

    [CELEBORN-1109] Cache RegisterShuffleResponse to improve the processing 
speed of LifecycleManager
    
    ### What changes were proposed in this pull request?
    Cache RegisterShuffleResponse to improve the processing speed of 
LifecycleManager
    
    ### Why are the changes needed?
    During the processing of the registerShuffle request, constructing the 
RegisterShuffleResponse instance and serialization can indeed consume a 
significant amount of time.  When there are a large number of registerShuffle 
requests that need to be processed by the LifecycleManager simultaneously, the 
response time of the LifecycleManager will be delayed. Therefore, caching is 
needed to improve the processing performance of the LifecycleManager.
    
    
![image](https://github.com/apache/incubator-celeborn/assets/107825064/06d3cb3c-156a-46c7-a08d-fefa18b26e40)
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2070 from kerwin-zk/issue-1109.
    
    Authored-by: xiyu.zk <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit ffbbe257fbb41b27638cfc6ee03201f7f6cc6112)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/LifecycleManager.scala  | 97 ++++++++++++++++++----
 1 file changed, 79 insertions(+), 18 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 4c30b92a5..9f1513c32 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -17,15 +17,17 @@
 
 package org.apache.celeborn.client
 
+import java.nio.ByteBuffer
 import java.util
 import java.util.{function, List => JList}
-import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, 
TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
 import com.google.common.annotations.VisibleForTesting
+import com.google.common.cache.{Cache, CacheBuilder}
 
 import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, 
ShuffleFailedWorkers}
 import org.apache.celeborn.client.listener.WorkerStatusListener
@@ -39,6 +41,7 @@ import 
org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.rpc._
+import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, 
RemoteNettyRpcCallContext}
 import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, 
Utils}
 // Can Remove this if celeborn don't support scala211 in future
 import org.apache.celeborn.common.util.FunctionConverter._
@@ -77,6 +80,16 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
   private val userIdentifier: UserIdentifier = 
IdentityProvider.instantiate(conf).provide()
   private val availableStorageTypes = conf.availableStorageTypes
 
+  private val rpcCacheSize = conf.clientRpcCacheSize
+  private val rpcCacheConcurrencyLevel = conf.clientRpcCacheConcurrencyLevel
+  private val rpcCacheExpireTime = conf.clientRpcCacheExpireTime
+
+  private val registerShuffleResponseRpcCache: Cache[Int, ByteBuffer] = 
CacheBuilder.newBuilder()
+    .concurrencyLevel(rpcCacheConcurrencyLevel)
+    .expireAfterAccess(rpcCacheExpireTime, TimeUnit.MILLISECONDS)
+    .maximumSize(rpcCacheSize)
+    .build().asInstanceOf[Cache[Int, ByteBuffer]]
+
   @VisibleForTesting
   def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo, 
ShufflePartitionLocationInfo] =
     shuffleAllocatedWorkers.get(shuffleId)
@@ -316,21 +329,32 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         // If shuffle is registered, reply this shuffle's partition location 
and return.
         // Else add this request to registeringShuffleRequest.
         if (registeredShuffle.contains(shuffleId)) {
-          val initialLocs = workerSnapshots(shuffleId)
-            .values()
-            .asScala
-            .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
-            .filter(p =>
-              (partitionType == PartitionType.REDUCE && p.getEpoch == 0) || 
(partitionType == PartitionType.MAP && p.getId == partitionId))
-            .toArray
+          val rpcContext: RpcCallContext = context.context
           partitionType match {
-            case PartitionType.MAP => processMapTaskReply(
+            case PartitionType.MAP =>
+              processMapTaskReply(
                 shuffleId,
-                context.context,
+                rpcContext,
                 partitionId,
-                initialLocs)
+                getInitialLocs(shuffleId, p => p.getId == partitionId))
             case PartitionType.REDUCE =>
-              context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, 
initialLocs))
+              if (rpcContext.isInstanceOf[LocalNettyRpcCallContext]) {
+                context.reply(RegisterShuffleResponse(
+                  StatusCode.SUCCESS,
+                  getInitialLocs(shuffleId, p => p.getEpoch == 0)))
+              } else {
+                val cachedMsg = registerShuffleResponseRpcCache.get(
+                  shuffleId,
+                  new Callable[ByteBuffer]() {
+                    override def call(): ByteBuffer = {
+                      
rpcContext.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(
+                        RegisterShuffleResponse(
+                          StatusCode.SUCCESS,
+                          getInitialLocs(shuffleId, p => p.getEpoch == 0)))
+                    }
+                  })
+                
rpcContext.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
+              }
             case _ =>
               throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
           }
@@ -345,6 +369,17 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       }
     }
 
+    def getInitialLocs(
+        shuffleId: Int,
+        partitionLocationFilter: PartitionLocation => Boolean): 
Array[PartitionLocation] = {
+      workerSnapshots(shuffleId)
+        .values()
+        .asScala
+        .flatMap(_.getAllPrimaryLocationsWithMinEpoch())
+        .filter(partitionLocationFilter)
+        .toArray
+    }
+
     def processMapTaskReply(
         shuffleId: Int,
         context: RpcCallContext,
@@ -365,8 +400,24 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     }
 
     // Reply to all RegisterShuffle request for current shuffle id.
-    def reply(response: PbRegisterShuffleResponse): Unit = {
+    def replyRegisterShuffle(response: PbRegisterShuffleResponse): Unit = {
       registeringShuffleRequest.synchronized {
+        val serializedMsg: Option[ByteBuffer] = partitionType match {
+          case PartitionType.REDUCE =>
+            context.context match {
+              case remoteContext: RemoteNettyRpcCallContext =>
+                if (response.getStatus == StatusCode.SUCCESS.getValue) {
+                  Option(remoteContext.nettyEnv.serialize(
+                    response))
+                } else {
+                  Option.empty
+                }
+
+              case _ => Option.empty
+            }
+          case _ => Option.empty
+        }
+
         registeringShuffleRequest.asScala
           .get(shuffleId)
           .foreach(_.asScala.foreach(context => {
@@ -387,7 +438,15 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
                   // otherwise will lost original exception message
                   context.reply(response)
                 }
-              case PartitionType.REDUCE => context.reply(response)
+              case PartitionType.REDUCE =>
+                if (context.context.isInstanceOf[
+                    LocalNettyRpcCallContext] || response.getStatus != 
StatusCode.SUCCESS.getValue) {
+                  context.reply(response)
+                } else {
+                  registerShuffleResponseRpcCache.put(shuffleId, 
serializedMsg.get)
+                  
context.context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(
+                    serializedMsg.get)
+                }
               case _ =>
                 throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
             }
@@ -404,11 +463,11 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     res.status match {
       case StatusCode.REQUEST_FAILED =>
         logInfo(s"OfferSlots RPC request failed for $shuffleId!")
-        reply(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, Array.empty))
+        
replyRegisterShuffle(RegisterShuffleResponse(StatusCode.REQUEST_FAILED, 
Array.empty))
         return
       case StatusCode.SLOT_NOT_AVAILABLE =>
         logInfo(s"OfferSlots for $shuffleId failed!")
-        reply(RegisterShuffleResponse(StatusCode.SLOT_NOT_AVAILABLE, 
Array.empty))
+        
replyRegisterShuffle(RegisterShuffleResponse(StatusCode.SLOT_NOT_AVAILABLE, 
Array.empty))
         return
       case StatusCode.SUCCESS =>
         logInfo(s"OfferSlots for $shuffleId Success!Slots Info: 
${res.workerResource}")
@@ -455,7 +514,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     // If reserve slots failed, clear allocated resources, reply 
ReserveSlotFailed and return.
     if (!reserveSlotsSuccess) {
       logError(s"reserve buffer for $shuffleId failed, reply to all.")
-      reply(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, 
Array.empty))
+      
replyRegisterShuffle(RegisterShuffleResponse(StatusCode.RESERVE_SLOTS_FAILED, 
Array.empty))
     } else {
       logInfo(s"ReserveSlots for $shuffleId success with details:$slots!")
       // Forth, register shuffle success, update status
@@ -475,7 +534,9 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
       // Fifth, reply the allocated partition location to ShuffleClient.
       logInfo(s"Handle RegisterShuffle Success for $shuffleId.")
       val allPrimaryPartitionLocations = 
slots.asScala.flatMap(_._2._1.asScala).toArray
-      reply(RegisterShuffleResponse(StatusCode.SUCCESS, 
allPrimaryPartitionLocations))
+      replyRegisterShuffle(RegisterShuffleResponse(
+        StatusCode.SUCCESS,
+        allPrimaryPartitionLocations))
     }
   }
 

Reply via email to