This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cb8ace406 [CELEBORN-960] Exclude workers without healthy disks
cb8ace406 is described below

commit cb8ace406bd6df21dd8c16a4afcf8e0377e456ea
Author: sychen <[email protected]>
AuthorDate: Sat Sep 9 18:52:25 2023 +0800

    [CELEBORN-960] Exclude workers without healthy disks
    
    ### What changes were proposed in this pull request?
    The master checks the number of healthy disks in the woker and decides 
whether to exclude it.
    
    ### Why are the changes needed?
    
    When the disks of all the workers are unhealthy, HDFS is not enabled, and 
the master does not exclude the workers, the spark client calls 
checkWorkersAvailable and returns available, and the shuffle write ultimately 
fails without fallback.
    
    ```java
    23/09/08 23:20:44 ERROR LifecycleManager: Aggregated error of reserveSlots 
for shuffleId 9 failure:
     [reserveSlots] Failed to reserve buffers for shuffleId 9 from worker 
Host:1.2.3.4:RpcPort:55803:PushPort:55805:FetchPort:55807:ReplicatePort:55806. 
Reason: Local storage has no available dirs!
    23/09/08 23:20:44 ERROR LifecycleManager: Retry reserve slots for 9 failed 
caused by not enough slots.
    23/09/08 23:20:44 WARN LifecycleManager: Reserve buffers for 9 still fail 
after retrying, clear buffers.
    23/09/08 23:20:44 ERROR LifecycleManager: reserve buffer for 9 failed, 
reply to all.
    23/09/08 23:20:44 ERROR ShuffleClientImpl: LifecycleManager request slots 
return RESERVE_SLOTS_FAILED, retry again, remain retry times 0.
    23/09/08 23:20:47 WARN TaskSetManager: Lost task 8.0 in stage 27.0 (TID 89) 
(1.2.3.4 executor driver): TaskKilled (Stage cancelled)
    23/09/08 23:20:59 ERROR MasterClient: Send rpc with failure, has tried 15, 
max try 15!
    org.apache.celeborn.common.exception.CelebornException: Exception thrown in 
awaitResult:
            at 
org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:229)
            at 
org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
            at 
org.apache.celeborn.common.client.MasterClient.sendMessageInner(MasterClient.java:150)
            at 
org.apache.celeborn.common.client.MasterClient.askSync(MasterClient.java:118)
            at 
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlots(LifecycleManager.scala:1033)
            at 
org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlotsWithRetry(LifecycleManager.scala:1022)
            at 
org.apache.celeborn.client.LifecycleManager.org$apache$celeborn$client$LifecycleManager$$offerAndReserveSlots(LifecycleManager.scala:402)
            at 
org.apache.celeborn.client.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:210)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    local test
    ```
    23/09/08 23:23:27 WARN CelebornShuffleFallbackPolicyRunner: No workers 
available for current user `default`.`default`.
    23/09/08 23:23:27 WARN SparkShuffleManager: Fallback to vanilla Spark 
SortShuffleManager for shuffle: 10
    23/09/08 23:23:28 WARN CelebornShuffleFallbackPolicyRunner: No workers 
available for current user `default`.`default`.
    23/09/08 23:23:28 WARN SparkShuffleManager: Fallback to vanilla Spark 
SortShuffleManager for shuffle: 11
    100000
    Time taken: 0.192 seconds, Fetched 1 row(s)
    ```
    ```
    
    Closes #1893 from cxzl25/CELEBORN-960.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../service/deploy/master/clustermeta/AbstractMetaManager.java       | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 8d7b9376a..fbf10c2ab 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.AppDiskUsageMetric;
 import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
 import org.apache.celeborn.common.meta.DiskInfo;
+import org.apache.celeborn.common.meta.DiskStatus;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
@@ -162,8 +163,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
     appDiskUsageMetric.update(estimatedAppDiskUsage);
     // If using HDFSONLY mode, workers with empty disks should not be put into 
excluded worker list.
+    long healthyDiskNum =
+        disks.values().stream().filter(s -> 
s.status().equals(DiskStatus.HEALTHY)).count();
     if (!excludedWorkers.contains(worker)
-        && ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) {
+        && (((disks.isEmpty() || healthyDiskNum <= 0) && 
!conf.hasHDFSStorage()) || highWorkload)) {
       LOG.debug("Worker: {} num total slots is 0, add to excluded list", 
worker);
       excludedWorkers.add(worker);
     } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && 
!highWorkload) {

Reply via email to