RexXiong commented on code in PR #2701:
URL: https://github.com/apache/celeborn/pull/2701#discussion_r1766837883
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4476,6 +4477,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =
+
buildConf("celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled")
+ .categories("client")
+ .version("0.6.0")
+ .doc("This is an optimization on remove Expired Shuffles and it's true
by default.")
Review Comment:
It seems the default value is false?
##########
master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java:
##########
@@ -844,6 +844,93 @@ public void testHandleUnRegisterShuffle() throws
InterruptedException {
Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
}
+ @Test
+ public void testBatchHandleUnRegisterShuffle() throws InterruptedException {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+
+ statusSystem.handleRegisterWorker(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ INTERNALPORT1,
+ NETWORK_LOCATION1,
+ disks1,
+ userResourceConsumption1,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ INTERNALPORT2,
+ NETWORK_LOCATION2,
+ disks2,
+ userResourceConsumption2,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
+ REPLICATEPORT3,
+ INTERNALPORT3,
+ NETWORK_LOCATION3,
+ disks3,
+ userResourceConsumption3,
+ getNewReqeustId());
+
+ WorkerInfo workerInfo1 =
+ new WorkerInfo(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ INTERNALPORT1,
+ disks1,
+ userResourceConsumption1);
+ WorkerInfo workerInfo2 =
+ new WorkerInfo(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ INTERNALPORT2,
+ disks2,
+ userResourceConsumption2);
+
+ Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
+ Map<String, Integer> allocations = new HashMap<>();
+ allocations.put("disk1", 5);
+ workersToAllocate.put(workerInfo1.toUniqueId(), allocations);
+ workersToAllocate.put(workerInfo2.toUniqueId(), allocations);
+
+ List<String> shuffleKeys = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ String shuffleKey = APPID1 + "-" + i;
+ shuffleKeys.add(shuffleKey);
+ statusSystem.handleRequestSlots(shuffleKey, HOSTNAME1,
workersToAllocate, getNewReqeustId());
+ }
+
+ Thread.sleep(3000L);
+
+ Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffle.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffle.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffle.size());
+
+ statusSystem.batchHandleUnRegisterShuffles(shuffleKeys, getNewReqeustId());
Review Comment:
Please test the following two scenarios: 1. Unregister only one shuffleKey,
then check the result. 2. Unregister all shuffleKeys, then check the result.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1042,6 +1042,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def batchHandleChangePartitionNumThreads: Int =
get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
def batchHandleChangePartitionRequestInterval: Long =
get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
+ def batchHandleRemoveExpiredShuffles: Boolean =
get(CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE)
Review Comment:
batchHandleRemoveExpiredShuffles -> batchHandleRemoveExpiredShufflesEnabled
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4476,6 +4477,14 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)
+ val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =
Review Comment:
CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE ->
CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE_ENABLED
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1589,10 +1592,23 @@ class LifecycleManager(val appUniqueId: String, val
conf: CelebornConf) extends
latestPartitionLocation.remove(shuffleId)
commitManager.removeExpiredShuffle(shuffleId)
changePartitionManager.removeExpiredShuffle(shuffleId)
- val unregisterShuffleResponse = requestMasterUnregisterShuffle(
- UnregisterShuffle(appUniqueId, shuffleId,
MasterClient.genRequestId()))
- // if unregister shuffle not success, wait next turn
- if (StatusCode.SUCCESS ==
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
+ if (!batchRemoveExpiredShuffles) {
+ val unregisterShuffleResponse = requestMasterUnregisterShuffle(
+ UnregisterShuffle(appUniqueId, shuffleId,
MasterClient.genRequestId()))
+ // if unregister shuffle not success, wait next turn
+ if (StatusCode.SUCCESS ==
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
+ unregisterShuffleTime.remove(shuffleId)
+ }
+ } else {
+ batchRemoveShuffleIds.add(shuffleId)
+ }
+ }
+ }
+ if (!batchRemoveShuffleIds.isEmpty) {
+ val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
+ BatchUnregisterShuffles(appUniqueId, batchRemoveShuffleIds,
MasterClient.genRequestId()))
+ if (StatusCode.SUCCESS ==
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
Review Comment:
Should use batchRemoveShuffleIds to remove the shuffleIds from
unregisterShuffleTime.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java:
##########
@@ -118,6 +118,13 @@ public ResourceResponse
handleWriteRequest(ResourceProtos.ResourceRequest reques
metaSystem.updateUnregisterShuffleMeta(shuffleKey);
break;
+ case BatchUnRegisterShuffle:
+ List<String> shuffleKeys =
+ request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
+ metaSystem.registeredShuffle.removeAll(shuffleKeys);
Review Comment:
Why not extract a new method, such as
`updateUnregisterShuffleMeta(shuffleKeys)`, in AbstractMetaManager? Then both
the Single and HA Master will use the same method to do the batch unregister meta update.
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java:
##########
@@ -36,6 +36,8 @@ void handleRequestSlots(
void handleUnRegisterShuffle(String shuffleKey, String requestId);
+ void batchHandleUnRegisterShuffles(List<String> shuffleKeys, String
requestId);
Review Comment:
Maybe use HandleBatchUnregisterShuffle, as the request is
BatchUnregisterShuffleRequest?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]