This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new db0791b5d [CELEBORN-1459][FOLLOWUP] Introduce CleanTaskQueueSize and
CleanExpiredShuffleKeysTime to record situation of cleaning up expired shuffle
keys
db0791b5d is described below
commit db0791b5d8bb7c3abbd3970317f9021d350234ec
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jun 19 16:12:48 2024 +0800
[CELEBORN-1459][FOLLOWUP] Introduce CleanTaskQueueSize and
CleanExpiredShuffleKeysTime to record situation of cleaning up expired shuffle
keys
### What changes were proposed in this pull request?
Introduce `CleanTaskQueueSize` and `CleanExpiredShuffleKeysTime` to record the
progress of cleaning up expired shuffle keys.
### Why are the changes needed?
In the production environment, the task queue that cleans up shuffle data of
expired shuffle keys has built up a backlog. Introducing `CleanTaskQueueSize`
and `CleanExpiredShuffleKeysTime` makes it possible to track the size of that
queue and how long the cleanup of expired shuffle keys takes.
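`CleanExpiredShuffleKeysTime` is meant to show how long cleaning up expired shuffle keys takes. The sketch below illustrates one way to time a cleanup pass. `SimpleTimerSource`, `CleanupTimingSketch` and `cleanExpiredShuffleKeys` are hypothetical stand-ins. They are not Celeborn's `WorkerSource` or the worker's actual cleanup code. The sketch also assumes that the metric records the wall-clock duration of each pass.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical minimal metrics source: it only keeps the last observed duration
// per timer name. It is NOT Celeborn's WorkerSource.
final class SimpleTimerSource {
  private val lastNanos = scala.collection.concurrent.TrieMap.empty[String, Long]

  // Runs `body`, records its wall-clock duration under `name`, returns its result.
  def time[T](name: String)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally lastNanos.put(name, System.nanoTime() - start)
  }

  // Last recorded duration in milliseconds, if the timer has ever run.
  def lastMillis(name: String): Option[Long] =
    lastNanos.get(name).map(n => TimeUnit.NANOSECONDS.toMillis(n))
}

object CleanupTimingSketch {
  val source = new SimpleTimerSource

  // Hypothetical queue of pending cleanup work, one entry per expired shuffle key.
  val cleanTaskQueue = new LinkedBlockingQueue[String]()

  // Hypothetical cleanup pass: drains every pending key inside a timed section
  // and returns how many keys were processed.
  def cleanExpiredShuffleKeys(): Int =
    source.time("CleanExpiredShuffleKeysTime") {
      val batch = new java.util.ArrayList[String]()
      cleanTaskQueue.drainTo(batch)
      batch.size()
    }
}
```

A high queue size alongside short pass durations would suggest that work arrives faster than it is drained. Long durations would instead point at slow individual cleanups.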
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Celeborn Grafana Dashboard](https://stenicholas.grafana.net/public-dashboards/4b5a0b79a35e4ddbb18ddccfe2ec06d7)
Closes #2578 from SteNicholas/CELEBORN-1459.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 3 +++
1 file changed, 3 insertions(+)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 31464c6f5..8ee278b45 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -430,6 +430,9 @@ private[celeborn] class Worker(
0
}
}
+ workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () =>
+ cleanTaskQueue.size()
+ }
private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
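The added gauge passes a closure, `() => cleanTaskQueue.size()`, instead of a value. The queue size is therefore read each time the metric is sampled, not once when the gauge is registered. The minimal sketch below illustrates that behaviour. `SimpleGaugeSource` and `GaugeSketch` are hypothetical. They copy only the `addGauge(name) { () => value }` call shape from the diff and do not reproduce Celeborn's `WorkerSource`. The gauge value is assumed to be a `Long` for simplicity.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Hypothetical minimal gauge registry modelled on the call shape in the diff.
// It is NOT Celeborn's WorkerSource.
final class SimpleGaugeSource {
  private val gauges = new ConcurrentHashMap[String, () => Long]()

  // Stores the supplier only; nothing is evaluated at registration time.
  def addGauge(name: String)(supplier: () => Long): Unit = {
    gauges.put(name, supplier)
  }

  // Evaluates the supplier on every read, as a metrics reporter would per sample.
  def read(name: String): Option[Long] =
    Option(gauges.get(name)).map(f => f())
}

object GaugeSketch {
  val CleanTaskQueueSize = "CleanTaskQueueSize"
  val source = new SimpleGaugeSource

  // Hypothetical stand-in for the worker's cleanup task queue.
  val cleanTaskQueue = new LinkedBlockingQueue[String]()

  // Mirrors the diff: the gauge reports the live queue size whenever it is read.
  source.addGauge(CleanTaskQueueSize) { () => cleanTaskQueue.size().toLong }
}
```

Because the supplier runs on every read, the exported value follows the backlog as tasks are enqueued and drained. A snapshot taken at startup would always report the initial size.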