mridulm commented on code in PR #2701:
URL: https://github.com/apache/celeborn/pull/2701#discussion_r1729463860


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4418,7 +4419,7 @@ object CelebornConf extends Logging {
       .version("0.3.0")
       .doc("Interval for client to check expired shuffles.")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("60s")
+      .createWithDefaultString("20s")

Review Comment:
   Agree, let us keep the defaults unless there are good reasons to change them.



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java:
##########
@@ -118,6 +118,16 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
           metaSystem.updateUnregisterShuffleMeta(shuffleKey);
           break;
 
+        case BatchUnRegisterShuffle:
+          List<String> shuffleKeys =
+              request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
+          shuffleKeys.forEach(
+              key -> {
+                LOG.debug("Handle unregister shuffle for {}", key);

Review Comment:
   Pull this out of the `forEach` and log all keys in one shot? Debug logging is already noisy :-)
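
   A minimal sketch of the single-log-line variant, in plain Java. The class name `BatchUnregisterLogging` and the two `Consumer` parameters are hypothetical stand-ins for `metaSystem.updateUnregisterShuffleMeta` and `LOG.debug`; they are not part of `MetaHandler`.

   ```java
   import java.util.List;
   import java.util.function.Consumer;

   // Hypothetical helper, for illustration only.
   class BatchUnregisterLogging {
     // unregister: stand-in for metaSystem.updateUnregisterShuffleMeta(key)
     // debugLog:   stand-in for LOG.debug(...)
     static void handleBatch(
         List<String> shuffleKeys, Consumer<String> unregister, Consumer<String> debugLog) {
       // One debug line for the whole batch instead of one line per key.
       debugLog.accept("Handle batch unregister shuffle for " + shuffleKeys);
       // The per-key work itself is unchanged.
       shuffleKeys.forEach(unregister);
     }
   }
   ```

   With SLF4J, the same idea would presumably be `LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);` placed before the `forEach`.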



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1655,6 +1693,25 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
     }
   }
 
+  private def batchRequestMasterUnregisterShuffles(message: PbBatchUnregisterShuffles)
+      : PbBatchUnregisterShuffleResponses = {
+    try {
+      logInfo(s"AskSync UnregisterShuffle for ${message.getShuffleIdsList}")
+      masterClient.askSync[PbBatchUnregisterShuffleResponses](
+        message,
+        classOf[PbBatchUnregisterShuffleResponses])
+    } catch {
+      case e: Exception =>
+        logError(s"AskSync UnregisterShuffle for ${message.getShuffleIdsList} failed.", e)
+        val map = JavaUtils.newConcurrentHashMap[Integer, StatusCode]()
+        val shuffleIds = message.getShuffleIdsList

Review Comment:
   Only for the subset where `PbUnregisterShuffleResponsesInfo.status` is failed?
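
   If the intent is that follow-up handling should cover only the entries whose status is not successful, a minimal Java sketch could look like the following. `FailedSubset`, its `Status` enum, and the map shape are hypothetical stand-ins for the real protobuf response and Celeborn's `StatusCode`; this is one reading of the question, not the PR's implementation.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper, for illustration only.
   class FailedSubset {
     // Simplified stand-in for Celeborn's StatusCode; only two values are modeled.
     enum Status { SUCCESS, FAILED }

     // Returns the shuffle ids whose status is not SUCCESS, sorted ascending.
     static List<Integer> failedShuffleIds(Map<Integer, Status> statusByShuffleId) {
       List<Integer> failed = new ArrayList<>();
       for (Map.Entry<Integer, Status> entry : statusByShuffleId.entrySet()) {
         if (entry.getValue() != Status.SUCCESS) {
           failed.add(entry.getKey());
         }
       }
       Collections.sort(failed);
       return failed;
     }
   }
   ```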



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4450,6 +4451,14 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled")
+      .categories("client")
+      .version("0.5.0")
+      .doc("This is an optimization on remove Expired Shuffles and it's true by default.")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   Let us start with `false` and flip it in later releases.



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1583,6 +1590,37 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
     }
   }
 
+  private def removeBatchExpiredShuffles(): Unit = {
+    val shuffleIds = new util.ArrayList[Integer]()
+    unregisterShuffleTime.keys().asScala.foreach { shuffleId =>

Review Comment:
   Check for `shuffleExpiredCheckIntervalMs` before removing.
   Essentially, keep it functionally similar to `removeExpiredShuffle`, except for sending a batch of requests to master.
   
   It might be a good idea to do this within `removeExpiredShuffle` directly and minimize code divergence:
   
   ```
   private def removeExpiredShuffle(batchRemove: Boolean): Unit = {
     val batchRemoveShuffleIds = new ArrayBuffer[Int]()
     val currentTime = System.currentTimeMillis()
     unregisterShuffleTime.keys().asScala.foreach { shuffleId =>
       if (unregisterShuffleTime.get(shuffleId) < currentTime - shuffleExpiredCheckIntervalMs) {
         <existing code>

         if (batchRemove) {
           batchRemoveShuffleIds += shuffleId
         } else {
           <existing unregister code>
         }
       }
     }

     if (batchRemoveShuffleIds.nonEmpty) {
       // send batch request
     }
   }
   ```
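
   For readers who want to run the control flow in isolation, here is a self-contained Java sketch of the same shape. Everything in it is an assumption made for illustration: the class `ExpiredShuffleSweep`, the two callbacks standing in for the single and batch master requests, and the choice to drop an entry from the map as soon as it is found expired (the placeholder `<existing code>` above may do something different).

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Consumer;

   // Hypothetical helper, for illustration only.
   class ExpiredShuffleSweep {
     static void removeExpired(
         ConcurrentHashMap<Integer, Long> unregisterShuffleTime,
         long nowMs,
         long expiredCheckIntervalMs,
         boolean batchRemove,
         Consumer<Integer> unregisterOne,          // stand-in for the per-shuffle request
         Consumer<List<Integer>> unregisterBatch)  // stand-in for the batch request
     {
       List<Integer> batch = new ArrayList<>();
       // ConcurrentHashMap iterators are weakly consistent, so removing
       // entries while iterating does not throw.
       for (Map.Entry<Integer, Long> entry : unregisterShuffleTime.entrySet()) {
         // Same expiry check in both modes: only old enough entries qualify.
         if (entry.getValue() < nowMs - expiredCheckIntervalMs) {
           Integer shuffleId = entry.getKey();
           unregisterShuffleTime.remove(shuffleId); // assumption, see lead-in
           if (batchRemove) {
             batch.add(shuffleId);
           } else {
             unregisterOne.accept(shuffleId);
           }
         }
       }
       // Send at most one batch request, and only if something expired.
       if (!batch.isEmpty()) {
         unregisterBatch.accept(batch);
       }
     }
   }
   ```

   The point of the shared loop is that the expiry check lives in one place, so the batch and non-batch paths cannot drift apart.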


