This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 31db6b0df [CELEBORN-1459][FOLLOWUP] Introduce CleanTaskQueueSize and
CleanExpiredShuffleKeysTime to record situation of cleaning up expired shuffle
keys
31db6b0df is described below
commit 31db6b0dfa6e92a6a51c6dd831efec4a68299ff4
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jun 19 16:12:48 2024 +0800
[CELEBORN-1459][FOLLOWUP] Introduce CleanTaskQueueSize and
CleanExpiredShuffleKeysTime to record situation of cleaning up expired shuffle
keys
Introduce the `CleanTaskQueueSize` and `CleanExpiredShuffleKeysTime` metrics to record
the state of cleaning up expired shuffle keys.
In the production environment, a backlog builds up in the task queue that cleans
up shuffle data for expired shuffle keys. Introducing `CleanTaskQueueSize` and
`CleanExpiredShuffleKeysTime` makes it possible to monitor the progress of
cleaning up expired shuffle keys.
User-facing change: No.
Tested via the [Celeborn Grafana
Dashboard](https://stenicholas.grafana.net/public-dashboards/4b5a0b79a35e4ddbb18ddccfe2ec06d7).
Closes #2578 from SteNicholas/CELEBORN-1459.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit db0791b5d8bb7c3abbd3970317f9021d350234ec)
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index e3987a195..8dbae9314 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -420,6 +420,17 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.ACTIVE_SLOTS_COUNT) { () =>
workerInfo.usedSlots()
}
+  workerSource.addGauge(WorkerSource.IS_DECOMMISSIONING_WORKER) { () =>
+    // 1 when `shutdown` is set and the worker is decommissioning; the state is read once so both checks agree.
+    val decommissioning = shutdown.get() && {
+      val state = workerStatusManager.currentWorkerStatus.getState
+      state == State.InDecommission || state == State.InDecommissionThenIdle
+    }
+    if (decommissioning) 1 else 0
+  }
+  // Pending clean tasks for expired shuffle keys; a large value indicates a cleanup backlog.
+  workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE)(
+    () => cleanTaskQueue.size())
private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {