zaynt4606 commented on code in PR #2852: URL: https://github.com/apache/celeborn/pull/2852#discussion_r1825554691
########## client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala: ########## @@ -291,42 +292,71 @@ class ChangePartitionManager( } val candidates = new util.HashSet[WorkerInfo]() - if (clientShuffleDynamicResourceEnabled) { - // availableWorkers wont filter excludedWorkers in heartBeat So have to do filtering. - candidates.addAll(lifecycleManager - .workerStatusTracker - .availableWorkersWithEndpoint - .values() + + val snapshotCandidates = + lifecycleManager + .workerSnapshots(shuffleId) + .keySet() .asScala - .toSet .filter(lifecycleManager.workerStatusTracker.workerAvailable) - .asJava) - - // SetupEndpoint for those availableWorkers without endpoint - val workersRequireEndpoints = new util.HashSet[WorkerInfo]( - lifecycleManager.workerStatusTracker.availableWorkersWithoutEndpoint.asScala.filter( - lifecycleManager.workerStatusTracker.workerAvailable).asJava) - val connectFailedWorkers = new ShuffleFailedWorkers() - lifecycleManager.setupEndpoints( - workersRequireEndpoints, - shuffleId, - connectFailedWorkers) - workersRequireEndpoints.removeAll(connectFailedWorkers.asScala.keys.toList.asJava) - candidates.addAll(workersRequireEndpoints) - - // Update worker status - lifecycleManager.workerStatusTracker.addWorkersWithEndpoint(workersRequireEndpoints) - lifecycleManager.workerStatusTracker.recordWorkerFailure(connectFailedWorkers) - lifecycleManager.workerStatusTracker.removeFromExcludedWorkers(candidates) - } else { - val snapshotCandidates = - lifecycleManager - .workerSnapshots(shuffleId) - .keySet() + .asJava + candidates.addAll(snapshotCandidates) + + if (clientShuffleDynamicResourceEnabled) { + val shuffleAllocatedWorkers = lifecycleManager.workerSnapshots(shuffleId).size() + val unavailableWorkerRatio = 1 - (snapshotCandidates.size * 1.0 / shuffleAllocatedWorkers) + if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2) + || (unavailableWorkerRatio >= clientShuffleDynamicResourceFactor)) { + + val numPartitions = 
lifecycleManager.latestPartitionLocation.get(shuffleId).size() + val ids = new util.ArrayList((0 until numPartitions).map(Integer.valueOf).asJava) + val requestSlotsRes = lifecycleManager.requestMasterRequestSlotsWithRetry(shuffleId, ids) + Review Comment: There are no changes to the requestSlots process in the master. Is there anything else we should be aware of? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@celeborn.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org