This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a5dfd67d5 [CELEBORN-1034] Offer slots uses random range of available 
workers instead of shuffling
a5dfd67d5 is described below

commit a5dfd67d5b9bcb7d5da59f441ed1d60b4bc27cd3
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Oct 18 17:00:03 2023 +0800

    [CELEBORN-1034] Offer slots uses random range of available workers instead 
of shuffling
    
    ### What changes were proposed in this pull request?
    In the original design, (primary worker, replica worker) pairs tend to stay
    stable. For example, the replica PartitionLocations for primary
    PartitionLocations on Worker A will generally all be on Worker B in common
    scenarios, i.e. when all workers are healthy and work well. This way, each
    Worker has only one (or very few) connections to other workers' replication
    Netty servers.
    
    However, https://github.com/apache/incubator-celeborn/pull/1790 calls
    `Collections.shuffle(availableWorkers)`, causing the number of replica
    connections to increase dramatically:
    
![image](https://github.com/apache/incubator-celeborn/assets/948245/013c7bc8-a224-413e-9c0c-519ae76c9d32)
    
    ### Why are the changes needed?
    This PR refines the logic of selecting a limited number of workers: instead
    of shuffling, the Master picks a random starting index and takes a
    contiguous range of available workers, wrapping around to the start of the
    list if needed.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1975 from waitinfuture/1034.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala     | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 18a5b7560..a2226b6b0 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -615,22 +615,31 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
-    var availableWorkers = workersAvailable()
-    Collections.shuffle(availableWorkers)
+    val availableWorkers = workersAvailable()
+    val numAvailableWorkers = availableWorkers.size()
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
         if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
         else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
-      availableWorkers.size())
-    availableWorkers = availableWorkers.subList(0, numWorkers)
+      numAvailableWorkers)
+    val startIndex = Random.nextInt(numAvailableWorkers)
+    val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
+    selectedWorkers.addAll(availableWorkers.subList(
+      startIndex,
+      Math.min(numAvailableWorkers, startIndex + numWorkers)))
+    if (startIndex + numWorkers > numAvailableWorkers) {
+      selectedWorkers.addAll(availableWorkers.subList(
+        0,
+        startIndex + numWorkers - numAvailableWorkers))
+    }
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && 
!hasHDFSStorage) {
             SlotsAllocator.offerSlotsLoadAware(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
@@ -641,7 +650,7 @@ private[celeborn] class Master(
               loadAwareFetchTimeWeight)
           } else {
             SlotsAllocator.offerSlotsRoundRobin(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware)

Reply via email to