RexXiong commented on code in PR #2701:
URL: https://github.com/apache/celeborn/pull/2701#discussion_r1766837883


##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4476,6 +4477,14 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =
+    
buildConf("celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled")
+      .categories("client")
+      .version("0.6.0")
+      .doc("This is an optimization on remove Expired Shuffles and it's true 
by default.")

Review Comment:
   It seems the default value is actually false?



##########
master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java:
##########
@@ -844,6 +844,93 @@ public void testHandleUnRegisterShuffle() throws 
InterruptedException {
     Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
   }
 
+  @Test
+  public void testBatchHandleUnRegisterShuffle() throws InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    statusSystem.handleRegisterWorker(
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        INTERNALPORT1,
+        NETWORK_LOCATION1,
+        disks1,
+        userResourceConsumption1,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        INTERNALPORT2,
+        NETWORK_LOCATION2,
+        disks2,
+        userResourceConsumption2,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        INTERNALPORT3,
+        NETWORK_LOCATION3,
+        disks3,
+        userResourceConsumption3,
+        getNewReqeustId());
+
+    WorkerInfo workerInfo1 =
+        new WorkerInfo(
+            HOSTNAME1,
+            RPCPORT1,
+            PUSHPORT1,
+            FETCHPORT1,
+            REPLICATEPORT1,
+            INTERNALPORT1,
+            disks1,
+            userResourceConsumption1);
+    WorkerInfo workerInfo2 =
+        new WorkerInfo(
+            HOSTNAME2,
+            RPCPORT2,
+            PUSHPORT2,
+            FETCHPORT2,
+            REPLICATEPORT2,
+            INTERNALPORT2,
+            disks2,
+            userResourceConsumption2);
+
+    Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
+    Map<String, Integer> allocations = new HashMap<>();
+    allocations.put("disk1", 5);
+    workersToAllocate.put(workerInfo1.toUniqueId(), allocations);
+    workersToAllocate.put(workerInfo2.toUniqueId(), allocations);
+
+    List<String> shuffleKeys = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      String shuffleKey = APPID1 + "-" + i;
+      shuffleKeys.add(shuffleKey);
+      statusSystem.handleRequestSlots(shuffleKey, HOSTNAME1, 
workersToAllocate, getNewReqeustId());
+    }
+
+    Thread.sleep(3000L);
+
+    Assert.assertEquals(3, STATUSSYSTEM1.registeredShuffle.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.registeredShuffle.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.registeredShuffle.size());
+
+    statusSystem.batchHandleUnRegisterShuffles(shuffleKeys, getNewReqeustId());

Review Comment:
   Please test the following two scenarios: 1. Unregister only one shuffleKey,
then check the result. 2. Unregister all, then check the result.



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -1042,6 +1042,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def batchHandleChangePartitionNumThreads: Int = 
get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
   def batchHandleChangePartitionRequestInterval: Long =
     get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
+  def batchHandleRemoveExpiredShuffles: Boolean = 
get(CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE)

Review Comment:
   batchHandleRemoveExpiredShuffles -> batchHandleRemoveExpiredShufflesEnabled



##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -4476,6 +4477,14 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE: ConfigEntry[Boolean] =

Review Comment:
   CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE -> 
CLIENT_BATCH_REMOVE_EXPIRED_SHUFFLE_ENABLED



##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -1589,10 +1592,23 @@ class LifecycleManager(val appUniqueId: String, val 
conf: CelebornConf) extends
         latestPartitionLocation.remove(shuffleId)
         commitManager.removeExpiredShuffle(shuffleId)
         changePartitionManager.removeExpiredShuffle(shuffleId)
-        val unregisterShuffleResponse = requestMasterUnregisterShuffle(
-          UnregisterShuffle(appUniqueId, shuffleId, 
MasterClient.genRequestId()))
-        // if unregister shuffle not success, wait next turn
-        if (StatusCode.SUCCESS == 
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
+        if (!batchRemoveExpiredShuffles) {
+          val unregisterShuffleResponse = requestMasterUnregisterShuffle(
+            UnregisterShuffle(appUniqueId, shuffleId, 
MasterClient.genRequestId()))
+          // if unregister shuffle not success, wait next turn
+          if (StatusCode.SUCCESS == 
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {
+            unregisterShuffleTime.remove(shuffleId)
+          }
+        } else {
+          batchRemoveShuffleIds.add(shuffleId)
+        }
+      }
+    }
+    if (!batchRemoveShuffleIds.isEmpty) {
+      val unregisterShuffleResponse = batchRequestMasterUnregisterShuffles(
+        BatchUnregisterShuffles(appUniqueId, batchRemoveShuffleIds, 
MasterClient.genRequestId()))
+      if (StatusCode.SUCCESS == 
Utils.toStatusCode(unregisterShuffleResponse.getStatus)) {

Review Comment:
   Should use batchRemoveShuffleIds to remove the shuffleIds from 
unregisterShuffleTime.



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java:
##########
@@ -118,6 +118,13 @@ public ResourceResponse 
handleWriteRequest(ResourceProtos.ResourceRequest reques
           metaSystem.updateUnregisterShuffleMeta(shuffleKey);
           break;
 
+        case BatchUnRegisterShuffle:
+          List<String> shuffleKeys =
+              request.getBatchUnregisterShuffleRequest().getShuffleKeysList();
+          metaSystem.registeredShuffle.removeAll(shuffleKeys);

Review Comment:
   Why not extract a new method, such as 
`updateUnregisterShuffleMeta(shuffleKeys)`, in AbstractMetaManager? Then the 
Single/HA Master will use the same method to do the batch unregister meta update.



##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java:
##########
@@ -36,6 +36,8 @@ void handleRequestSlots(
 
   void handleUnRegisterShuffle(String shuffleKey, String requestId);
 
+  void batchHandleUnRegisterShuffles(List<String> shuffleKeys, String 
requestId);

Review Comment:
   Maybe use HandleBatchUnregisterShuffle, as the request is 
BatchUnregisterShuffleRequest?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to