This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new e6ef9bd57 [CELEBORN-2008] SlotsAllocator should select disks randomly
in RoundRobin mode
e6ef9bd57 is described below
commit e6ef9bd575f1825b6146a70ea9ff5ac8dbad89e9
Author: Xianming Lei <[email protected]>
AuthorDate: Thu May 22 17:18:42 2025 -0700
[CELEBORN-2008] SlotsAllocator should select disks randomly in RoundRobin
mode
### What changes were proposed in this pull request?
In RoundRobin mode, SlotsAllocator should pick each worker's starting disk at random instead of always starting from the first disk.
### Why are the changes needed?
The current round-robin selection picks the first disk of every worker,
then the second disk of every worker, and finally the third disk. Because
every worker starts at its first disk, this can easily cause disk storage
space skew. We should choose the starting disk randomly instead of always
selecting the first disk first.
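To illustrate the skew, here is a minimal, self-contained sketch. It is not the actual SlotsAllocator code: the class `DiskSelectionSketch`, its `nextDisk` and `firstPicks` methods, and the string worker keys are hypothetical, and the cursor advance after each pick is an assumption that is not visible in this diff. It only compares a fixed starting index of 0 with a random one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class DiskSelectionSketch {
  // Hypothetical stand-in for the per-worker disk cursor kept by the allocator.
  private final Map<String, Integer> workerDiskIndex = new HashMap<>();
  private final Random rand;
  private final boolean randomStart;

  public DiskSelectionSketch(Random rand, boolean randomStart) {
    this.rand = rand;
    this.randomStart = randomStart;
  }

  /** Picks a disk for the worker, consumes one slot on it, and returns its index. */
  public int nextDisk(String worker, int[] usableSlots) {
    int n = usableSlots.length;
    // First visit: start at 0 (old behavior) or at a random disk (new behavior).
    int diskIndex =
        workerDiskIndex.computeIfAbsent(worker, w -> randomStart ? rand.nextInt(n) : 0);
    // Skip disks with no usable slots; assumes at least one disk still has slots.
    while (usableSlots[diskIndex] <= 0) {
      diskIndex = (diskIndex + 1) % n;
    }
    usableSlots[diskIndex]--;
    // Assumption: the cursor moves to the next disk after each allocation.
    workerDiskIndex.put(worker, (diskIndex + 1) % n);
    return diskIndex;
  }

  /** Counts which disk receives the first slot on each of `workers` fresh workers. */
  public static int[] firstPicks(int workers, int disks, boolean randomStart, long seed) {
    DiskSelectionSketch sketch = new DiskSelectionSketch(new Random(seed), randomStart);
    int[] counts = new int[disks];
    for (int i = 0; i < workers; i++) {
      int[] slots = new int[disks];
      Arrays.fill(slots, 1); // every disk starts with one usable slot
      counts[sketch.nextDisk("worker-" + i, slots)]++;
    }
    return counts;
  }
}
```

With a fixed start, `firstPicks(100, 3, false, 42L)` places all 100 first allocations on disk 0, which is the skew described above. With `randomStart` set to `true`, the first allocations are spread across the disks.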
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3275 from leixm/CELEBORN-2008.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit d2befe0334d8701a57d932af8853933f2f96d7d0)
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/service/deploy/master/SlotsAllocator.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 787aa5178..f6a039dc7 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -178,9 +178,11 @@ public class SlotsAllocator {
int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
StorageInfo storageInfo;
- int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
if (restrictions != null) {
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+ int diskIndex =
+ workerDiskIndex.computeIfAbsent(
+ selectedWorker, v -> rand.nextInt(usableDiskInfos.size()));
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
}
@@ -203,6 +205,8 @@ public class SlotsAllocator {
.filter(p -> p.storageType() != StorageInfo.Type.HDFS)
.collect(Collectors.toList())
.toArray(new DiskInfo[0]);
+ int diskIndex =
+ workerDiskIndex.computeIfAbsent(selectedWorker, v -> rand.nextInt(diskInfos.length));
storageInfo =
new StorageInfo(
diskInfos[diskIndex].mountPoint(),