This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2097ad0a3 [CELEBORN-2205] Introduce metrics to fetch chunk for memory
and local disk
2097ad0a3 is described below
commit 2097ad0a347b982e68ac07779ee1ab11815b524b
Author: xxx <[email protected]>
AuthorDate: Wed Feb 4 19:38:06 2026 +0800
[CELEBORN-2205] Introduce metrics to fetch chunk for memory and local disk
### What changes were proposed in this pull request?
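Split the worker's fetch-chunk metrics (`FetchChunkTime`, `FetchChunkSuccessCount`, `FetchChunkFailCount`) into separate memory and local disk variants, so that the time and the success/failure counts of fetching a chunk are reported per storage type.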
### Why are the changes needed?
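Previously a single set of fetch-chunk metrics covered both memory and local disk, so the fetch chunk time of the two storage types could not be distinguished. This PR introduces metrics to fetch chunk time for memory and local disk separately.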
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Grafana](https://xy2953396112.grafana.net/public-dashboards/979279524ef74b6b92d0c08c39aa7c9e)
Closes #3546 from xy2953396112/CELEBORN-2205.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 380 +++++++++++++++++++--
docs/monitoring.md | 9 +-
.../service/deploy/worker/FetchHandler.scala | 26 +-
.../service/deploy/worker/WorkerSource.scala | 18 +-
4 files changed, 397 insertions(+), 36 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 2e62facaa..cff188291 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -5503,10 +5503,6 @@
"steps": [
{
"color": "green"
- },
- {
- "color": "red",
- "value": 80
}
]
},
@@ -5539,12 +5535,12 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_FetchChunkTime_Mean{instance=~\"${instance}\"}",
+ "expr":
"metrics_FetchMemoryChunkTime_Mean{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
- "title": "metrics_FetchChunkTime_Mean",
+ "title": "metrics_FetchMemoryChunkTime_Mean",
"type": "timeseries"
},
{
@@ -5595,10 +5591,6 @@
"steps": [
{
"color": "green"
- },
- {
- "color": "red",
- "value": 80
}
]
},
@@ -5631,12 +5623,188 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_FetchChunkTime_Max{instance=~\"${instance}\"}",
+ "expr":
"metrics_FetchMemoryChunkTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FetchMemoryChunkTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 103
+ },
+ "id": 21,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_FetchLocalChunkTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FetchLocalChunkTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 103
+ },
+ "id": 22,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr":
"metrics_FetchLocalChunkTime_Max{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
- "title": "metrics_FetchChunkTime_Max",
+ "title": "metrics_FetchLocalChunkTime_Max",
"type": "timeseries"
},
{
@@ -5909,13 +6077,13 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr":
"metrics_FetchChunkSuccessCount_Count{instance=~\"${instance}\"}",
+ "expr":
"metrics_FetchMemoryChunkSuccessCount_Count{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
- "title": "metrics_FetchChunkSuccessCount_Count",
+ "title": "metrics_FetchMemoryChunkSuccessCount_Count",
"type": "timeseries"
},
{
@@ -5966,10 +6134,95 @@
"steps": [
{
"color": "green"
- },
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 119
+ },
+ "id": 86,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"metrics_FetchLocalChunkSuccessCount_Count{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FetchLocalChunkSuccessCount_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
{
- "color": "red",
- "value": 80
+ "color": "green"
}
]
}
@@ -6002,13 +6255,102 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr":
"metrics_FetchChunkFailCount_Count{instance=~\"${instance}\"}",
+ "expr":
"metrics_FetchMemoryChunkFailCount_Count{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_FetchMemoryChunkFailCount_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 119
+ },
+ "id": 85,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"metrics_FetchLocalChunkFailCount_Count{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
- "title": "metrics_FetchChunkFailCount_Count",
+ "title": "metrics_FetchLocalChunkFailCount_Count",
"type": "timeseries"
},
{
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4c7d4adfa..cef191cd2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -184,13 +184,16 @@ These metrics are exposed by Celeborn worker.
| ActiveShuffleSize | The active shuffle size of a
worker including master replica and slave replica.
|
| ActiveShuffleFileCount | The active shuffle file count
of a worker including master replica and slave replica.
|
| OpenStreamTime | The time for a worker to
process openStream RPC and return StreamHandle.
|
- | FetchChunkTime | The time for a worker to fetch
a chunk which is 8MB by default from a reduced partition.
|
+ | FetchMemoryChunkTime | The time for a worker to fetch
a memory chunk which is 8MB by default from a reduced partition.
|
+ | FetchLocalChunkTime | The time for a worker to fetch
a local disk chunk which is 8MB by default from a reduced partition.
|
| FetchChunkTransferTime | The time for a worker to
transfer for fetching a chunk from a reduced partition.
|
| ActiveChunkStreamCount | Active stream count for reduce
partition reading streams.
|
| OpenStreamSuccessCount | The count of opening stream
succeed in current worker.
|
| OpenStreamFailCount | The count of opening stream
failed in current worker.
|
- | FetchChunkSuccessCount | The count of fetching chunk
succeed in current worker.
|
- | FetchChunkFailCount | The count of fetching chunk
failed in current worker.
|
+ | FetchMemoryChunkSuccessCount | The count of fetching memory
chunk succeed in current worker.
|
+ | FetchLocalChunkSuccessCount | The count of fetching local
disk chunk succeed in current worker.
|
+ | FetchMemoryChunkFailCount | The count of fetching memory
chunk failed in current worker.
|
+ | FetchLocalChunkFailCount | The count of fetching local
disk chunk failed in current worker.
|
| FetchChunkTransferSize | The size of transfer for
fetching chunk in current worker.
|
| PrimaryPushDataTime | The time for a worker to handle
a pushData RPC sent from a celeborn client.
|
| ReplicaPushDataTime | The time for a worker to handle
a pushData RPC sent from a celeborn worker by replicating.
|
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index edf71039f..ca9138e8f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -553,11 +553,21 @@ class FetchHandler(
s" to fetch block $streamChunkSlice")
val streamState =
chunkStreamManager.getStreamState(streamChunkSlice.streamId)
+ val storageMetrics = streamState.buffers match {
+ case _: FileChunkBuffers => (
+ WorkerSource.FETCH_LOCAL_CHUNK_TIME,
+ WorkerSource.FETCH_LOCAL_CHUNK_SUCCESS_COUNT,
+ WorkerSource.FETCH_LOCAL_CHUNK_FAIL_COUNT)
+ case _: MemoryChunkBuffers => (
+ WorkerSource.FETCH_MEMORY_CHUNK_TIME,
+ WorkerSource.FETCH_MEMORY_CHUNK_SUCCESS_COUNT,
+ WorkerSource.FETCH_MEMORY_CHUNK_FAIL_COUNT)
+ }
if (streamState == null) {
val message = s"Stream ${streamChunkSlice.streamId} is not registered
with worker. " +
"This can happen if the worker was restart recently."
logError(message)
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+ workerSource.incCounter(storageMetrics._3)
client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice,
message))
return
}
@@ -569,7 +579,7 @@ class FetchHandler(
s"$chunksBeingTransferred exceeds
${MAX_CHUNKS_BEING_TRANSFERRED.key} " +
s"${Utils.bytesToString(threshold)}."
logError(message)
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+ workerSource.incCounter(storageMetrics._3)
client.getChannel.writeAndFlush(new
ChunkFetchFailure(streamChunkSlice, message))
return
}
@@ -578,7 +588,7 @@ class FetchHandler(
workerSource.recordAppActiveConnection(client, streamState.shuffleKey)
val reqStr = req.toString
- workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+ workerSource.startTimer(storageMetrics._1, reqStr)
val fetchTimeMetric =
chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)
val fetchBeginTime = System.nanoTime()
try {
@@ -596,18 +606,18 @@ class FetchHandler(
logDebug(
s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk
$streamChunkSlice")
}
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
+ workerSource.incCounter(storageMetrics._2)
} else {
logWarning(
s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk
$streamChunkSlice",
future.cause())
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+ workerSource.incCounter(storageMetrics._3)
}
chunkStreamManager.chunkSent(streamChunkSlice.streamId)
if (fetchTimeMetric != null) {
fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
}
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+ workerSource.stopTimer(storageMetrics._1, reqStr)
}
})
} catch {
@@ -616,11 +626,11 @@ class FetchHandler(
s"Error opening block $streamChunkSlice for request from " +
NettyUtils.getRemoteAddress(client.getChannel),
e)
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
+ workerSource.incCounter(storageMetrics._3)
client.getChannel.writeAndFlush(new ChunkFetchFailure(
streamChunkSlice,
Throwables.getStackTraceAsString(e)))
- workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
+ workerSource.stopTimer(storageMetrics._1, reqStr)
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 7d2467333..15eb6a24e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -40,8 +40,10 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, Role.WORKER)
// add counters
addCounter(OPEN_STREAM_SUCCESS_COUNT)
addCounter(OPEN_STREAM_FAIL_COUNT)
- addCounter(FETCH_CHUNK_SUCCESS_COUNT)
- addCounter(FETCH_CHUNK_FAIL_COUNT)
+ addCounter(FETCH_MEMORY_CHUNK_SUCCESS_COUNT)
+ addCounter(FETCH_LOCAL_CHUNK_SUCCESS_COUNT)
+ addCounter(FETCH_MEMORY_CHUNK_FAIL_COUNT)
+ addCounter(FETCH_LOCAL_CHUNK_FAIL_COUNT)
addCounter(WRITE_DATA_HARD_SPLIT_COUNT)
addCounter(WRITE_DATA_SUCCESS_COUNT)
addCounter(WRITE_DATA_FAIL_COUNT)
@@ -91,7 +93,8 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, Role.WORKER)
addTimer(PRIMARY_SEGMENT_START_TIME)
addTimer(REPLICA_SEGMENT_START_TIME)
- addTimer(FETCH_CHUNK_TIME)
+ addTimer(FETCH_MEMORY_CHUNK_TIME)
+ addTimer(FETCH_LOCAL_CHUNK_TIME)
addTimer(OPEN_STREAM_TIME)
addTimer(TAKE_BUFFER_TIME)
addTimer(SORT_TIME)
@@ -161,12 +164,15 @@ object WorkerSource {
// fetch data
val OPEN_STREAM_TIME = "OpenStreamTime"
- val FETCH_CHUNK_TIME = "FetchChunkTime"
+ val FETCH_MEMORY_CHUNK_TIME = "FetchMemoryChunkTime"
+ val FETCH_LOCAL_CHUNK_TIME = "FetchLocalChunkTime"
val ACTIVE_CHUNK_STREAM_COUNT = "ActiveChunkStreamCount"
val OPEN_STREAM_SUCCESS_COUNT = "OpenStreamSuccessCount"
val OPEN_STREAM_FAIL_COUNT = "OpenStreamFailCount"
- val FETCH_CHUNK_SUCCESS_COUNT = "FetchChunkSuccessCount"
- val FETCH_CHUNK_FAIL_COUNT = "FetchChunkFailCount"
+ val FETCH_MEMORY_CHUNK_SUCCESS_COUNT = "FetchMemoryChunkSuccessCount"
+ val FETCH_LOCAL_CHUNK_SUCCESS_COUNT = "FetchLocalChunkSuccessCount"
+ val FETCH_MEMORY_CHUNK_FAIL_COUNT = "FetchMemoryChunkFailCount"
+ val FETCH_LOCAL_CHUNK_FAIL_COUNT = "FetchLocalChunkFailCount"
val FETCH_CHUNK_TRANSFER_SIZE = "FetchChunkTransferSize"
val FETCH_CHUNK_TRANSFER_TIME = "FetchChunkTransferTime"