This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 222ed267b [CELEBORN-692] WorkerStatusTracker should handle WORKER_SHUTDOWN properly
222ed267b is described below
commit 222ed267b0db779af60e6925a0f117fab8d7ea61
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jun 19 15:54:45 2023 +0800
[CELEBORN-692] WorkerStatusTracker should handle WORKER_SHUTDOWN properly
### What changes were proposed in this pull request?
This PR puts workers with WORKER_SHUTDOWN status into `shuttingWorkers`
instead of the blacklist.
### Why are the changes needed?
If WORKER_SHUTDOWN workers are put into the blacklist, commit files will
not be triggered on them; see `CommitHandler::commitFiles`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #1603 from waitinfuture/692.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../celeborn/client/WorkerStatusTracker.scala | 22 ++++++++++------------
1 file changed, 10 insertions(+), 12 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 602a2b5f0..99c558371 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -122,19 +122,17 @@ class WorkerStatusTracker(
|Current blacklist:
|$blacklistMsg
""".stripMargin)
- failedWorker.asScala.foreach { case (worker, (statusCode, registerTime))
=>
- if (!blacklist.containsKey(worker)) {
+ failedWorker.asScala.foreach {
+ case (worker, (StatusCode.WORKER_SHUTDOWN, _)) =>
+ shuttingWorkers.add(worker)
+ case (worker, (statusCode, registerTime)) if
!blacklist.containsKey(worker) =>
blacklist.put(worker, (statusCode, registerTime))
- } else {
- statusCode match {
- case StatusCode.WORKER_SHUTDOWN |
- StatusCode.NO_AVAILABLE_WORKING_DIR |
- StatusCode.RESERVE_SLOTS_FAILED |
- StatusCode.UNKNOWN_WORKER =>
- blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
- case _ => // Not cover
- }
- }
+ case (worker, (statusCode, _))
+ if statusCode == StatusCode.NO_AVAILABLE_WORKING_DIR ||
+ statusCode == StatusCode.RESERVE_SLOTS_FAILED ||
+ statusCode == StatusCode.UNKNOWN_WORKER =>
+ blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
+ case _ => // Not cover
}
}
}