This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c45197c0c [CELEBORN-1843] Optimize roundrobin for more balanced disk slot allocation
c45197c0c is described below

commit c45197c0c16b4041b09f5d7aece11ab43d2df3b8
Author: gaoyajun02 <[email protected]>
AuthorDate: Wed Feb 12 14:30:08 2025 +0800

    [CELEBORN-1843] Optimize roundrobin for more balanced disk slot allocation
    
    ### What changes were proposed in this pull request?
    
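    This PR optimizes the RoundRobin algorithm to achieve a more balanced allocation of slots across each worker's disks.
    Previously, primary and replica slots used separate per-worker disk indexes. Now a single per-worker disk index is shared by both, so primary and replica allocations advance through the same disk rotation instead of each starting from the first disk.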
    For example, when allocating 3000 partitions with RoundRobin, the slots on a worker's disks used to be distributed as [668, 666, 666], leaving one disk with 2 more slots than the others.
    After the optimization, the distribution is [667, 667, 666], so no two disks on a worker differ by more than one slot.
    
    ### Why are the changes needed?
    
    These changes improve load balancing across a worker's disks and reduce the risk of overloading a single disk. A more predictable and even slot distribution can lead to better performance and resource utilization.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
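    Unit test: added `testAllocate3000ReduceIdsWithReplicateOnRoundRobin` to `SlotsAllocatorSuiteJ`, which asserts that each worker's per-disk slot counts differ by at most one.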
    
    Closes #3074 from gaoyajun02/1843.
    
    Authored-by: gaoyajun02 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/master/SlotsAllocator.java      | 19 +++----
 .../deploy/master/SlotsAllocatorSuiteJ.java        | 63 ++++++++++++++++++----
 2 files changed, 58 insertions(+), 24 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 2cc91493c..d28d5d676 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -334,9 +334,8 @@ public class SlotsAllocator {
       boolean shouldReplicate,
       boolean shouldRackAware,
       int availableStorageTypes) {
-    // workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
-    Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
-    Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
+    // workerInfo -> (diskIndexForPrimaryAndReplica)
+    Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
     List<Integer> partitionIdList = new LinkedList<>(partitionIds);
 
     final int workerSize = workers.size();
@@ -361,11 +360,7 @@ public class SlotsAllocator {
         }
         storageInfo =
             getStorageInfo(
-                workers,
-                nextPrimaryInd,
-                slotsRestrictions,
-                workerDiskIndexForPrimary,
-                availableStorageTypes);
+                workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex, 
availableStorageTypes);
       } else {
         if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
           while (!workers.get(nextPrimaryInd).haveDisk()) {
@@ -376,8 +371,7 @@ public class SlotsAllocator {
           }
         }
         storageInfo =
-            getStorageInfo(
-                workers, nextPrimaryInd, null, workerDiskIndexForPrimary, 
availableStorageTypes);
+            getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex, 
availableStorageTypes);
       }
       PartitionLocation primaryPartition =
           createLocation(partitionId, workers.get(nextPrimaryInd), null, 
storageInfo, true);
@@ -398,7 +392,7 @@ public class SlotsAllocator {
                   workers,
                   nextReplicaInd,
                   slotsRestrictions,
-                  workerDiskIndexForReplica,
+                  workerDiskIndex,
                   availableStorageTypes);
         } else if (shouldRackAware) {
           while (nextReplicaInd == nextPrimaryInd
@@ -418,8 +412,7 @@ public class SlotsAllocator {
             }
           }
           storageInfo =
-              getStorageInfo(
-                  workers, nextReplicaInd, null, workerDiskIndexForReplica, 
availableStorageTypes);
+              getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex, 
availableStorageTypes);
         }
         PartitionLocation replicaPartition =
             createLocation(
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 0fb4c9d42..e5b7595a9 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -216,26 +216,54 @@ public class SlotsAllocatorSuiteJ {
     check(workers, partitionIds, shouldReplicate, true);
   }
 
+  @Test
+  public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+
+    check(workers, partitionIds, shouldReplicate, true, true);
+  }
+
   private void check(
       List<WorkerInfo> workers,
       List<Integer> partitionIds,
       boolean shouldReplicate,
       boolean expectSuccess) {
+    check(workers, partitionIds, shouldReplicate, expectSuccess, false);
+  }
+
+  private void check(
+      List<WorkerInfo> workers,
+      List<Integer> partitionIds,
+      boolean shouldReplicate,
+      boolean expectSuccess,
+      boolean roundrobin) {
     String shuffleKey = "appId-1";
     CelebornConf conf = new CelebornConf();
     conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM().key(), 
"2");
     
conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT().key(), 
"1");
-    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots =
-        SlotsAllocator.offerSlotsLoadAware(
-            workers,
-            partitionIds,
-            shouldReplicate,
-            false,
-            conf.masterSlotAssignLoadAwareDiskGroupNum(),
-            conf.masterSlotAssignLoadAwareDiskGroupGradient(),
-            conf.masterSlotAssignLoadAwareFlushTimeWeight(),
-            conf.masterSlotAssignLoadAwareFetchTimeWeight(),
-            StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots;
+    if (roundrobin) {
+      slots =
+          SlotsAllocator.offerSlotsRoundRobin(
+              workers, partitionIds, shouldReplicate, false, 
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+    } else {
+      slots =
+          SlotsAllocator.offerSlotsLoadAware(
+              workers,
+              partitionIds,
+              shouldReplicate,
+              false,
+              conf.masterSlotAssignLoadAwareDiskGroupNum(),
+              conf.masterSlotAssignLoadAwareDiskGroupGradient(),
+              conf.masterSlotAssignLoadAwareFlushTimeWeight(),
+              conf.masterSlotAssignLoadAwareFetchTimeWeight(),
+              StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+    }
     if (expectSuccess) {
       if (shouldReplicate) {
         slots.forEach(
@@ -266,6 +294,19 @@ public class SlotsAllocatorSuiteJ {
         if (allocationMap.containsKey("UNKNOWN_DISK")) {
           unknownDiskSlots += allocationMap.get("UNKNOWN_DISK");
         }
+        if (roundrobin && !allocationMap.isEmpty()) {
+          int maxSlots = Collections.max(allocationMap.values());
+          int minSlots = Collections.min(allocationMap.values());
+          assertTrue(
+              "Worker "
+                  + worker.host()
+                  + " has unbalanced slot allocation. "
+                  + "Max: "
+                  + maxSlots
+                  + ", Min: "
+                  + minSlots,
+              maxSlots - minSlots <= 1);
+        }
       }
       int allocateToDiskSlots = 0;
       for (WorkerInfo worker : workers) {

Reply via email to