This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d2befe033 [CELEBORN-2008] SlotsAllocator should select disks randomly in RoundRobin mode
d2befe033 is described below
commit d2befe0334d8701a57d932af8853933f2f96d7d0
Author: Xianming Lei <[email protected]>
AuthorDate: Thu May 22 17:18:42 2025 -0700
[CELEBORN-2008] SlotsAllocator should select disks randomly in RoundRobin mode
### What changes were proposed in this pull request?
In RoundRobin mode, SlotsAllocator now picks a random starting disk for each worker instead of always starting from the first disk.
### Why are the changes needed?
The current round-robin mechanism selects the first disk of every worker, then the second disk of every worker, then the third, and so on. Because every allocation starts from the first disk, the first disks tend to receive more slots than the others, which easily skews disk storage usage. Disks should instead be selected starting from a random position rather than always from the first disk.
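To see why a fixed starting disk skews usage, here is a small self-contained simulation. It is illustrative only and not Celeborn code: `DiskSkewDemo` and `simulate` are hypothetical names, only a single worker is modelled, and it assumes (as the removed `computeIfAbsent(selectedWorker, v -> 0)` line suggests) that the per-worker disk cursor starts fresh on each allocation call.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical illustration, not Celeborn code: counts how many slots each disk of one
 * worker receives when every allocation request starts its round-robin walk either at
 * disk 0 or at a random disk.
 */
public class DiskSkewDemo {

  /**
   * @param randomStart     if true, each request starts at a random disk index
   * @param slotsPerRequest slots handed to this worker per allocation request
   * @param numDisks        number of disks on the worker
   * @param requests        number of allocation requests to simulate
   * @param seed            RNG seed, for reproducible runs
   * @return slot count per disk
   */
  public static int[] simulate(
      boolean randomStart, int slotsPerRequest, int numDisks, int requests, long seed) {
    Random rand = new Random(seed);
    int[] slotsPerDisk = new int[numDisks];
    for (int r = 0; r < requests; r++) {
      // Assumed: the cursor is reset for every request, like a per-call map.
      int diskIndex = randomStart ? rand.nextInt(numDisks) : 0;
      for (int s = 0; s < slotsPerRequest; s++) {
        slotsPerDisk[diskIndex]++;
        diskIndex = (diskIndex + 1) % numDisks; // plain round-robin step
      }
    }
    return slotsPerDisk;
  }

  public static void main(String[] args) {
    // Three slots per request on a four-disk worker.
    // Fixed start prints: fixed start:  [1000, 1000, 1000, 0]
    System.out.println("fixed start:  " + Arrays.toString(simulate(false, 3, 4, 1000, 42L)));
    System.out.println("random start: " + Arrays.toString(simulate(true, 3, 4, 1000, 42L)));
  }
}
```

Whenever a request needs fewer slots than the worker has disks, a fixed start never reaches the last disks, while a random start spreads the same load roughly evenly (about 750 slots per disk in expectation here).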
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3275 from leixm/CELEBORN-2008.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/service/deploy/master/SlotsAllocator.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 608c3e7bd..5e35d881d 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -194,9 +194,11 @@ public class SlotsAllocator {
int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
StorageInfo storageInfo;
- int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
if (restrictions != null) {
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+ int diskIndex =
+ workerDiskIndex.computeIfAbsent(
+ selectedWorker, v -> rand.nextInt(usableDiskInfos.size()));
while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
}
@@ -225,6 +227,8 @@ public class SlotsAllocator {
.filter(p -> p.storageType() != StorageInfo.Type.OSS)
.collect(Collectors.toList())
.toArray(new DiskInfo[0]);
+ int diskIndex =
+ workerDiskIndex.computeIfAbsent(selectedWorker, v -> rand.nextInt(diskInfos.length));
storageInfo =
new StorageInfo(
diskInfos[diskIndex].mountPoint(),
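Both hunks apply the same idea: `computeIfAbsent` draws the starting index from `rand` the first time a worker is seen, and later calls for that worker reuse the cached cursor. The sketch below mirrors that flow with simplified, hypothetical types (`RandomStartRoundRobin`, `UsableDisk`, `pickDisk`, a `String` worker key) in place of Celeborn's `UsableDiskInfo` and `WorkerInfo`. The slot decrement and cursor advance after the skip loop are assumptions, since the diff excerpt ends before that code. The exhaustion check is also an addition: the `while` loop in the diff relies on at least one disk having a free slot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of random-start round-robin disk selection (not Celeborn's API). */
public class RandomStartRoundRobin {

  /** Hypothetical stand-in for a disk with a remaining slot budget. */
  public static final class UsableDisk {
    public final String mountPoint;
    public int usableSlots;

    public UsableDisk(String mountPoint, int usableSlots) {
      this.mountPoint = mountPoint;
      this.usableSlots = usableSlots;
    }
  }

  private final Random rand;
  // Per-worker disk cursor; assumed to be created fresh for each allocation round.
  private final Map<String, Integer> workerDiskIndex = new HashMap<>();

  public RandomStartRoundRobin(Random rand) {
    this.rand = rand;
  }

  /** Returns the chosen disk's mount point, or null if every disk is exhausted. */
  public String pickDisk(String worker, List<UsableDisk> disks) {
    // First visit to this worker: start at a random disk instead of disk 0.
    int diskIndex = workerDiskIndex.computeIfAbsent(worker, w -> rand.nextInt(disks.size()));
    // Skip disks without free slots, wrapping around; stop after one full cycle.
    int tried = 0;
    while (disks.get(diskIndex).usableSlots <= 0) {
      if (++tried == disks.size()) {
        return null;
      }
      diskIndex = (diskIndex + 1) % disks.size();
    }
    UsableDisk chosen = disks.get(diskIndex);
    chosen.usableSlots--;
    // Assumed: advance the cursor so the next slot on this worker goes to the next disk.
    workerDiskIndex.put(worker, (diskIndex + 1) % disks.size());
    return chosen.mountPoint;
  }

  public static void main(String[] args) {
    List<UsableDisk> disks = new ArrayList<>();
    disks.add(new UsableDisk("/mnt/disk1", 2));
    disks.add(new UsableDisk("/mnt/disk2", 0));
    disks.add(new UsableDisk("/mnt/disk3", 2));
    RandomStartRoundRobin allocator = new RandomStartRoundRobin(new Random());
    // Four picks alternate between disk1 and disk3 (order depends on the random start);
    // the fifth pick returns null because every disk is exhausted.
    for (int i = 0; i < 5; i++) {
      System.out.println(allocator.pickDisk("worker-1", disks));
    }
  }
}
```

Caching the random start per worker keeps the round-robin order within one allocation while removing the bias toward the first disk across allocations.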