This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 71ead8784 [CELEBORN-872][MASTER] Extract the same allocation logic for 
both loadaware and roundrobin
71ead8784 is described below

commit 71ead8784734eb8fc830cbdeb8d700c70f3808d7
Author: zwangsheng <[email protected]>
AuthorDate: Thu Aug 3 20:14:45 2023 +0800

    [CELEBORN-872][MASTER] Extract the same allocation logic for both loadaware 
and roundrobin
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    Reduce duplicate code segments, improve code readability and maintenance 
difficulty.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit Test
    
    Closes #1786 from zwangsheng/CELEBORN-872.
    
    Authored-by: zwangsheng <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../service/deploy/master/SlotsAllocator.java      | 74 +++++++++++-----------
 1 file changed, 36 insertions(+), 38 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 7d9ffab37..10fb31824 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -60,8 +60,6 @@ public class SlotsAllocator {
     if (workers.size() < 2 && shouldReplicate) {
       return new HashMap<>();
     }
-    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        new HashMap<>();
     Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
     for (WorkerInfo worker : workers) {
       List<UsableDiskInfo> usableDisks =
@@ -74,16 +72,7 @@ public class SlotsAllocator {
         }
       }
     }
-    List<Integer> remain =
-        roundRobin(slots, partitionIds, workers, restrictions, 
shouldReplicate, shouldRackAware);
-    if (!remain.isEmpty()) {
-      remain = roundRobin(slots, remain, workers, null, shouldReplicate, 
shouldRackAware);
-    }
-
-    if (!remain.isEmpty()) {
-      roundRobin(slots, remain, workers, null, shouldReplicate, false);
-    }
-    return slots;
+    return locateSlots(partitionIds, workers, restrictions, shouldReplicate, 
shouldRackAware);
   }
 
   /**
@@ -139,36 +128,12 @@ public class SlotsAllocator {
       initLoadAwareAlgorithm(diskGroupCount, diskGroupGradient);
     }
 
-    Map<WorkerInfo, List<UsableDiskInfo>> restriction =
+    Map<WorkerInfo, List<UsableDiskInfo>> restrictions =
         getRestriction(
             placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, 
fetchTimeWeight),
             diskToWorkerMap,
             shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
-
-    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        new HashMap<>();
-    List<Integer> remainPartitions =
-        roundRobin(
-            slots,
-            partitionIds,
-            new ArrayList<>(restriction.keySet()),
-            restriction,
-            shouldReplicate,
-            shouldRackAware);
-    if (!remainPartitions.isEmpty()) {
-      remainPartitions =
-          roundRobin(
-              slots,
-              remainPartitions,
-              new ArrayList<>(workers),
-              null,
-              shouldReplicate,
-              shouldRackAware);
-    }
-    if (!remainPartitions.isEmpty()) {
-      roundRobin(slots, remainPartitions, new ArrayList<>(workers), null, 
shouldReplicate, false);
-    }
-    return slots;
+    return locateSlots(partitionIds, workers, restrictions, shouldReplicate, 
shouldRackAware);
   }
 
   private static StorageInfo getStorageInfo(
@@ -188,6 +153,39 @@ public class SlotsAllocator {
     return storageInfo;
   }
 
+  /**
+   * Progressive locate slots for all partitions <br>
+   * 1. try to allocate for all partitions under restrictions <br>
+   * 2. allocate remain partitions to all workers <br>
+   * 3. allocate remain partitions to all workers again without considering 
rack aware <br>
+   */
+  private static Map<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>>
+      locateSlots(
+          List<Integer> partitionIds,
+          List<WorkerInfo> workers,
+          Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+          boolean shouldReplicate,
+          boolean shouldRackAware) {
+    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
+        new HashMap<>();
+
+    List<Integer> remain =
+        roundRobin(
+            slots,
+            partitionIds,
+            new LinkedList<>(restrictions.keySet()),
+            restrictions,
+            shouldReplicate,
+            shouldRackAware);
+    if (!remain.isEmpty()) {
+      remain = roundRobin(slots, remain, workers, null, shouldReplicate, 
shouldRackAware);
+    }
+    if (!remain.isEmpty()) {
+      roundRobin(slots, remain, workers, null, shouldReplicate, false);
+    }
+    return slots;
+  }
+
   private static List<Integer> roundRobin(
       Map<WorkerInfo, Tuple2<List<PartitionLocation>, 
List<PartitionLocation>>> slots,
       List<Integer> partitionIds,

Reply via email to