This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2097ad0a3 [CELEBORN-2205] Introduce metrics to fetch chunk for memory 
and local disk
2097ad0a3 is described below

commit 2097ad0a347b982e68ac07779ee1ab11815b524b
Author: xxx <[email protected]>
AuthorDate: Wed Feb 4 19:38:06 2026 +0800

    [CELEBORN-2205] Introduce metrics to fetch chunk for memory and local disk
    
    ### What changes were proposed in this pull request?
    
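    Split the worker fetch-chunk metrics (`FetchChunkTime`, `FetchChunkSuccessCount`, `FetchChunkFailCount`) by storage type into memory and local-disk variants: `FetchMemoryChunkTime` / `FetchLocalChunkTime`, `FetchMemoryChunkSuccessCount` / `FetchLocalChunkSuccessCount`, and `FetchMemoryChunkFailCount` / `FetchLocalChunkFailCount`.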
    
    ### Why are the changes needed?
    
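    The single `FetchChunkTime` timer and its success/fail counters mix chunks served from memory with chunks read from local disk, so it is not possible to tell which storage type is slow or failing. Per-storage metrics make this visible.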
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
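    Yes. The worker metrics `FetchChunkTime`, `FetchChunkSuccessCount` and `FetchChunkFailCount` are removed and replaced by `FetchMemoryChunk*` and `FetchLocalChunk*` metrics. Dashboards and alerts that reference the old names must be updated.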
    
    ### How was this patch tested?
    
    
[Grafana](https://xy2953396112.grafana.net/public-dashboards/979279524ef74b6b92d0c08c39aa7c9e)
    
    Closes #3546 from xy2953396112/CELEBORN-2205.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 380 +++++++++++++++++++--
 docs/monitoring.md                                 |   9 +-
 .../service/deploy/worker/FetchHandler.scala       |  26 +-
 .../service/deploy/worker/WorkerSource.scala       |  18 +-
 4 files changed, 397 insertions(+), 36 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 2e62facaa..cff188291 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -5503,10 +5503,6 @@
                 "steps": [
                   {
                     "color": "green"
-                  },
-                  {
-                    "color": "red",
-                    "value": 80
                   }
                 ]
               },
@@ -5539,12 +5535,12 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_FetchChunkTime_Mean{instance=~\"${instance}\"}",
+              "expr": 
"metrics_FetchMemoryChunkTime_Mean{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "refId": "A"
             }
           ],
-          "title": "metrics_FetchChunkTime_Mean",
+          "title": "metrics_FetchMemoryChunkTime_Mean",
           "type": "timeseries"
         },
         {
@@ -5595,10 +5591,6 @@
                 "steps": [
                   {
                     "color": "green"
-                  },
-                  {
-                    "color": "red",
-                    "value": 80
                   }
                 ]
               },
@@ -5631,12 +5623,188 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_FetchChunkTime_Max{instance=~\"${instance}\"}",
+              "expr": 
"metrics_FetchMemoryChunkTime_Max{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_FetchMemoryChunkTime_Max",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisBorderShow": false,
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "insertNulls": false,
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 103
+          },
+          "id": 21,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_FetchLocalChunkTime_Mean{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_FetchLocalChunkTime_Mean",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisBorderShow": false,
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "insertNulls": false,
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 103
+          },
+          "id": 22,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": 
"metrics_FetchLocalChunkTime_Max{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "refId": "A"
             }
           ],
-          "title": "metrics_FetchChunkTime_Max",
+          "title": "metrics_FetchLocalChunkTime_Max",
           "type": "timeseries"
         },
         {
@@ -5909,13 +6077,13 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": 
"metrics_FetchChunkSuccessCount_Count{instance=~\"${instance}\"}",
+              "expr": 
"metrics_FetchMemoryChunkSuccessCount_Count{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_FetchChunkSuccessCount_Count",
+          "title": "metrics_FetchMemoryChunkSuccessCount_Count",
           "type": "timeseries"
         },
         {
@@ -5966,10 +6134,95 @@
                 "steps": [
                   {
                     "color": "green"
-                  },
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 119
+          },
+          "id": 86,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": 
"metrics_FetchLocalChunkSuccessCount_Count{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_FetchLocalChunkSuccessCount_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisBorderShow": false,
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "insertNulls": false,
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
                   {
-                    "color": "red",
-                    "value": 80
+                    "color": "green"
                   }
                 ]
               }
@@ -6002,13 +6255,102 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": 
"metrics_FetchChunkFailCount_Count{instance=~\"${instance}\"}",
+              "expr": 
"metrics_FetchMemoryChunkFailCount_Count{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_FetchMemoryChunkFailCount_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisBorderShow": false,
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "insertNulls": false,
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 119
+          },
+          "id": 85,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": 
"metrics_FetchLocalChunkFailCount_Count{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_FetchChunkFailCount_Count",
+          "title": "metrics_FetchLocalChunkFailCount_Count",
           "type": "timeseries"
         },
         {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4c7d4adfa..cef191cd2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -184,13 +184,16 @@ These metrics are exposed by Celeborn worker.
     | ActiveShuffleSize                      | The active shuffle size of a 
worker including master replica and slave replica.                              
   |
     | ActiveShuffleFileCount                 | The active shuffle file count 
of a worker including master replica and slave replica.                         
  |
     | OpenStreamTime                         | The time for a worker to 
process openStream RPC and return StreamHandle.                                 
       |
-    | FetchChunkTime                         | The time for a worker to fetch 
a chunk which is 8MB by default from a reduced partition.                       
 |
+    | FetchMemoryChunkTime                   | The time for a worker to fetch 
a memory chunk which is 8MB by default from a reduced partition.                
 |
+    | FetchLocalChunkTime                    | The time for a worker to fetch 
a local disk chunk which is 8MB by default from a reduced partition.            
 |
     | FetchChunkTransferTime                 | The time for a worker to 
transfer for fetching a chunk from a reduced partition.                         
       |
     | ActiveChunkStreamCount                 | Active stream count for reduce 
partition reading streams.                                                      
 |
     | OpenStreamSuccessCount                 | The count of opening stream 
succeed in current worker.                                                      
    |
     | OpenStreamFailCount                    | The count of opening stream 
failed in current worker.                                                       
    |
-    | FetchChunkSuccessCount                 | The count of fetching chunk 
succeed in current worker.                                                      
    |
-    | FetchChunkFailCount                    | The count of fetching chunk 
failed in current worker.                                                       
    |
+    | FetchMemoryChunkSuccessCount           | The count of fetching memory 
chunk succeed in current worker.                                                
   |
+    | FetchLocalChunkSuccessCount            | The count of fetching local 
disk chunk succeed in current worker.                                           
    |
+    | FetchMemoryChunkFailCount              | The count of fetching memory 
chunk failed in current worker.                                                 
   |
+    | FetchLocalChunkFailCount               | The count of fetching local 
disk chunk failed in current worker.                                            
    |
     | FetchChunkTransferSize                 | The size of transfer for 
fetching chunk in current worker.                                               
       |
     | PrimaryPushDataTime                    | The time for a worker to handle 
a pushData RPC sent from a celeborn client.                                     
|
     | ReplicaPushDataTime                    | The time for a worker to handle 
a pushData RPC sent from a celeborn worker by replicating.                      
|
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index edf71039f..ca9138e8f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -553,11 +553,21 @@ class FetchHandler(
       s" to fetch block $streamChunkSlice")
 
     val streamState = 
chunkStreamManager.getStreamState(streamChunkSlice.streamId)
+    val storageMetrics = Option(streamState).map(_.buffers) match { // null-safe: checked below
+      case Some(_: MemoryChunkBuffers) => (
+          WorkerSource.FETCH_MEMORY_CHUNK_TIME,
+          WorkerSource.FETCH_MEMORY_CHUNK_SUCCESS_COUNT,
+          WorkerSource.FETCH_MEMORY_CHUNK_FAIL_COUNT)
+      case _ => ( // local disk, other buffer types, or an unregistered stream
+          WorkerSource.FETCH_LOCAL_CHUNK_TIME,
+          WorkerSource.FETCH_LOCAL_CHUNK_SUCCESS_COUNT,
+          WorkerSource.FETCH_LOCAL_CHUNK_FAIL_COUNT)
+    }
     if (streamState == null) {
       val message = s"Stream ${streamChunkSlice.streamId} is not registered 
with worker. " +
         "This can happen if the worker was restart recently."
       logError(message)
-      workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+      workerSource.incCounter(storageMetrics._3)
       client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, 
message))
       return
     }
@@ -569,7 +579,7 @@ class FetchHandler(
           s"$chunksBeingTransferred exceeds 
${MAX_CHUNKS_BEING_TRANSFERRED.key} " +
           s"${Utils.bytesToString(threshold)}."
         logError(message)
-        workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+        workerSource.incCounter(storageMetrics._3)
         client.getChannel.writeAndFlush(new 
ChunkFetchFailure(streamChunkSlice, message))
         return
       }
@@ -578,7 +588,7 @@ class FetchHandler(
     workerSource.recordAppActiveConnection(client, streamState.shuffleKey)
 
     val reqStr = req.toString
-    workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+    workerSource.startTimer(storageMetrics._1, reqStr)
     val fetchTimeMetric = 
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)
     val fetchBeginTime = System.nanoTime()
     try {
@@ -596,18 +606,18 @@ class FetchHandler(
                 logDebug(
                   s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk 
$streamChunkSlice")
               }
-              workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
+              workerSource.incCounter(storageMetrics._2)
             } else {
               logWarning(
                 s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk 
$streamChunkSlice",
                 future.cause())
-              workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+              workerSource.incCounter(storageMetrics._3)
             }
             chunkStreamManager.chunkSent(streamChunkSlice.streamId)
             if (fetchTimeMetric != null) {
               fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
             }
-            workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+            workerSource.stopTimer(storageMetrics._1, reqStr)
           }
         })
     } catch {
@@ -616,11 +626,11 @@ class FetchHandler(
           s"Error opening block $streamChunkSlice for request from " +
             NettyUtils.getRemoteAddress(client.getChannel),
           e)
-        workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+        workerSource.incCounter(storageMetrics._3)
         client.getChannel.writeAndFlush(new ChunkFetchFailure(
           streamChunkSlice,
           Throwables.getStackTraceAsString(e)))
-        workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+        workerSource.stopTimer(storageMetrics._1, reqStr)
     }
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 7d2467333..15eb6a24e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -40,8 +40,10 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, Role.WORKER)
   // add counters
   addCounter(OPEN_STREAM_SUCCESS_COUNT)
   addCounter(OPEN_STREAM_FAIL_COUNT)
-  addCounter(FETCH_CHUNK_SUCCESS_COUNT)
-  addCounter(FETCH_CHUNK_FAIL_COUNT)
+  addCounter(FETCH_MEMORY_CHUNK_SUCCESS_COUNT)
+  addCounter(FETCH_LOCAL_CHUNK_SUCCESS_COUNT)
+  addCounter(FETCH_MEMORY_CHUNK_FAIL_COUNT)
+  addCounter(FETCH_LOCAL_CHUNK_FAIL_COUNT)
   addCounter(WRITE_DATA_HARD_SPLIT_COUNT)
   addCounter(WRITE_DATA_SUCCESS_COUNT)
   addCounter(WRITE_DATA_FAIL_COUNT)
@@ -91,7 +93,8 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, Role.WORKER)
   addTimer(PRIMARY_SEGMENT_START_TIME)
   addTimer(REPLICA_SEGMENT_START_TIME)
 
-  addTimer(FETCH_CHUNK_TIME)
+  addTimer(FETCH_MEMORY_CHUNK_TIME)
+  addTimer(FETCH_LOCAL_CHUNK_TIME)
   addTimer(OPEN_STREAM_TIME)
   addTimer(TAKE_BUFFER_TIME)
   addTimer(SORT_TIME)
@@ -161,12 +164,15 @@ object WorkerSource {
 
   // fetch data
   val OPEN_STREAM_TIME = "OpenStreamTime"
-  val FETCH_CHUNK_TIME = "FetchChunkTime"
+  val FETCH_MEMORY_CHUNK_TIME = "FetchMemoryChunkTime"
+  val FETCH_LOCAL_CHUNK_TIME = "FetchLocalChunkTime"
   val ACTIVE_CHUNK_STREAM_COUNT = "ActiveChunkStreamCount"
   val OPEN_STREAM_SUCCESS_COUNT = "OpenStreamSuccessCount"
   val OPEN_STREAM_FAIL_COUNT = "OpenStreamFailCount"
-  val FETCH_CHUNK_SUCCESS_COUNT = "FetchChunkSuccessCount"
-  val FETCH_CHUNK_FAIL_COUNT = "FetchChunkFailCount"
+  val FETCH_MEMORY_CHUNK_SUCCESS_COUNT = "FetchMemoryChunkSuccessCount"
+  val FETCH_LOCAL_CHUNK_SUCCESS_COUNT = "FetchLocalChunkSuccessCount"
+  val FETCH_MEMORY_CHUNK_FAIL_COUNT = "FetchMemoryChunkFailCount"
+  val FETCH_LOCAL_CHUNK_FAIL_COUNT = "FetchLocalChunkFailCount"
   val FETCH_CHUNK_TRANSFER_SIZE = "FetchChunkTransferSize"
   val FETCH_CHUNK_TRANSFER_TIME = "FetchChunkTransferTime"
 

Reply via email to