This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch CELEBORN-1923
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 9cfa648802213989d1065eb7e1549d86621ee851
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Mar 20 18:16:44 2025 +0800

    CELEBORN-1923 Celeborn calculate available slots using incorrect logic
---
 .../org/apache/celeborn/common/meta/DeviceInfo.scala      |  6 ++++--
 .../org/apache/celeborn/common/meta/WorkerInfo.scala      | 15 +++++++++------
 .../celeborn/service/deploy/master/SlotsAllocator.java    | 14 +++++++-------
 .../deploy/master/clustermeta/AbstractMetaManager.java    |  4 ++--
 .../service/deploy/master/SlotsAllocatorSuiteJ.java       |  2 +-
 5 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index de57b0630..bff17f9a4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -84,6 +84,7 @@ class DiskInfo(
   var totalSpace = 0L
   var storageType: StorageInfo.Type = StorageInfo.Type.SSD
   var maxSlots: Long = 0
+  var availableSlots: Long = 0
   lazy val shuffleAllocations = new util.HashMap[String, Integer]()
   lazy val applicationAllocations = new util.HashMap[String, Integer]()
 
@@ -123,8 +124,8 @@ class DiskInfo(
    *
    * @return the available slots of the disk.
    */
-  def availableSlots(): Long = this.synchronized {
-    math.max(maxSlots - activeSlots, 0L)
+  def getAvailableSlots(): Long = this.synchronized {
+      math.max(availableSlots, 0L)
   }
 
   def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
@@ -175,6 +176,7 @@ class DiskInfo(
   override def toString: String = this.synchronized {
     val (emptyShuffles, nonEmptyShuffles) = shuffleAllocations.asScala.partition(_._2 == 0)
     s"DiskInfo(maxSlots: $maxSlots," +
+      s" availableSlots: $availableSlots," +
       s" committed shuffles ${emptyShuffles.size}," +
       s" running applications ${applicationAllocations.size}," +
       s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 020f93d7d..71401735f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -163,7 +163,7 @@ class WorkerInfo(
   lazy val toUniqueId = s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort"
 
   def slotAvailable(): Boolean = this.synchronized {
-    diskInfos.asScala.exists { case (_, disk) => (disk.maxSlots - disk.activeSlots) > 0 }
+    diskInfos.asScala.exists { case (_, disk) => (disk.availableSlots) > 0 }
   }
 
   def getTotalSlots(): Long = this.synchronized {
@@ -178,14 +178,15 @@ class WorkerInfo(
     this.workerStatus = workerStatus;
   }
 
-  def updateDiskMaxSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
+  def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
     diskInfos.asScala.foreach { case (_, disk) =>
-      disk.maxSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
+      disk.maxSlots_$eq(disk.totalSpace / estimatedPartitionSize)
+      disk.availableSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
     }
   }
 
   def totalAvailableSlots(): Long = this.synchronized {
-    diskInfos.asScala.map(_._2.availableSlots()).sum
+    diskInfos.asScala.map(_._2.getAvailableSlots()).sum
   }
 
   def totalSpace(): Long = this.synchronized {
@@ -212,12 +213,14 @@ class WorkerInfo(
           curDisk.avgFlushTime = newDisk.avgFlushTime
           curDisk.avgFetchTime = newDisk.avgFetchTime
           if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
-            curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
+            curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
+            curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
           }
           curDisk.setStatus(newDisk.status)
         } else {
           if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
-            newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+            newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+            newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
           }
           diskInfos.put(mountPoint, newDisk)
         }
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index d28d5d676..abb8fb592 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -74,17 +74,17 @@ public class SlotsAllocator {
               && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
           } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
           } else if (StorageInfo.S3Available(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
           }
         }
       }
@@ -151,7 +151,7 @@ public class SlotsAllocator {
             || (shouldReplicate
                 && (usableDisks.size() == 1
                     || usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
-    boolean noAvailableSlots = usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0;
+    boolean noAvailableSlots = usableDisks.stream().mapToLong(DiskInfo::getAvailableSlots).sum() <= 0;
 
     if (noUsableDisks || noAvailableSlots) {
       logger.warn(
@@ -511,7 +511,7 @@ public class SlotsAllocator {
     long[] groupAvailableSlots = new long[groupSize];
     for (int i = 0; i < groupSize; i++) {
       for (DiskInfo disk : groups.get(i)) {
-        groupAvailableSlots[i] += disk.availableSlots();
+        groupAvailableSlots[i] += disk.getAvailableSlots();
       }
     }
     double[] currentAllocation = new double[groupSize];
@@ -557,8 +557,8 @@ public class SlotsAllocator {
             restrictions.computeIfAbsent(diskWorkerMap.get(disk), v -> new ArrayList<>());
         long allocated =
             (int) Math.ceil((groupAllocations[i] + groupLeft) / (double) disksInsideGroup);
-        if (allocated > disk.availableSlots()) {
-          allocated = disk.availableSlots();
+        if (allocated > disk.getAvailableSlots()) {
+          allocated = disk.getAvailableSlots();
         }
         if (allocated > groupRequired) {
           allocated = groupRequired;
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cf37796bf..a794e059c 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -330,7 +330,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     } else {
       
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
     }
-    workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
+    workerInfo.updateDiskSlots(estimatedPartitionSize);
     synchronized (workersMap) {
       workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo);
       shutdownWorkers.remove(workerInfo);
@@ -573,7 +573,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     HashSet<WorkerInfo> workers = new HashSet(workersMap.values());
     excludedWorkers.forEach(workers::remove);
     manuallyExcludedWorkers.forEach(workers::remove);
-    workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
+    workers.forEach(workerInfo -> workerInfo.updateDiskSlots(estimatedPartitionSize));
   }
 
   private boolean isWorkerAvailable(WorkerInfo workerInfo) {
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index e5b7595a9..78f7727bf 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -465,7 +465,7 @@ public class SlotsAllocatorSuiteJ {
     final List<WorkerInfo> workers = prepareWorkers(true);
     // Simulates no available slots behavior with greatly changed estimatedPartitionSize for workers
     // with usable disks.
-    workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(Long.MAX_VALUE));
+    workers.forEach(workerInfo -> workerInfo.updateDiskSlots(Long.MAX_VALUE));
     final List<Integer> partitionIds = Collections.singletonList(0);
     final boolean shouldReplicate = false;
 

Reply via email to