This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 6907c5382 [CELEBORN-832] Support use RESTful API to trigger worker 
decommission
6907c5382 is described below

commit 6907c538242edebb08c8b60b16b3b97c81a80e70
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jul 27 15:40:14 2023 +0800

    [CELEBORN-832] Support use RESTful API to trigger worker decommission
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1759 from AngersZhuuuu/CELEBORN-832.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Angerszhuuuu <[email protected]>
    (cherry picked from commit bacfb54447f9d5b7cb4b371d7a7aed1895181471)
    Signed-off-by: Angerszhuuuu <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 23 ++++++++
 docs/configuration/worker.md                       |  2 +
 docs/monitoring.md                                 |  1 +
 .../celeborn/server/common/HttpService.scala       |  2 +
 .../server/common/http/HttpRequestHandler.scala    |  2 +
 .../celeborn/service/deploy/worker/Worker.scala    | 64 ++++++++++++++++++----
 6 files changed, 84 insertions(+), 10 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 7702bad83..c906a518b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -901,6 +901,12 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def readBufferTargetNotifyThreshold: Long = 
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
   def readBuffersToTriggerReadMin: Int = 
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
 
+  // //////////////////////////////////////////////////////
+  //                   Decommission                      //
+  // //////////////////////////////////////////////////////
+  def workerDecommissionCheckInterval: Long = 
get(WORKER_DECOMMISSION_CHECK_INTERVAL)
+  def workerDecommissionForceExitTimeout: Long = 
get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)
+
   // //////////////////////////////////////////////////////
   //            Graceful Shutdown & Recover              //
   // //////////////////////////////////////////////////////
@@ -2470,6 +2476,23 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("10min")
 
+  val WORKER_DECOMMISSION_CHECK_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.worker.decommission.checkInterval")
+      .categories("worker")
+      .doc(
+        "The wait interval of checking whether all the shuffle expired during 
worker decomission")
+      .version("0.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")
+
+  val WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.decommission.forceExitTimeout")
+      .categories("worker")
+      .doc("The wait time of waiting for all the shuffle expire during worker 
decommission.")
+      .version("0.4.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("6h")
+
   val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.graceful.shutdown.enabled")
       .categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 7d37707f2..a76835b28 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -33,6 +33,8 @@ license: |
 | celeborn.worker.congestionControl.low.watermark | &lt;undefined&gt; | Will 
stop congest users if the total pending bytes of disk buffer is lower than this 
configuration | 0.3.0 | 
 | celeborn.worker.congestionControl.sample.time.window | 10s | The worker 
holds a time sliding list to calculate users' produce/consume rate | 0.3.0 | 
 | celeborn.worker.congestionControl.user.inactive.interval | 10min | How long 
will consider this user is inactive if it doesn't send data | 0.3.0 | 
+| celeborn.worker.decommission.checkInterval | 30s | The wait interval of 
checking whether all the shuffle expired during worker decommission | 0.4.0 | 
+| celeborn.worker.decommission.forceExitTimeout | 6h | The wait time of 
waiting for all the shuffle expire during worker decommission. | 0.4.0 | 
 | celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio 
of direct memory to store shuffle data | 0.2.0 | 
 | celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct 
memory for read buffer | 0.2.0 | 
 | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory 
usage reaches this limit, the worker will stop to receive data from Celeborn 
shuffle clients. | 0.2.0 | 
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d38fb9945..41e6413b9 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -316,3 +316,4 @@ API path listed as below:
 | /unavailablePeers          | List the unavailable peers of the worker, this 
always means the worker connect to the peer failed.       |
 | /isShutdown                | Show if the worker is during the process of 
shutdown.                                                    |
 | /isRegistered              | Show if the worker is registered to the master 
success.                                                  |
+| /decommission              | Trigger this worker to decommission from the 
cluster                                                     |
\ No newline at end of file
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 322469144..6de0f03d0 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -67,6 +67,8 @@ abstract class HttpService extends Service with Logging {
 
   def isRegistered: String
 
+  def decommission: String = throw new UnsupportedOperationException()
+
   def startHttpServer(): Unit = {
     val handlers =
       if (metricsSystem.running) {
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index 2747704dd..cc727d22e 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -90,6 +90,8 @@ class HttpRequestHandler(
         service.isShutdown
       case "/isRegistered" if service.serviceName == Service.WORKER =>
         service.isRegistered
+      case "/decommission" if service.serviceName == Service.WORKER =>
+        service.decommission
       case _ => INVALID
     }
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 19706a397..32d9290b7 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -77,7 +77,7 @@ private[celeborn] class Worker(
   private val WORKER_SHUTDOWN_PRIORITY = 100
   val shutdown = new AtomicBoolean(false)
   private val gracefulShutdown = conf.workerGracefulShutdown
-  private val exitKind = CelebornExitKind.EXIT_IMMEDIATELY
+  private var exitKind = CelebornExitKind.EXIT_IMMEDIATELY
   assert(
     !gracefulShutdown || (gracefulShutdown &&
       conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
@@ -561,6 +561,21 @@ private[celeborn] class Worker(
     sb.toString()
   }
 
+  override def decommission: String = {
+    exitKind = CelebornExitKind.WORKER_DECOMMISSION
+    new Thread() {
+      override def run(): Unit = {
+        Thread.sleep(10000)
+        System.exit(0)
+      }
+    }.start()
+    val sb = new StringBuilder
+    sb.append("======================== Decommission Worker 
=========================\n")
+    sb.append("Decommission worker triggered: \n")
+    sb.append(workerInfo.toString()).append("\n")
+    sb.toString()
+  }
+
   def shutdownGracefully(): Unit = {
     // During shutdown, to avoid allocate slots in this worker,
     // add this worker to master's excluded list. When restart, register 
worker will
@@ -592,7 +607,37 @@ private[celeborn] class Worker(
       logWarning(s"Waiting for all PartitionLocation release cost 
${waitTime}ms, " +
         s"unreleased PartitionLocation: \n$partitionLocationInfo")
     }
-    stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+  }
+
+  def decommissionWorker(): Unit = {
+    try {
+      masterClient.askSync(
+        ReportWorkerUnavailable(List(workerInfo).asJava),
+        OneWayMessageResponse.getClass)
+    } catch {
+      case e: Throwable =>
+        logError(
+          s"Fail report to master, need wait registered shuffle expired: " +
+            s"\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", 
"]")}",
+          e)
+    }
+    shutdown.set(true)
+    val interval = conf.workerDecommissionCheckInterval
+    val timeout = conf.workerDecommissionForceExitTimeout
+    var waitTimes = 0
+
+    def waitTime: Long = waitTimes * interval
+
+    while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) {
+      Thread.sleep(interval)
+      waitTimes += 1
+    }
+    if (storageManager.shuffleKeySet().isEmpty) {
+      logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
+    } else {
+      logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
+        s"unreleased shuffle: 
\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}")
+    }
   }
 
   def exitImmediately(): Unit = {
@@ -616,22 +661,21 @@ private[celeborn] class Worker(
           e)
     }
     shutdown.set(true)
-    stop(CelebornExitKind.EXIT_IMMEDIATELY)
   }
 
   ShutdownHookManager.get().addShutdownHook(
     new Thread(new Runnable {
       override def run(): Unit = {
-        if (stopped) {
-          logInfo("Worker already stopped before call ShutdownHook.")
-        } else {
-          logInfo("Shutdown hook called.")
-          if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+        logInfo("Shutdown hook called.")
+        exitKind match {
+          case CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN =>
             shutdownGracefully()
-          } else {
+          case CelebornExitKind.WORKER_DECOMMISSION =>
+            decommissionWorker()
+          case _ =>
             exitImmediately()
-          }
         }
+        stop(exitKind)
       }
     }),
     WORKER_SHUTDOWN_PRIORITY)

Reply via email to