This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 00f3229f3 [CELEBORN-662] Report worker unavailable regardless graceful 
shutdown
00f3229f3 is described below

commit 00f3229f33f64f7cd0fc9d0d62a94e62c4887cf1
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Jun 10 18:36:25 2023 +0800

    [CELEBORN-662] Report worker unavailable regardless graceful shutdown
    
    ### What changes were proposed in this pull request?
    In this PR, the worker always reports itself as unavailable, regardless of 
whether graceful shutdown is turned on or off.
    
    ### Why are the changes needed?
    To inform the master about the shutting-down worker as soon as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1575 from waitinfuture/662.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit dab865b68ba105fde7c99d8b266960481dabc2a5)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/service/deploy/worker/Worker.scala    | 27 +++++++++++-----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9a3b050ef..55cbf737a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -534,22 +534,21 @@ private[celeborn] class Worker(
     new Thread(new Runnable {
       override def run(): Unit = {
         logInfo("Shutdown hook called.")
+        // During shutdown, to avoid allocate slots in this worker,
+        // add this worker to master's blacklist. When restart, register 
worker will
+        // make master remove this worker from blacklist.
+        try {
+          rssHARetryClient.askSync(
+            ReportWorkerUnavailable(List(workerInfo).asJava),
+            OneWayMessageResponse.getClass)
+        } catch {
+          case e: Throwable =>
+            logError(
+              s"Fail report to master, need wait PartitionLocation auto 
release: \n$partitionLocationInfo",
+              e)
+        }
         shutdown.set(true)
         if (gracefulShutdown) {
-          // During graceful shutdown, to avoid allocate slots in this worker,
-          // add this worker to master's blacklist. When restart, register 
worker will
-          // make master remove this worker from blacklist.
-          try {
-            rssHARetryClient.askSync(
-              ReportWorkerUnavailable(List(workerInfo).asJava),
-              OneWayMessageResponse.getClass)
-          } catch {
-            case e: Throwable =>
-              logError(
-                s"Fail report to master, need wait PartitionLocation auto 
release: \n$partitionLocationInfo",
-                e)
-          }
-
           val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
           val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
           var waitTimes = 0

Reply via email to