mridulm commented on code in PR #2701:
URL: https://github.com/apache/celeborn/pull/2701#discussion_r1729463860
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4418,7 +4419,7 @@ object CelebornConf extends Logging {
.version("0.3.0")
.doc("Interval for client to check expired shuffles.")
.timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("60s")
+ .createWithDefaultString("20s")
Review Comment:
Agreed; let us keep the defaults unless there is a good reason to change them.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java:
##########
@@ -118,6 +118,16 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
metaSystem.updateUnregisterShuffleMeta(shuffleKey);
break;
+ case BatchUnRegisterShuffle:
+ List<String> shuffleKeys =
+ request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
+ shuffleKeys.forEach(
+ key -> {
+ LOG.debug("Handle unregister shuffle for {}", key);
Review Comment:
Pull this out of the `forEach` and log all the keys in one shot? Debug logging is already noisy :-)
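A minimal sketch of that suggestion. It is written in Scala to match the other examples here, although the real handler in `MetaHandler.java` is Java. `unregisterAll`, `logDebug` and `updateMeta` are hypothetical stand-ins for the `BatchUnRegisterShuffle` branch, `LOG.debug` and `metaSystem.updateUnregisterShuffleMeta`. The point is only that the key list is logged once, before the loop, instead of once per key.

```scala
import scala.collection.mutable.ArrayBuffer

object BatchUnregisterLogging {
  // Hypothetical stand-in for LOG.debug: records each emitted message.
  val debugLog = ArrayBuffer.empty[String]
  def logDebug(msg: String): Unit = debugLog += msg

  // Hypothetical stand-in for metaSystem.updateUnregisterShuffleMeta.
  val removed = ArrayBuffer.empty[String]
  def updateMeta(shuffleKey: String): Unit = removed += shuffleKey

  def unregisterAll(shuffleKeys: Seq[String]): Unit = {
    // One debug line for the whole batch instead of one line per key.
    logDebug(s"Handle unregister shuffle for ${shuffleKeys.mkString(", ")}")
    shuffleKeys.foreach(updateMeta)
  }
}
```

In the Java handler, the equivalent change is a single `LOG.debug` call placed before `shuffleKeys.forEach(...)`.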
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1655,6 +1693,25 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}
+ private def batchRequestMasterUnregisterShuffles(message: PbBatchUnregisterShuffles)
+ : PbBatchUnregisterShuffleResponses = {
+ try {
+ logInfo(s"AskSync UnregisterShuffle for ${message.getShuffleIdsList}")
+ masterClient.askSync[PbBatchUnregisterShuffleResponses](
+ message,
+ classOf[PbBatchUnregisterShuffleResponses])
+ } catch {
+ case e: Exception =>
+ logError(s"AskSync UnregisterShuffle for ${message.getShuffleIdsList} failed.", e)
+ val map = JavaUtils.newConcurrentHashMap[Integer, StatusCode]()
+ val shuffleIds = message.getShuffleIdsList
Review Comment:
Only for the subset where `PbUnregisterShuffleResponsesInfo.status` is failed?
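One reading of this question is that any follow-up handling should cover only the shuffle IDs whose individual status is failed, not the whole batch. Examples of follow-up handling include retrying or reporting failure back to the caller. The sketch below works under that assumption. `UnregisterInfo` and the `Status` values are hypothetical simplifications of `PbUnregisterShuffleResponsesInfo` and `StatusCode`, not the real protobuf classes.

```scala
object FailedSubset {
  // Hypothetical simplification of StatusCode.
  sealed trait Status
  case object Success extends Status
  case object Failed extends Status

  // Hypothetical simplification of PbUnregisterShuffleResponsesInfo.
  final case class UnregisterInfo(shuffleId: Int, status: Status)

  // Keep only the shuffle IDs whose individual unregister attempt failed.
  def failedShuffleIds(infos: Seq[UnregisterInfo]): Seq[Int] =
    infos.collect { case UnregisterInfo(id, Failed) => id }
}
```

Filtering per entry keeps successfully unregistered shuffles from being retried or reported as failures just because one sibling in the same batch failed.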
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4450,6 +4451,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =
+ buildConf("celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled")
+ .categories("client")
+ .version("0.5.0")
+ .doc("This is an optimization on remove Expired Shuffles and it's true by default.")
+ .booleanConf
+ .createWithDefault(true)
Review Comment:
Let us start with `false` and flip it in later releases.
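Starting with `false` makes the batch path opt-in. Existing deployments keep the per-shuffle behaviour unless they set the key explicitly. Below is a minimal sketch of that lookup. `BoolConfEntry` is a hypothetical stand-in for Celeborn's `ConfigEntry[Boolean]`, and the plain `Map` stands in for `CelebornConf`.

```scala
object OptInFlag {
  // Hypothetical stand-in for ConfigEntry[Boolean]: a key plus its default.
  final case class BoolConfEntry(key: String, default: Boolean) {
    // Use the explicitly configured value if present, otherwise the default.
    def readFrom(settings: Map[String, String]): Boolean =
      settings.get(key).map(_.trim.toBoolean).getOrElse(default)
  }

  val ClientBatchRemoveExpiredShuffle: BoolConfEntry = BoolConfEntry(
    "celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled",
    default = false) // opt-in for now; can be flipped to true in a later release
}
```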
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1583,6 +1590,37 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}
}
+ private def removeBatchExpiredShuffles(): Unit = {
+ val shuffleIds = new util.ArrayList[Integer]()
+ unregisterShuffleTime.keys().asScala.foreach { shuffleId =>
Review Comment:
Check against `shuffleExpiredCheckIntervalMs` before removing. Essentially, keep it functionally similar to `removeExpiredShuffle`, except that it sends a batch request to the master. It might be a good idea to do this within `removeExpiredShuffle` directly and minimize code divergence:
```scala
private def removeExpiredShuffle(batchRemove: Boolean): Unit = {
  val batchRemoveShuffleIds = new ArrayBuffer[Int]()
  val currentTime = System.currentTimeMillis()
  unregisterShuffleTime.keys().asScala.foreach { shuffleId =>
    if (unregisterShuffleTime.get(shuffleId) < currentTime - shuffleExpiredCheckIntervalMs) {
      // <existing code>
      if (batchRemove) {
        batchRemoveShuffleIds += shuffleId
      } else {
        // <existing unregister code>
      }
    }
  }
  if (batchRemoveShuffleIds.nonEmpty) {
    // send batch request
  }
}
```
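Below is a self-contained sketch of the same structure, with hypothetical stand-ins so it runs outside `LifecycleManager`. `unregisterShuffleTime` is modelled as a `ConcurrentHashMap[Integer, java.lang.Long]` of unregister timestamps. `unregisterOne` and `unregisterBatch` replace the existing per-shuffle call and the new batch request to the master. `currentTime` is passed in rather than read from `System.currentTimeMillis()` so the behaviour can be tested. The sketch also assumes that the existing cleanup removes the expired entry from the map.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical, self-contained model of the suggested removeExpiredShuffle(batchRemove).
class ExpiredShuffleRemover(
    shuffleExpiredCheckIntervalMs: Long,
    unregisterOne: Int => Unit, // stand-in for the existing per-shuffle unregister
    unregisterBatch: Seq[Int] => Unit) { // stand-in for the new batch request to master

  // shuffleId -> time (ms) at which the shuffle was unregistered
  val unregisterShuffleTime = new ConcurrentHashMap[Integer, java.lang.Long]()

  def removeExpiredShuffle(batchRemove: Boolean, currentTime: Long): Unit = {
    val batchRemoveShuffleIds = new ArrayBuffer[Int]()
    // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
    unregisterShuffleTime.keySet().forEach { (shuffleId: Integer) =>
      val unregisteredAt = unregisterShuffleTime.get(shuffleId)
      if (unregisteredAt != null &&
        unregisteredAt.longValue < currentTime - shuffleExpiredCheckIntervalMs) {
        // Assumed part of the existing cleanup: forget the expired shuffle.
        unregisterShuffleTime.remove(shuffleId)
        if (batchRemove) batchRemoveShuffleIds += shuffleId.intValue
        else unregisterOne(shuffleId.intValue)
      }
    }
    // One request to the master for all expired shuffles found in this pass.
    if (batchRemoveShuffleIds.nonEmpty) unregisterBatch(batchRemoveShuffleIds.toList)
  }
}
```

Both modes share the same expiry check, so the only behavioural difference is how many master round trips a single pass makes.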
--
This is an automated message from the Apache Git Service.