This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 961144fdb [CELEBORN-1582] Publish metric for unreleased shuffle count
when worker was decommissioned
961144fdb is described below
commit 961144fdbd3231e173d8ef3ec2e78fa6db1eda78
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Oct 8 17:02:25 2024 +0800
[CELEBORN-1582] Publish metric for unreleased shuffle count when worker was
decommissioned
### What changes were proposed in this pull request?
Add a worker metric that publishes the unreleased shuffle count when a worker
is decommissioned.
<img width="885" alt="Screenshot 2024-09-16 at 11 12 33 AM"
src="https://github.com/user-attachments/assets/c81f36c1-cbed-44fe-814b-88f3ff29875d">
### Why are the changes needed?
Currently Celeborn doesn't publish the count of unreleased shuffle keys that
are lost when a worker is decommissioned. This count can be useful for
monitoring and for configuring the `forceExitTimeout`.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
NA
Closes #2711 from s0nskar/unrelease_shuffle_metric.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 91 ++++++++++++++++++++++
docs/monitoring.md | 1 +
.../celeborn/service/deploy/worker/Worker.scala | 15 +++-
.../service/deploy/worker/WorkerSource.scala | 1 +
4 files changed, 106 insertions(+), 2 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 9b07db6c6..553bd44b0 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -2653,6 +2653,97 @@
],
"title": "metrics_FlushWorkingQueueSize_Value",
"type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 54
+ },
+ "id": 195,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "builder",
+ "expr": "metrics_UnreleasedShuffleCount_Value",
+ "instant": false,
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_UnreleasedShuffleCount_Value",
+ "type": "timeseries"
}
],
"title": "Worker",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 5568b8f76..e097abdab 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -233,6 +233,7 @@ These metrics are exposed by Celeborn worker.
| UserProduceSpeed | The speed of user
production for congestion control.
|
| WorkerConsumeSpeed | The speed of worker
consumption for congestion control.
|
| IsDecommissioningWorker | 1 means worker
decommissioning, 0 means not decommissioning.
|
+ | UnreleasedShuffleCount | Unreleased shuffle count
when worker is decommissioning.
|
| MemoryStorageFileCount | The count of files in
Memory Storage of a worker.
|
| MemoryFileStorageSize | The total amount of memory
used by Memory Storage.
|
| EvictedFileCount | The count of files evicted
from Memory Storage to Disk
|
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 52c1f4b9a..e3cbd8642 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -435,6 +435,15 @@ private[celeborn] class Worker(
0
}
}
+ // Unreleased shuffle count when worker is decommissioning
+ workerSource.addGauge(WorkerSource.UNRELEASED_SHUFFLE_COUNT) { () =>
+ if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState ==
State.InDecommission ||
+ workerStatusManager.currentWorkerStatus.getState ==
State.InDecommissionThenIdle)) {
+ storageManager.shuffleKeySet().size
+ } else {
+ 0
+ }
+ }
workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () =>
cleanTaskQueue.size()
}
@@ -952,11 +961,13 @@ private[celeborn] class Worker(
Thread.sleep(interval)
waitTimes += 1
}
- if (storageManager.shuffleKeySet().isEmpty) {
+
+ val unreleasedShuffleKeys = storageManager.shuffleKeySet()
+ if (unreleasedShuffleKeys.isEmpty) {
logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
} else {
logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
- s"unreleased shuffle:
\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}")
+ s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[",
", ", "]")}")
}
workerStatusManager.transitionState(State.Exit)
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index b2777c564..15096fadc 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -222,6 +222,7 @@ object WorkerSource {
// decommission
val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
+ val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount"
// clean
val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"