This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 330c49837 [CELEBORN-1034] Offer slots uses random range of available 
workers instead of shuffling
330c49837 is described below

commit 330c49837deb347da1f0decd7e7bd6f2460c9b2a
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Oct 18 17:00:03 2023 +0800

    [CELEBORN-1034] Offer slots uses random range of available workers instead 
of shuffling
    
    ### What changes were proposed in this pull request?
    In the original design, (primary worker, replica worker) pairs tend to stay
    stable. For example, for primary PartitionLocations on Worker A, their replica
    PartitionLocations will all be on Worker B in general scenarios, i.e. when all
    workers are healthy and work well. This way, one Worker will have only one
    (or very few) connections to other workers' replicate netty servers.
    
    However, https://github.com/apache/incubator-celeborn/pull/1790 calls
    `Collections.shuffle(availableWorkers)`, causing the number of replica
    connections to increase dramatically:
    
![image](https://github.com/apache/incubator-celeborn/assets/948245/013c7bc8-a224-413e-9c0c-519ae76c9d32)
    
    ### Why are the changes needed?
    This PR refines the logic of selecting a limited number of workers: instead of
    shuffling, Master just randomly picks a range of available workers.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1975 from waitinfuture/1034.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit a5dfd67d5b9bcb7d5da59f441ed1d60b4bc27cd3)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala     | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 79e1b9a37..b134781b8 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -615,22 +615,31 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
-    var availableWorkers = workersAvailable()
-    Collections.shuffle(availableWorkers)
+    val availableWorkers = workersAvailable()
+    val numAvailableWorkers = availableWorkers.size()
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
         if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
         else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
-      availableWorkers.size())
-    availableWorkers = availableWorkers.subList(0, numWorkers)
+      numAvailableWorkers)
+    val startIndex = Random.nextInt(numAvailableWorkers)
+    val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
+    selectedWorkers.addAll(availableWorkers.subList(
+      startIndex,
+      Math.min(numAvailableWorkers, startIndex + numWorkers)))
+    if (startIndex + numWorkers > numAvailableWorkers) {
+      selectedWorkers.addAll(availableWorkers.subList(
+        0,
+        startIndex + numWorkers - numAvailableWorkers))
+    }
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && 
!hasHDFSStorage) {
             SlotsAllocator.offerSlotsLoadAware(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
@@ -641,7 +650,7 @@ private[celeborn] class Master(
               loadAwareFetchTimeWeight)
           } else {
             SlotsAllocator.offerSlotsRoundRobin(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware)

Reply via email to