zaynt4606 commented on code in PR #2852:
URL: https://github.com/apache/celeborn/pull/2852#discussion_r1825554691


##########
client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala:
##########
@@ -291,42 +292,71 @@ class ChangePartitionManager(
     }
 
     val candidates = new util.HashSet[WorkerInfo]()
-    if (clientShuffleDynamicResourceEnabled) {
-      // availableWorkers wont filter excludedWorkers in heartBeat So have to 
do filtering.
-      candidates.addAll(lifecycleManager
-        .workerStatusTracker
-        .availableWorkersWithEndpoint
-        .values()
+
+    val snapshotCandidates =
+      lifecycleManager
+        .workerSnapshots(shuffleId)
+        .keySet()
         .asScala
-        .toSet
         .filter(lifecycleManager.workerStatusTracker.workerAvailable)
-        .asJava)
-
-      // SetupEndpoint for those availableWorkers without endpoint
-      val workersRequireEndpoints = new util.HashSet[WorkerInfo](
-        
lifecycleManager.workerStatusTracker.availableWorkersWithoutEndpoint.asScala.filter(
-          lifecycleManager.workerStatusTracker.workerAvailable).asJava)
-      val connectFailedWorkers = new ShuffleFailedWorkers()
-      lifecycleManager.setupEndpoints(
-        workersRequireEndpoints,
-        shuffleId,
-        connectFailedWorkers)
-      
workersRequireEndpoints.removeAll(connectFailedWorkers.asScala.keys.toList.asJava)
-      candidates.addAll(workersRequireEndpoints)
-
-      // Update worker status
-      
lifecycleManager.workerStatusTracker.addWorkersWithEndpoint(workersRequireEndpoints)
-      
lifecycleManager.workerStatusTracker.recordWorkerFailure(connectFailedWorkers)
-      
lifecycleManager.workerStatusTracker.removeFromExcludedWorkers(candidates)
-    } else {
-      val snapshotCandidates =
-        lifecycleManager
-          .workerSnapshots(shuffleId)
-          .keySet()
+        .asJava
+    candidates.addAll(snapshotCandidates)
+
+    if (clientShuffleDynamicResourceEnabled) {
+      val shuffleAllocatedWorkers = 
lifecycleManager.workerSnapshots(shuffleId).size()
+      val unavailableWorkerRatio = 1 - (snapshotCandidates.size * 1.0 / 
shuffleAllocatedWorkers)
+      if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2)
+        || (unavailableWorkerRatio >= clientShuffleDynamicResourceFactor)) {
+
+        val numPartitions = 
lifecycleManager.latestPartitionLocation.get(shuffleId).size()
+        val ids = new util.ArrayList((0 until 
numPartitions).map(Integer.valueOf).asJava)
+        val requestSlotsRes = 
lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, ids)
+

Review Comment:
   There are no changes to the requestSlots process in the master. Is there anything
else we should watch out for with this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to