This is an automated email from the ASF dual-hosted git repository. angerszhuuuu pushed a commit to branch CELEBORN-1923 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 9cfa648802213989d1065eb7e1549d86621ee851 Author: Angerszhuuuu <[email protected]> AuthorDate: Thu Mar 20 18:16:44 2025 +0800 CELEBORN-1923 Celeborn calculate available slots using incorrect logic --- .../org/apache/celeborn/common/meta/DeviceInfo.scala | 6 ++++-- .../org/apache/celeborn/common/meta/WorkerInfo.scala | 15 +++++++++------ .../celeborn/service/deploy/master/SlotsAllocator.java | 14 +++++++------- .../deploy/master/clustermeta/AbstractMetaManager.java | 4 ++-- .../service/deploy/master/SlotsAllocatorSuiteJ.java | 2 +- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala index de57b0630..bff17f9a4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala @@ -84,6 +84,7 @@ class DiskInfo( var totalSpace = 0L var storageType: StorageInfo.Type = StorageInfo.Type.SSD var maxSlots: Long = 0 + var availableSlots: Long = 0 lazy val shuffleAllocations = new util.HashMap[String, Integer]() lazy val applicationAllocations = new util.HashMap[String, Integer]() @@ -123,8 +124,8 @@ class DiskInfo( * * @return the available slots of the disk. */ - def availableSlots(): Long = this.synchronized { - math.max(maxSlots - activeSlots, 0L) + def getAvailableSlots(): Long = this.synchronized { + math.max(availableSlots, 0L) } def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized { @@ -175,6 +176,7 @@ class DiskInfo( override def toString: String = this.synchronized { val (emptyShuffles, nonEmptyShuffles) = shuffleAllocations.asScala.partition(_._2 == 0) s"DiskInfo(maxSlots: $maxSlots," + + s" availableSlots: $availableSlots," + s" committed shuffles ${emptyShuffles.size}," + s" running applications ${applicationAllocations.size}," + s" shuffleAllocations: ${nonEmptyShuffles.toMap}," + diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 020f93d7d..71401735f 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -163,7 +163,7 @@ class WorkerInfo( lazy val toUniqueId = s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort" def slotAvailable(): Boolean = this.synchronized { - diskInfos.asScala.exists { case (_, disk) => (disk.maxSlots - disk.activeSlots) > 0 } + diskInfos.asScala.exists { case (_, disk) => (disk.availableSlots) > 0 } } def getTotalSlots(): Long = this.synchronized { @@ -178,14 +178,15 @@ class WorkerInfo( this.workerStatus = workerStatus; } - def updateDiskMaxSlots(estimatedPartitionSize: Long): Unit = this.synchronized { + def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized { diskInfos.asScala.foreach { case (_, disk) => - disk.maxSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize) + disk.maxSlots_$eq(disk.totalSpace / estimatedPartitionSize) + disk.availableSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize) } } def totalAvailableSlots(): Long = this.synchronized { - diskInfos.asScala.map(_._2.availableSlots()).sum + diskInfos.asScala.map(_._2.getAvailableSlots()).sum } def totalSpace(): Long = this.synchronized { @@ -212,12 +213,14 @@ class WorkerInfo( curDisk.avgFlushTime = newDisk.avgFlushTime curDisk.avgFetchTime = newDisk.avgFetchTime if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) { - curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get + curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get + curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get } curDisk.setStatus(newDisk.status) } else { if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) { - newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get + newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get + newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get } diskInfos.put(mountPoint, newDisk) } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index d28d5d676..abb8fb592 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -74,17 +74,17 @@ public class SlotsAllocator { && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3) { usableDisks.add( new UsableDiskInfo( - diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots())); } else if (StorageInfo.HDFSAvailable(availableStorageTypes) && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) { usableDisks.add( new UsableDiskInfo( - diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots())); } else if (StorageInfo.S3Available(availableStorageTypes) && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) { usableDisks.add( new UsableDiskInfo( - diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots())); + diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots())); } } } @@ -151,7 +151,7 @@ public class SlotsAllocator { || (shouldReplicate && (usableDisks.size() == 1 || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1)); - boolean noAvailableSlots = usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0; + boolean noAvailableSlots = usableDisks.stream().mapToLong(DiskInfo::getAvailableSlots).sum() <= 0; if (noUsableDisks || noAvailableSlots) { logger.warn( @@ -511,7 +511,7 @@ public class SlotsAllocator { long[] groupAvailableSlots = new long[groupSize]; for (int i = 0; i < groupSize; i++) { for (DiskInfo disk : groups.get(i)) { - groupAvailableSlots[i] += disk.availableSlots(); + groupAvailableSlots[i] += disk.getAvailableSlots(); } } double[] currentAllocation = new double[groupSize]; @@ -557,8 +557,8 @@ public class SlotsAllocator { restrictions.computeIfAbsent(diskWorkerMap.get(disk), v -> new ArrayList<>()); long allocated = (int) Math.ceil((groupAllocations[i] + groupLeft) / (double) disksInsideGroup); - if (allocated > disk.availableSlots()) { - allocated = disk.availableSlots(); + if (allocated > disk.getAvailableSlots()) { + allocated = disk.getAvailableSlots(); } if (allocated > groupRequired) { allocated = groupRequired; diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index cf37796bf..a794e059c 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -330,7 +330,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { } else { workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation()); } - workerInfo.updateDiskMaxSlots(estimatedPartitionSize); + workerInfo.updateDiskSlots(estimatedPartitionSize); synchronized (workersMap) { workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo); shutdownWorkers.remove(workerInfo); @@ -573,7 +573,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { HashSet<WorkerInfo> workers = new HashSet(workersMap.values()); excludedWorkers.forEach(workers::remove); manuallyExcludedWorkers.forEach(workers::remove); - workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(estimatedPartitionSize)); + workers.forEach(workerInfo -> workerInfo.updateDiskSlots(estimatedPartitionSize)); } private boolean isWorkerAvailable(WorkerInfo workerInfo) { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index e5b7595a9..78f7727bf 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -465,7 +465,7 @@ public class SlotsAllocatorSuiteJ { final List<WorkerInfo> workers = prepareWorkers(true); // Simulates no available slots behavior with greatly changed estimatedPartitionSize for workers // with usable disks. - workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(Long.MAX_VALUE)); + workers.forEach(workerInfo -> workerInfo.updateDiskSlots(Long.MAX_VALUE)); final List<Integer> partitionIds = Collections.singletonList(0); final boolean shouldReplicate = false;
