This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 30fc9037d [CELEBORN-960] Exclude workers without healthy disks
30fc9037d is described below
commit 30fc9037d569a22ef6c26c1d4976397fc6290859
Author: sychen <[email protected]>
AuthorDate: Sat Sep 9 18:52:25 2023 +0800
[CELEBORN-960] Exclude workers without healthy disks
### What changes were proposed in this pull request?
The master now checks the number of healthy disks on each worker and uses that
count to decide whether the worker should be excluded.
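The heart of the change can be sketched in isolation. The `DiskStatus` and
`DiskInfo` types below are simplified, hypothetical stand-ins for Celeborn's
real metadata classes, used only to illustrate the new exclusion condition:

```java
import java.util.Map;

// Hypothetical stand-ins for Celeborn's DiskStatus/DiskInfo, for illustration only.
enum DiskStatus { HEALTHY, READ_OR_WRITE_FAILURE, HIGH_DISK_USAGE }

record DiskInfo(String mountPoint, DiskStatus status) {}

class HealthyDiskCheck {
  // Mirrors the idea of the patch: exclude a worker when it has no HEALTHY
  // disk (and HDFS storage is not configured), or when it is under high load.
  static boolean shouldExclude(
      Map<String, DiskInfo> disks, boolean hasHdfsStorage, boolean highWorkload) {
    long healthyDiskNum =
        disks.values().stream().filter(d -> d.status() == DiskStatus.HEALTHY).count();
    return ((disks.isEmpty() || healthyDiskNum <= 0) && !hasHdfsStorage) || highWorkload;
  }

  public static void main(String[] args) {
    // A worker whose only disk has failed: excluded when HDFS is off.
    Map<String, DiskInfo> allBad =
        Map.of("/mnt/disk1", new DiskInfo("/mnt/disk1", DiskStatus.READ_OR_WRITE_FAILURE));
    System.out.println(shouldExclude(allBad, false, false)); // prints "true"

    // A worker with at least one healthy disk stays available.
    Map<String, DiskInfo> oneGood =
        Map.of("/mnt/disk1", new DiskInfo("/mnt/disk1", DiskStatus.HEALTHY));
    System.out.println(shouldExclude(oneGood, false, false)); // prints "false"
  }
}
```

Before this patch, only `disks.isEmpty()` was tested, so a worker whose disks
were all present but unhealthy was never excluded.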
### Why are the changes needed?
When every worker's disks are unhealthy and HDFS storage is not enabled, the
master still does not exclude those workers. The Spark client's
checkWorkersAvailable call therefore reports workers as available, and the
shuffle write ultimately fails without falling back to vanilla Spark shuffle.
```
23/09/08 23:20:44 ERROR LifecycleManager: Aggregated error of reserveSlots for shuffleId 9 failure:
[reserveSlots] Failed to reserve buffers for shuffleId 9 from worker Host:1.2.3.4:RpcPort:55803:PushPort:55805:FetchPort:55807:ReplicatePort:55806. Reason: Local storage has no available dirs!
23/09/08 23:20:44 ERROR LifecycleManager: Retry reserve slots for 9 failed caused by not enough slots.
23/09/08 23:20:44 WARN LifecycleManager: Reserve buffers for 9 still fail after retrying, clear buffers.
23/09/08 23:20:44 ERROR LifecycleManager: reserve buffer for 9 failed, reply to all.
23/09/08 23:20:44 ERROR ShuffleClientImpl: LifecycleManager request slots return RESERVE_SLOTS_FAILED, retry again, remain retry times 0.
23/09/08 23:20:47 WARN TaskSetManager: Lost task 8.0 in stage 27.0 (TID 89) (1.2.3.4 executor driver): TaskKilled (Stage cancelled)
23/09/08 23:20:59 ERROR MasterClient: Send rpc with failure, has tried 15, max try 15!
org.apache.celeborn.common.exception.CelebornException: Exception thrown in awaitResult:
	at org.apache.celeborn.common.util.ThreadUtils$.awaitResult(ThreadUtils.scala:229)
	at org.apache.celeborn.common.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:74)
	at org.apache.celeborn.common.client.MasterClient.sendMessageInner(MasterClient.java:150)
	at org.apache.celeborn.common.client.MasterClient.askSync(MasterClient.java:118)
	at org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlots(LifecycleManager.scala:1033)
	at org.apache.celeborn.client.LifecycleManager.requestMasterRequestSlotsWithRetry(LifecycleManager.scala:1022)
	at org.apache.celeborn.client.LifecycleManager.org$apache$celeborn$client$LifecycleManager$$offerAndReserveSlots(LifecycleManager.scala:402)
	at org.apache.celeborn.client.LifecycleManager$$anonfun$receiveAndReply$1.applyOrElse(LifecycleManager.scala:210)
```
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
local test
```
23/09/08 23:23:27 WARN CelebornShuffleFallbackPolicyRunner: No workers available for current user `default`.`default`.
23/09/08 23:23:27 WARN SparkShuffleManager: Fallback to vanilla Spark SortShuffleManager for shuffle: 10
23/09/08 23:23:28 WARN CelebornShuffleFallbackPolicyRunner: No workers available for current user `default`.`default`.
23/09/08 23:23:28 WARN SparkShuffleManager: Fallback to vanilla Spark SortShuffleManager for shuffle: 11
100000
Time taken: 0.192 seconds, Fetched 1 row(s)
```
Closes #1893 from cxzl25/CELEBORN-960.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit cb8ace406bd6df21dd8c16a4afcf8e0377e456ea)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../service/deploy/master/clustermeta/AbstractMetaManager.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 8d7b9376a..fbf10c2ab 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.meta.AppDiskUsageMetric;
 import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
 import org.apache.celeborn.common.meta.DiskInfo;
+import org.apache.celeborn.common.meta.DiskStatus;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PbSnapshotMetaInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
@@ -162,8 +163,10 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     }
     appDiskUsageMetric.update(estimatedAppDiskUsage);
     // If using HDFSONLY mode, workers with empty disks should not be put into excluded worker list.
+    long healthyDiskNum =
+        disks.values().stream().filter(s -> s.status().equals(DiskStatus.HEALTHY)).count();
     if (!excludedWorkers.contains(worker)
-        && ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) {
+        && (((disks.isEmpty() || healthyDiskNum <= 0) && !conf.hasHDFSStorage()) || highWorkload)) {
       LOG.debug("Worker: {} num total slots is 0, add to excluded list", worker);
       excludedWorkers.add(worker);
     } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && !highWorkload) {