This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new bacfb5444 [CELEBORN-832] Support using a RESTful API to trigger worker decommission
bacfb5444 is described below
commit bacfb54447f9d5b7cb4b371d7a7aed1895181471
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jul 27 15:40:14 2023 +0800
[CELEBORN-832] Support using a RESTful API to trigger worker decommission
### What changes were proposed in this pull request?
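As the title says: add a `/decommission` endpoint to the worker's HTTP service that triggers the worker's decommission, together with the new `celeborn.worker.decommission.checkInterval` and `celeborn.worker.decommission.forceExitTimeout` configurations.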
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1759 from AngersZhuuuu/CELEBORN-832.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 23 ++++++++
docs/configuration/worker.md | 2 +
docs/monitoring.md | 1 +
.../celeborn/server/common/HttpService.scala | 2 +
.../server/common/http/HttpRequestHandler.scala | 2 +
.../celeborn/service/deploy/worker/Worker.scala | 64 ++++++++++++++++++----
6 files changed, 84 insertions(+), 10 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index cc71954f6..d7bddc40a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -901,6 +901,12 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def readBufferTargetNotifyThreshold: Long =
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
def readBuffersToTriggerReadMin: Int =
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
+ // //////////////////////////////////////////////////////
+ // Decommission //
+ // //////////////////////////////////////////////////////
+ def workerDecommissionCheckInterval: Long =
get(WORKER_DECOMMISSION_CHECK_INTERVAL)
+ def workerDecommissionForceExitTimeout: Long =
get(WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT)
+
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
@@ -2470,6 +2476,23 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10min")
+ val WORKER_DECOMMISSION_CHECK_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.worker.decommission.checkInterval")
+ .categories("worker")
+ .doc(
+ "The wait interval of checking whether all shuffles have expired during worker decommission")
+ .version("0.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("30s")
+
+ val WORKER_DECOMMISSION_FORCE_EXIT_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.decommission.forceExitTimeout")
+ .categories("worker")
+ .doc("The maximum time to wait for all shuffles to expire during worker decommission.")
+ .version("0.4.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("6h")
+
val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 7d37707f2..a76835b28 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -33,6 +33,8 @@ license: |
| celeborn.worker.congestionControl.low.watermark | <undefined> | Will
stop congest users if the total pending bytes of disk buffer is lower than this
configuration | 0.3.0 |
| celeborn.worker.congestionControl.sample.time.window | 10s | The worker
holds a time sliding list to calculate users' produce/consume rate | 0.3.0 |
| celeborn.worker.congestionControl.user.inactive.interval | 10min | How long
will consider this user is inactive if it doesn't send data | 0.3.0 |
+| celeborn.worker.decommission.checkInterval | 30s | The wait interval of checking whether all shuffles have expired during worker decommission | 0.4.0 |
+| celeborn.worker.decommission.forceExitTimeout | 6h | The maximum time to wait for all shuffles to expire during worker decommission. | 0.4.0 |
| celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio
of direct memory to store shuffle data | 0.2.0 |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct
memory for read buffer | 0.2.0 |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory
usage reaches this limit, the worker will stop to receive data from Celeborn
shuffle clients. | 0.2.0 |
diff --git a/docs/monitoring.md b/docs/monitoring.md
index b84d2ec8b..c0797a70b 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -313,3 +313,4 @@ API path listed as below:
| /unavailablePeers | List the unavailable peers of the worker, this
always means the worker connect to the peer failed. |
| /isShutdown | Show if the worker is during the process of
shutdown. |
| /isRegistered | Show if the worker is registered to the master
success. |
+| /decommission | Trigger this worker to decommission from the cluster. |
\ No newline at end of file
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 322469144..6de0f03d0 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -67,6 +67,8 @@ abstract class HttpService extends Service with Logging {
def isRegistered: String
+ def decommission: String = throw new UnsupportedOperationException()
+
def startHttpServer(): Unit = {
val handlers =
if (metricsSystem.running) {
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
index 2747704dd..cc727d22e 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala
@@ -90,6 +90,8 @@ class HttpRequestHandler(
service.isShutdown
case "/isRegistered" if service.serviceName == Service.WORKER =>
service.isRegistered
+ case "/decommission" if service.serviceName == Service.WORKER =>
+ service.decommission
case _ => INVALID
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 19706a397..32d9290b7 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -77,7 +77,7 @@ private[celeborn] class Worker(
private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
- private val exitKind = CelebornExitKind.EXIT_IMMEDIATELY
+ private var exitKind = CelebornExitKind.EXIT_IMMEDIATELY
assert(
!gracefulShutdown || (gracefulShutdown &&
conf.workerRpcPort > 0 && conf.workerFetchPort > 0 &&
@@ -561,6 +561,21 @@ private[celeborn] class Worker(
sb.toString()
}
+ override def decommission: String = {
+ exitKind = CelebornExitKind.WORKER_DECOMMISSION
+ new Thread() {
+ override def run(): Unit = {
+ Thread.sleep(10000)
+ System.exit(0)
+ }
+ }.start()
+ val sb = new StringBuilder
+ sb.append("======================== Decommission Worker
=========================\n")
+ sb.append("Decommission worker triggered: \n")
+ sb.append(workerInfo.toString()).append("\n")
+ sb.toString()
+ }
+
def shutdownGracefully(): Unit = {
// During shutdown, to avoid allocate slots in this worker,
// add this worker to master's excluded list. When restart, register
worker will
@@ -592,7 +607,37 @@ private[celeborn] class Worker(
logWarning(s"Waiting for all PartitionLocation release cost
${waitTime}ms, " +
s"unreleased PartitionLocation: \n$partitionLocationInfo")
}
- stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+ }
+
+ def decommissionWorker(): Unit = {
+ try {
+ masterClient.askSync(
+ ReportWorkerUnavailable(List(workerInfo).asJava),
+ OneWayMessageResponse.getClass)
+ } catch {
+ case e: Throwable =>
+ logError(
+ s"Fail report to master, need wait registered shuffle expired: " +
+ s"\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ",
"]")}",
+ e)
+ }
+ shutdown.set(true)
+ val interval = conf.workerDecommissionCheckInterval
+ val timeout = conf.workerDecommissionForceExitTimeout
+ var waitTimes = 0
+
+ def waitTime: Long = waitTimes * interval
+
+ while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) {
+ Thread.sleep(interval)
+ waitTimes += 1
+ }
+ if (storageManager.shuffleKeySet().isEmpty) {
+ logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
+ } else {
+ logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
+ s"unreleased shuffle:
\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}")
+ }
}
def exitImmediately(): Unit = {
@@ -616,22 +661,21 @@ private[celeborn] class Worker(
e)
}
shutdown.set(true)
- stop(CelebornExitKind.EXIT_IMMEDIATELY)
}
ShutdownHookManager.get().addShutdownHook(
new Thread(new Runnable {
override def run(): Unit = {
- if (stopped) {
- logInfo("Worker already stopped before call ShutdownHook.")
- } else {
- logInfo("Shutdown hook called.")
- if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ logInfo("Shutdown hook called.")
+ exitKind match {
+ case CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN =>
shutdownGracefully()
- } else {
+ case CelebornExitKind.WORKER_DECOMMISSION =>
+ decommissionWorker()
+ case _ =>
exitImmediately()
- }
}
+ stop(exitKind)
}
}),
WORKER_SHUTDOWN_PRIORITY)