This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 961144fdb [CELEBORN-1582] Publish metric for unreleased shuffle count 
when worker was decommissioned
961144fdb is described below

commit 961144fdbd3231e173d8ef3ec2e78fa6db1eda78
Author: Sanskar Modi <[email protected]>
AuthorDate: Tue Oct 8 17:02:25 2024 +0800

    [CELEBORN-1582] Publish metric for unreleased shuffle count when worker was 
decommissioned
    
    ### What changes were proposed in this pull request?
    
    Adds a worker metric that publishes the unreleased shuffle count when the worker is decommissioned.
    
    <img width="885" alt="Screenshot 2024-09-16 at 11 12 33 AM" src="https://github.com/user-attachments/assets/c81f36c1-cbed-44fe-814b-88f3ff29875d">
    
    ### Why are the changes needed?
    
    Currently, Celeborn does not publish the count of unreleased shuffle keys, which are lost when a worker is decommissioned. This metric is useful for monitoring and for tuning `forceExitTimeout`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    NA
    
    Closes #2711 from s0nskar/unrelease_shuffle_metric.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 91 ++++++++++++++++++++++
 docs/monitoring.md                                 |  1 +
 .../celeborn/service/deploy/worker/Worker.scala    | 15 +++-
 .../service/deploy/worker/WorkerSource.scala       |  1 +
 4 files changed, 106 insertions(+), 2 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 9b07db6c6..553bd44b0 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -2653,6 +2653,97 @@
           ],
           "title": "metrics_FlushWorkingQueueSize_Value",
           "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 54
+          },
+          "id": 195,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "builder",
+              "expr": "metrics_UnreleasedShuffleCount_Value",
+              "instant": false,
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_UnreleasedShuffleCount_Value",
+          "type": "timeseries"
         }
       ],
       "title": "Worker",
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 5568b8f76..e097abdab 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -233,6 +233,7 @@ These metrics are exposed by Celeborn worker.
     | UserProduceSpeed                            | The speed of user 
production for congestion control.                                              
              |
     | WorkerConsumeSpeed                          | The speed of worker 
consumption for congestion control.                                             
            |
     | IsDecommissioningWorker                     | 1 means worker 
decommissioning, 0 means not decommissioning.                                   
                 |
+    | UnreleasedShuffleCount                      | Unreleased shuffle count 
when worker is decommissioning.                                                 
       |
     | MemoryStorageFileCount                      | The count of files in 
Memory Storage of a worker.                                                     
          |
     | MemoryFileStorageSize                       | The total amount of memory 
used by Memory Storage.                                                         
     |
     | EvictedFileCount                            | The count of files evicted 
from Memory Storage to Disk                                                     
     |
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 52c1f4b9a..e3cbd8642 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -435,6 +435,15 @@ private[celeborn] class Worker(
       0
     }
   }
+  // Unreleased shuffle count when worker is decommissioning
+  workerSource.addGauge(WorkerSource.UNRELEASED_SHUFFLE_COUNT) { () =>
+    if (shutdown.get() && (workerStatusManager.currentWorkerStatus.getState == 
State.InDecommission ||
+        workerStatusManager.currentWorkerStatus.getState == 
State.InDecommissionThenIdle)) {
+      storageManager.shuffleKeySet().size
+    } else {
+      0
+    }
+  }
   workerSource.addGauge(WorkerSource.CLEAN_TASK_QUEUE_SIZE) { () =>
     cleanTaskQueue.size()
   }
@@ -952,11 +961,13 @@ private[celeborn] class Worker(
       Thread.sleep(interval)
       waitTimes += 1
     }
-    if (storageManager.shuffleKeySet().isEmpty) {
+
+    val unreleasedShuffleKeys = storageManager.shuffleKeySet()
+    if (unreleasedShuffleKeys.isEmpty) {
       logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
     } else {
       logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
-        s"unreleased shuffle: 
\n${storageManager.shuffleKeySet().asScala.mkString("[", ", ", "]")}")
+        s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", 
", ", "]")}")
     }
     workerStatusManager.transitionState(State.Exit)
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index b2777c564..15096fadc 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -222,6 +222,7 @@ object WorkerSource {
 
   // decommission
   val IS_DECOMMISSIONING_WORKER = "IsDecommissioningWorker"
+  val UNRELEASED_SHUFFLE_COUNT = "UnreleasedShuffleCount"
 
   // clean
   val CLEAN_TASK_QUEUE_SIZE = "CleanTaskQueueSize"

Reply via email to