This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 151fd3567 [CELEBORN-1923] Correct Celeborn available slots calculation
logic
151fd3567 is described below
commit 151fd35676e5e09cb975b4b8e084dc9201da01b3
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sun Mar 23 15:27:06 2025 -0700
[CELEBORN-1923] Correct Celeborn available slots calculation logic
### What changes were proposed in this pull request?
Fix the incorrect logic used when calculating a disk's available slots.
### Why are the changes needed?
Currently we compute `maxSlots = usableSize / estimatedPartitionSize`,
then `availableSlots = maxSlots - allocatedSlots`.
But `availableSlots` should be `usableSize / estimatedPartitionSize`.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
MT
Closes #3162 from AngersZhuuuu/CELEBORN-1923.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../org/apache/celeborn/common/meta/DeviceInfo.scala | 6 ++++--
.../org/apache/celeborn/common/meta/WorkerInfo.scala | 15 +++++++++------
.../org/apache/celeborn/common/meta/WorkerInfoSuite.scala | 6 +++---
.../celeborn/service/deploy/master/SlotsAllocator.java | 15 ++++++++-------
.../deploy/master/clustermeta/AbstractMetaManager.java | 4 ++--
.../service/deploy/master/SlotsAllocatorSuiteJ.java | 2 +-
6 files changed, 27 insertions(+), 21 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index de57b0630..928794fd9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -84,6 +84,7 @@ class DiskInfo(
var totalSpace = 0L
var storageType: StorageInfo.Type = StorageInfo.Type.SSD
var maxSlots: Long = 0
+ var availableSlots: Long = 0
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
lazy val applicationAllocations = new util.HashMap[String, Integer]()
@@ -123,8 +124,8 @@ class DiskInfo(
*
* @return the available slots of the disk.
*/
- def availableSlots(): Long = this.synchronized {
- math.max(maxSlots - activeSlots, 0L)
+ def getAvailableSlots(): Long = {
+ math.max(availableSlots, 0L)
}
def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
@@ -175,6 +176,7 @@ class DiskInfo(
override def toString: String = this.synchronized {
val (emptyShuffles, nonEmptyShuffles) =
shuffleAllocations.asScala.partition(_._2 == 0)
s"DiskInfo(maxSlots: $maxSlots," +
+ s" availableSlots: $availableSlots," +
s" committed shuffles ${emptyShuffles.size}," +
s" running applications ${applicationAllocations.size}," +
s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 020f93d7d..71401735f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -163,7 +163,7 @@ class WorkerInfo(
lazy val toUniqueId = s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort"
def slotAvailable(): Boolean = this.synchronized {
- diskInfos.asScala.exists { case (_, disk) => (disk.maxSlots -
disk.activeSlots) > 0 }
+ diskInfos.asScala.exists { case (_, disk) => (disk.availableSlots) > 0 }
}
def getTotalSlots(): Long = this.synchronized {
@@ -178,14 +178,15 @@ class WorkerInfo(
this.workerStatus = workerStatus;
}
- def updateDiskMaxSlots(estimatedPartitionSize: Long): Unit =
this.synchronized {
+ def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
diskInfos.asScala.foreach { case (_, disk) =>
- disk.maxSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
+ disk.maxSlots_$eq(disk.totalSpace / estimatedPartitionSize)
+ disk.availableSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
}
}
def totalAvailableSlots(): Long = this.synchronized {
- diskInfos.asScala.map(_._2.availableSlots()).sum
+ diskInfos.asScala.map(_._2.getAvailableSlots()).sum
}
def totalSpace(): Long = this.synchronized {
@@ -212,12 +213,14 @@ class WorkerInfo(
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
if (estimatedPartitionSize.nonEmpty && curDisk.storageType !=
StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
- curDisk.maxSlots = curDisk.actualUsableSpace /
estimatedPartitionSize.get
+ curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
+ curDisk.availableSlots = curDisk.actualUsableSpace /
estimatedPartitionSize.get
}
curDisk.setStatus(newDisk.status)
} else {
if (estimatedPartitionSize.nonEmpty && newDisk.storageType !=
StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
- newDisk.maxSlots = newDisk.actualUsableSpace /
estimatedPartitionSize.get
+ newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+ newDisk.availableSlots = newDisk.actualUsableSpace /
estimatedPartitionSize.get
}
diskInfos.put(mountPoint, newDisk)
}
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index e7d211205..2db11002b 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -355,9 +355,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
- | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace:
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns,
activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder
- | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace:
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns,
activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder
- | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running
applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace:
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns,
activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder
+ | DiskInfo0: DiskInfo(maxSlots: 0, availableSlots: 0, committed
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint:
disk3, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns,
avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs
$placeholder
+ | DiskInfo1: DiskInfo(maxSlots: 0, availableSlots: 0, committed
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint:
disk1, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns,
avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs
$placeholder
+ | DiskInfo2: DiskInfo(maxSlots: 0, availableSlots: 0, committed
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint:
disk2, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns,
avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs
$placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions:
(application_1697697127390_2171854 -> ResourceConsumption(diskBytesWritten:
20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1,
subResourceConsumptions: empty)))
|WorkerRef: null
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index d28d5d676..aab79cc2f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -74,17 +74,17 @@ public class SlotsAllocator {
&& diskInfoEntry.getValue().storageType() !=
StorageInfo.Type.S3) {
usableDisks.add(
new UsableDiskInfo(
- diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ diskInfoEntry.getValue(),
diskInfoEntry.getValue().getAvailableSlots()));
} else if (StorageInfo.HDFSAvailable(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() ==
StorageInfo.Type.HDFS) {
usableDisks.add(
new UsableDiskInfo(
- diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ diskInfoEntry.getValue(),
diskInfoEntry.getValue().getAvailableSlots()));
} else if (StorageInfo.S3Available(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() ==
StorageInfo.Type.S3) {
usableDisks.add(
new UsableDiskInfo(
- diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ diskInfoEntry.getValue(),
diskInfoEntry.getValue().getAvailableSlots()));
}
}
}
@@ -151,7 +151,8 @@ public class SlotsAllocator {
|| (shouldReplicate
&& (usableDisks.size() == 1
||
usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
- boolean noAvailableSlots =
usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0;
+ boolean noAvailableSlots =
+ usableDisks.stream().mapToLong(DiskInfo::getAvailableSlots).sum() <= 0;
if (noUsableDisks || noAvailableSlots) {
logger.warn(
@@ -511,7 +512,7 @@ public class SlotsAllocator {
long[] groupAvailableSlots = new long[groupSize];
for (int i = 0; i < groupSize; i++) {
for (DiskInfo disk : groups.get(i)) {
- groupAvailableSlots[i] += disk.availableSlots();
+ groupAvailableSlots[i] += disk.getAvailableSlots();
}
}
double[] currentAllocation = new double[groupSize];
@@ -557,8 +558,8 @@ public class SlotsAllocator {
restrictions.computeIfAbsent(diskWorkerMap.get(disk), v -> new
ArrayList<>());
long allocated =
(int) Math.ceil((groupAllocations[i] + groupLeft) / (double)
disksInsideGroup);
- if (allocated > disk.availableSlots()) {
- allocated = disk.availableSlots();
+ if (allocated > disk.getAvailableSlots()) {
+ allocated = disk.getAvailableSlots();
}
if (allocated > groupRequired) {
allocated = groupRequired;
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cf37796bf..a794e059c 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -330,7 +330,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
} else {
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
}
- workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
+ workerInfo.updateDiskSlots(estimatedPartitionSize);
synchronized (workersMap) {
workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo);
shutdownWorkers.remove(workerInfo);
@@ -573,7 +573,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
HashSet<WorkerInfo> workers = new HashSet(workersMap.values());
excludedWorkers.forEach(workers::remove);
manuallyExcludedWorkers.forEach(workers::remove);
- workers.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
+ workers.forEach(workerInfo ->
workerInfo.updateDiskSlots(estimatedPartitionSize));
}
private boolean isWorkerAvailable(WorkerInfo workerInfo) {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index e5b7595a9..78f7727bf 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -465,7 +465,7 @@ public class SlotsAllocatorSuiteJ {
final List<WorkerInfo> workers = prepareWorkers(true);
// Simulates no available slots behavior with greatly changed
estimatedPartitionSize for workers
// with usable disks.
- workers.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(Long.MAX_VALUE));
+ workers.forEach(workerInfo -> workerInfo.updateDiskSlots(Long.MAX_VALUE));
final List<Integer> partitionIds = Collections.singletonList(0);
final boolean shouldReplicate = false;