This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new e588c80e3 [CELEBORN-1136] Support policy for master to assign slots 
fallback to roundrobin with no available slots
e588c80e3 is described below

commit e588c80e331f4047f96d216441fe759374c6f5dd
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 22 14:08:06 2023 +0800

    [CELEBORN-1136] Support policy for master to assign slots fallback to 
roundrobin with no available slots
    
    `SlotsAllocator` now falls back to the roundrobin policy when the master
    assigns slots and there are no available slots.
    
    When the selected workers have no available slots, the loadaware policy
    could throw `MasterNotLeaderException` (see the stack trace below). The
    master should therefore fall back to the roundrobin policy when there are
    no available slots. This situation can occur when the partition size has
    increased a lot in a short period of time.
    ```
    Caused by: org.apache.celeborn.common.haclient.MasterNotLeaderException: 
Master:xx.xx.xx.xx:9099 is not the leader. Suggested leader is 
Master:xx.xx.xx.xx:9099. Exception:bound must be positive.
        at 
org.apache.celeborn.service.deploy.master.clustermeta.ha.HAHelper.sendFailure(HAHelper.java:58)
        at 
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:236)
        at 
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.applyOrElse(Master.scala:314)
        ... 7 more
    Caused by: java.lang.IllegalArgumentException: bound must be positive
        at java.util.Random.nextInt(Random.java:388)
        at 
org.apache.celeborn.service.deploy.master.SlotsAllocator.roundRobin(SlotsAllocator.java:202)
        at 
org.apache.celeborn.service.deploy.master.SlotsAllocator.offerSlotsLoadAware(SlotsAllocator.java:151)
        at 
org.apache.celeborn.service.deploy.master.Master.$anonfun$handleRequestSlots$1(Master.scala:598)
        at 
org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:199)
        at 
org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:189)
        at 
org.apache.celeborn.service.deploy.master.Master.handleRequestSlots(Master.scala:587)
        at 
org.apache.celeborn.service.deploy.master.Master$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$12(Master.scala:314)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.celeborn.service.deploy.master.Master.executeWithLeaderChecker(Master.scala:233)
        ... 8 more
    ```
    
    Does this PR introduce any user-facing change? No.
    
    Tested by `SlotsAllocatorSuiteJ#testAllocateSlotsWithNoAvailableSlots`.
    
    Closes #2108 from SteNicholas/CELEBORN-1136.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 60871750e4c21360054584c4ba5f10809ee172f2)
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/meta/DeviceInfo.scala   | 11 ++++++++++-
 .../celeborn/service/deploy/master/SlotsAllocator.java | 18 +++++++++++-------
 .../service/deploy/master/SlotsAllocatorSuiteJ.java    | 12 ++++++++++++
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 6bd49a1a0..56933a2cf 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -92,8 +92,17 @@ class DiskInfo(
     avgFetchTime = fetchTimeMetrics.getAverage()
   }
 
+  /**
+   * Returns the available slots of the disk calculated by maxSlots minus 
activeSlots.
+   * Returns zero for the negative slots calculated.
+   *
+   * <b>Note:</b>`maxSlots` is calculated by actualUsableSpace divided 
estimatedPartitionSize.
+   * Meanwhile, `activeSlots` include slots reserved.
+   *
+   * @return the available slots of the disk.
+   */
   def availableSlots(): Long = this.synchronized {
-    maxSlots - activeSlots
+    math.max(maxSlots - activeSlots, 0L)
   }
 
   def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 62d4ea17b..e964e57e5 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -131,14 +131,18 @@ public class SlotsAllocator {
                       }
                     }));
 
-    Set<WorkerInfo> usableWorkers = new HashSet<>();
-    for (DiskInfo disk : usableDisks) {
-      usableWorkers.add(diskToWorkerMap.get(disk));
-    }
-    if ((shouldReplicate && usableWorkers.size() <= 1) || 
usableDisks.isEmpty()) {
+    boolean noUsableDisks =
+        usableDisks.isEmpty()
+            || (shouldReplicate
+                && (usableDisks.size() == 1
+                    || 
usableDisks.stream().map(diskToWorkerMap::get).distinct().count() <= 1));
+    boolean noAvailableSlots = 
usableDisks.stream().mapToLong(DiskInfo::availableSlots).sum() <= 0;
+
+    if (noUsableDisks || noAvailableSlots) {
       logger.warn(
-          "offer slots for {} fallback to roundrobin because there is no 
usable disks",
-          StringUtils.join(partitionIds, ','));
+          "offer slots for {} fallback to roundrobin because there is no {}",
+          StringUtils.join(partitionIds, ','),
+          noUsableDisks ? "usable disks" : "available slots");
       return offerSlotsRoundRobin(
           workers, partitionIds, shouldReplicate, shouldRackAware, 
availableStorageTypes);
     }
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 3300f6de2..92f2ecfbd 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -334,4 +334,16 @@ public class SlotsAllocatorSuiteJ {
     final boolean shouldReplicate = true;
     checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true);
   }
+
+  @Test
+  public void testAllocateSlotsWithNoAvailableSlots() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    // Simulates no available slots behavior with greatly changed 
estimatedPartitionSize for workers
+    // with usable disks.
+    workers.forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(Long.MAX_VALUE));
+    final List<Integer> partitionIds = Collections.singletonList(0);
+    final boolean shouldReplicate = false;
+
+    check(workers, partitionIds, shouldReplicate, true);
+  }
 }

Reply via email to