This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c45197c0c [CELEBORN-1843] Optimize roundrobin for more balanced disk
slot allocation
c45197c0c is described below
commit c45197c0c16b4041b09f5d7aece11ab43d2df3b8
Author: gaoyajun02 <[email protected]>
AuthorDate: Wed Feb 12 14:30:08 2025 +0800
[CELEBORN-1843] Optimize roundrobin for more balanced disk slot allocation
### What changes were proposed in this pull request?
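This PR optimizes the RoundRobin algorithm to achieve a more balanced disk
slot allocation across workers. Primary and replica allocation now share a
single per-worker disk index map (`workerDiskIndex`) instead of using two
separate maps (`workerDiskIndexForPrimary` and `workerDiskIndexForReplica`).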
Previously, when allocating 3000 partitions using RoundRobin, the slot
distribution across a worker's disks was [668, 666, 666], so one disk had
2 more slots than the others.
After the optimization, the slot distribution is now [667, 667, 666],
ensuring a more balanced allocation.
### Why are the changes needed?
The changes are necessary to improve load balancing across worker disks,
reducing the risk of overloading a single disk. This ensures a more predictable
and fair distribution of slots, which can lead to better performance and
resource utilization.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
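Unit tests (UT): added testAllocate3000ReduceIdsWithReplicateOnRoundRobin to
SlotsAllocatorSuiteJ, which asserts that the per-disk slot counts of each
worker differ by at most 1.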
Closes #3074 from gaoyajun02/1843.
Authored-by: gaoyajun02 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/master/SlotsAllocator.java | 19 +++----
.../deploy/master/SlotsAllocatorSuiteJ.java | 63 ++++++++++++++++++----
2 files changed, 58 insertions(+), 24 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 2cc91493c..d28d5d676 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -334,9 +334,8 @@ public class SlotsAllocator {
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {
- // workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
- Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
- Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
+ // workerInfo -> (diskIndexForPrimaryAndReplica)
+ Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
List<Integer> partitionIdList = new LinkedList<>(partitionIds);
final int workerSize = workers.size();
@@ -361,11 +360,7 @@ public class SlotsAllocator {
}
storageInfo =
getStorageInfo(
- workers,
- nextPrimaryInd,
- slotsRestrictions,
- workerDiskIndexForPrimary,
- availableStorageTypes);
+ workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex,
availableStorageTypes);
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (!workers.get(nextPrimaryInd).haveDisk()) {
@@ -376,8 +371,7 @@ public class SlotsAllocator {
}
}
storageInfo =
- getStorageInfo(
- workers, nextPrimaryInd, null, workerDiskIndexForPrimary,
availableStorageTypes);
+ getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex,
availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null,
storageInfo, true);
@@ -398,7 +392,7 @@ public class SlotsAllocator {
workers,
nextReplicaInd,
slotsRestrictions,
- workerDiskIndexForReplica,
+ workerDiskIndex,
availableStorageTypes);
} else if (shouldRackAware) {
while (nextReplicaInd == nextPrimaryInd
@@ -418,8 +412,7 @@ public class SlotsAllocator {
}
}
storageInfo =
- getStorageInfo(
- workers, nextReplicaInd, null, workerDiskIndexForReplica,
availableStorageTypes);
+ getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex,
availableStorageTypes);
}
PartitionLocation replicaPartition =
createLocation(
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 0fb4c9d42..e5b7595a9 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -216,26 +216,54 @@ public class SlotsAllocatorSuiteJ {
check(workers, partitionIds, shouldReplicate, true);
}
+ @Test
+ public void testAllocate3000ReduceIdsWithReplicateOnRoundRobin() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+
+ check(workers, partitionIds, shouldReplicate, true, true);
+ }
+
private void check(
List<WorkerInfo> workers,
List<Integer> partitionIds,
boolean shouldReplicate,
boolean expectSuccess) {
+ check(workers, partitionIds, shouldReplicate, expectSuccess, false);
+ }
+
+ private void check(
+ List<WorkerInfo> workers,
+ List<Integer> partitionIds,
+ boolean shouldReplicate,
+ boolean expectSuccess,
+ boolean roundrobin) {
String shuffleKey = "appId-1";
CelebornConf conf = new CelebornConf();
conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM().key(),
"2");
conf.set(CelebornConf.MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT().key(),
"1");
- Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- SlotsAllocator.offerSlotsLoadAware(
- workers,
- partitionIds,
- shouldReplicate,
- false,
- conf.masterSlotAssignLoadAwareDiskGroupNum(),
- conf.masterSlotAssignLoadAwareDiskGroupGradient(),
- conf.masterSlotAssignLoadAwareFlushTimeWeight(),
- conf.masterSlotAssignLoadAwareFetchTimeWeight(),
- StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots;
+ if (roundrobin) {
+ slots =
+ SlotsAllocator.offerSlotsRoundRobin(
+ workers, partitionIds, shouldReplicate, false,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+ } else {
+ slots =
+ SlotsAllocator.offerSlotsLoadAware(
+ workers,
+ partitionIds,
+ shouldReplicate,
+ false,
+ conf.masterSlotAssignLoadAwareDiskGroupNum(),
+ conf.masterSlotAssignLoadAwareDiskGroupGradient(),
+ conf.masterSlotAssignLoadAwareFlushTimeWeight(),
+ conf.masterSlotAssignLoadAwareFetchTimeWeight(),
+ StorageInfo.ALL_TYPES_AVAILABLE_MASK);
+ }
if (expectSuccess) {
if (shouldReplicate) {
slots.forEach(
@@ -266,6 +294,19 @@ public class SlotsAllocatorSuiteJ {
if (allocationMap.containsKey("UNKNOWN_DISK")) {
unknownDiskSlots += allocationMap.get("UNKNOWN_DISK");
}
+ if (roundrobin && !allocationMap.isEmpty()) {
+ int maxSlots = Collections.max(allocationMap.values());
+ int minSlots = Collections.min(allocationMap.values());
+ assertTrue(
+ "Worker "
+ + worker.host()
+ + " has unbalanced slot allocation. "
+ + "Max: "
+ + maxSlots
+ + ", Min: "
+ + minSlots,
+ maxSlots - minSlots <= 1);
+ }
}
int allocateToDiskSlots = 0;
for (WorkerInfo worker : workers) {