This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 02dad4389 [CELEBORN-945] Change ShutdownHook's timeout for decommission
02dad4389 is described below

commit 02dad438938bcc76c95a31f052b9255ef6f1ce89
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Sep 5 10:24:08 2023 +0800

    [CELEBORN-945] Change ShutdownHook's timeout for decommission
    
    ### What changes were proposed in this pull request?
    When the shutdown type is decommission, we should change the timeout of every
    `ShutdownHookManager#HookEntry` to `celeborn.worker.decommission.forceExitTimeout`.
    
    ### Why are the changes needed?
    ditto
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test
    
    Closes #1877 from waitinfuture/945.
    
    Lead-authored-by: zky.zhoukeyong <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 8d005b8d39ce37a7cd1b4dcc5fd181f78179cd8f)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/common/util/ShutdownHookManager.java     | 19 +++++++++++++++++--
 .../celeborn/service/deploy/worker/Worker.scala       |  3 +++
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
index be535cafe..d7b794f29 100644
--- a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
@@ -52,6 +52,8 @@ import org.apache.celeborn.common.CelebornConf;
  */
 public final class ShutdownHookManager {
 
+  private static final Logger logger = 
LoggerFactory.getLogger(ShutdownHookManager.class);
+
   private static final ShutdownHookManager MGR = new ShutdownHookManager();
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShutdownHookManager.class);
@@ -105,6 +107,10 @@ public final class ShutdownHookManager {
     for (HookEntry entry : MGR.getShutdownHooksInOrder()) {
       Future<?> future = EXECUTOR.submit(entry.getHook());
       try {
+        logger.info(
+            "timeout {}",
+            Utils.msDurationToString(
+                entry.getTimeUnit().convert(entry.getTimeout(), 
TimeUnit.MILLISECONDS)));
         future.get(entry.getTimeout(), entry.getTimeUnit());
       } catch (TimeoutException ex) {
         timeouts++;
@@ -165,8 +171,8 @@ public final class ShutdownHookManager {
   static class HookEntry {
     private final Runnable hook;
     private final int priority;
-    private final long timeout;
-    private final TimeUnit unit;
+    private long timeout;
+    private TimeUnit unit;
 
     HookEntry(Runnable hook, int priority) {
       this(hook, priority, getShutdownTimeout(new CelebornConf()), 
TIME_UNIT_DEFAULT);
@@ -207,6 +213,11 @@ public final class ShutdownHookManager {
       return timeout;
     }
 
+    public void setTimeout(long timeout, TimeUnit unit) {
+      this.timeout = timeout;
+      this.unit = unit;
+    }
+
     TimeUnit getTimeUnit() {
       return unit;
     }
@@ -279,6 +290,10 @@ public final class ShutdownHookManager {
     hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
   }
 
+  public void updateTimeout(long timeout, TimeUnit unit) {
+    hooks.forEach(hook -> hook.setTimeout(timeout, unit));
+  }
+
   /**
    * Removes a shutdownHook.
    *
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 5ae372c2f..143b54850 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -581,6 +581,9 @@ private[celeborn] class Worker(
     exitType match {
       case "DECOMMISSION" =>
         exitKind = CelebornExitKind.WORKER_DECOMMISSION
+        ShutdownHookManager.get().updateTimeout(
+          conf.workerDecommissionForceExitTimeout,
+          TimeUnit.MILLISECONDS)
       case "GRACEFUL" =>
         exitKind = CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
       case "IMMEDIATELY" =>

Reply via email to