This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 151fd3567 [CELEBORN-1923] Correct Celeborn available slots calculation 
logic
151fd3567 is described below

commit 151fd35676e5e09cb975b4b8e084dc9201da01b3
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sun Mar 23 15:27:06 2025 -0700

    [CELEBORN-1923] Correct Celeborn available slots calculation logic
    
    ### What changes were proposed in this pull request?
    Fix incorrect logic when calculating a disk's available slots.
    
    ### Why are the changes needed?
    Currently we compute `maxSlots = usableSize / estimatedPartitionSize`,
    and then `availableSlots = maxSlots - allocatedSlots`.
    But `availableSlots` should be `usableSize / estimatedPartitionSize`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    MT
    
    Closes #3162 from AngersZhuuuu/CELEBORN-1923.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/common/meta/DeviceInfo.scala      |  6 ++++--
 .../org/apache/celeborn/common/meta/WorkerInfo.scala      | 15 +++++++++------
 .../org/apache/celeborn/common/meta/WorkerInfoSuite.scala |  6 +++---
 .../celeborn/service/deploy/master/SlotsAllocator.java    | 15 ++++++++-------
 .../deploy/master/clustermeta/AbstractMetaManager.java    |  4 ++--
 .../service/deploy/master/SlotsAllocatorSuiteJ.java       |  2 +-
 6 files changed, 27 insertions(+), 21 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index de57b0630..928794fd9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -84,6 +84,7 @@ class DiskInfo(
   var totalSpace = 0L
   var storageType: StorageInfo.Type = StorageInfo.Type.SSD
   var maxSlots: Long = 0
+  var availableSlots: Long = 0
   lazy val shuffleAllocations = new util.HashMap[String, Integer]()
   lazy val applicationAllocations = new util.HashMap[String, Integer]()
 
@@ -123,8 +124,8 @@ class DiskInfo(
    *
    * @return the available slots of the disk.
    */
-  def availableSlots(): Long = this.synchronized {
-    math.max(maxSlots - activeSlots, 0L)
+  def getAvailableSlots(): Long = {
+    math.max(availableSlots, 0L)
   }
 
   def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
@@ -175,6 +176,7 @@ class DiskInfo(
   override def toString: String = this.synchronized {
     val (emptyShuffles, nonEmptyShuffles) = 
shuffleAllocations.asScala.partition(_._2 == 0)
     s"DiskInfo(maxSlots: $maxSlots," +
+      s" availableSlots: $availableSlots," +
       s" committed shuffles ${emptyShuffles.size}," +
       s" running applications ${applicationAllocations.size}," +
       s" shuffleAllocations: ${nonEmptyShuffles.toMap}," +
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 020f93d7d..71401735f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -163,7 +163,7 @@ class WorkerInfo(
   lazy val toUniqueId = s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort"
 
   def slotAvailable(): Boolean = this.synchronized {
-    diskInfos.asScala.exists { case (_, disk) => (disk.maxSlots - 
disk.activeSlots) > 0 }
+    diskInfos.asScala.exists { case (_, disk) => (disk.availableSlots) > 0 }
   }
 
   def getTotalSlots(): Long = this.synchronized {
@@ -178,14 +178,15 @@ class WorkerInfo(
     this.workerStatus = workerStatus;
   }
 
-  def updateDiskMaxSlots(estimatedPartitionSize: Long): Unit = 
this.synchronized {
+  def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
     diskInfos.asScala.foreach { case (_, disk) =>
-      disk.maxSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
+      disk.maxSlots_$eq(disk.totalSpace / estimatedPartitionSize)
+      disk.availableSlots_$eq(disk.actualUsableSpace / estimatedPartitionSize)
     }
   }
 
   def totalAvailableSlots(): Long = this.synchronized {
-    diskInfos.asScala.map(_._2.availableSlots()).sum
+    diskInfos.asScala.map(_._2.getAvailableSlots()).sum
   }
 
   def totalSpace(): Long = this.synchronized {
@@ -212,12 +213,14 @@ class WorkerInfo(
           curDisk.avgFlushTime = newDisk.avgFlushTime
           curDisk.avgFetchTime = newDisk.avgFetchTime
           if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
-            curDisk.maxSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
+            curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
+            curDisk.availableSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
           }
           curDisk.setStatus(newDisk.status)
         } else {
           if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
-            newDisk.maxSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
+            newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
+            newDisk.availableSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
           }
           diskInfos.put(mountPoint, newDisk)
         }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index e7d211205..2db11002b 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -355,9 +355,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
            |SlotsUsed: 60
            |LastHeartbeat: 0
            |Disks: $placeholder
-           |  DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns, avgFetchTime: 3 ns, 
activeSlots: 30, storageType: SSD) status: HEALTHY dirs $placeholder
-           |  DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns, avgFetchTime: 1 ns, 
activeSlots: 10, storageType: SSD) status: HEALTHY dirs $placeholder
-           |  DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0, running 
applications 0, shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 
2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns, avgFetchTime: 2 ns, 
activeSlots: 20, storageType: SSD) status: HEALTHY dirs $placeholder
+           |  DiskInfo0: DiskInfo(maxSlots: 0, availableSlots: 0, committed 
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: 
disk3, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 3 ns, 
avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD) status: HEALTHY dirs 
$placeholder
+           |  DiskInfo1: DiskInfo(maxSlots: 0, availableSlots: 0, committed 
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: 
disk1, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 1 ns, 
avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD) status: HEALTHY dirs 
$placeholder
+           |  DiskInfo2: DiskInfo(maxSlots: 0, availableSlots: 0, committed 
shuffles 0, running applications 0, shuffleAllocations: Map(), mountPoint: 
disk2, usableSpace: 2048.0 MiB, totalSpace: 2048.0 MiB, avgFlushTime: 2 ns, 
avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD) status: HEALTHY dirs 
$placeholder
            |UserResourceConsumption: $placeholder
            |  UserIdentifier: `tenant1`.`name1`, ResourceConsumption: 
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1, 
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, subResourceConsumptions: 
(application_1697697127390_2171854 -> ResourceConsumption(diskBytesWritten: 
20.0 MiB, diskFileCount: 1, hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1, 
subResourceConsumptions: empty)))
            |WorkerRef: null
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index d28d5d676..aab79cc2f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -74,17 +74,17 @@ public class SlotsAllocator {
               && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.S3) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().getAvailableSlots()));
           } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() == 
StorageInfo.Type.HDFS) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().getAvailableSlots()));
           } else if (StorageInfo.S3Available(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() == 
StorageInfo.Type.S3) {
             usableDisks.add(
                 new UsableDiskInfo(
-                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().getAvailableSlots()));
           }
         }
       }
@@ -151,7 +151,8 @@ public class SlotsAllocator {
             || (shouldReplicate
                 && (usableDisks.size() == 1
                     || 
usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
-    boolean noAvailableSlots = 
usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0;
+    boolean noAvailableSlots =
+        usableDisks.stream().mapToLong(DiskInfo::getAvailableSlots).sum() <= 0;
 
     if (noUsableDisks || noAvailableSlots) {
       logger.warn(
@@ -511,7 +512,7 @@ public class SlotsAllocator {
     long[] groupAvailableSlots = new long[groupSize];
     for (int i = 0; i < groupSize; i++) {
       for (DiskInfo disk : groups.get(i)) {
-        groupAvailableSlots[i] += disk.availableSlots();
+        groupAvailableSlots[i] += disk.getAvailableSlots();
       }
     }
     double[] currentAllocation = new double[groupSize];
@@ -557,8 +558,8 @@ public class SlotsAllocator {
             restrictions.computeIfAbsent(diskWorkerMap.get(disk), v -> new 
ArrayList<>());
         long allocated =
             (int) Math.ceil((groupAllocations[i] + groupLeft) / (double) 
disksInsideGroup);
-        if (allocated > disk.availableSlots()) {
-          allocated = disk.availableSlots();
+        if (allocated > disk.getAvailableSlots()) {
+          allocated = disk.getAvailableSlots();
         }
         if (allocated > groupRequired) {
           allocated = groupRequired;
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cf37796bf..a794e059c 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -330,7 +330,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     } else {
       
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
     }
-    workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
+    workerInfo.updateDiskSlots(estimatedPartitionSize);
     synchronized (workersMap) {
       workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo);
       shutdownWorkers.remove(workerInfo);
@@ -573,7 +573,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     HashSet<WorkerInfo> workers = new HashSet(workersMap.values());
     excludedWorkers.forEach(workers::remove);
     manuallyExcludedWorkers.forEach(workers::remove);
-    workers.forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
+    workers.forEach(workerInfo -> 
workerInfo.updateDiskSlots(estimatedPartitionSize));
   }
 
   private boolean isWorkerAvailable(WorkerInfo workerInfo) {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index e5b7595a9..78f7727bf 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -465,7 +465,7 @@ public class SlotsAllocatorSuiteJ {
     final List<WorkerInfo> workers = prepareWorkers(true);
     // Simulates no available slots behavior with greatly changed 
estimatedPartitionSize for workers
     // with usable disks.
-    workers.forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(Long.MAX_VALUE));
+    workers.forEach(workerInfo -> workerInfo.updateDiskSlots(Long.MAX_VALUE));
     final List<Integer> partitionIds = Collections.singletonList(0);
     final boolean shouldReplicate = false;
 

Reply via email to