This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 03656b5b1 [CELEBORN-1634][FOLLOWUP] Add rpc metrics into grafana
dashboard
03656b5b1 is described below
commit 03656b5b1cc84a3119d85ce57b7cb905b6c291d6
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Dec 24 11:13:49 2024 +0800
[CELEBORN-1634][FOLLOWUP] Add rpc metrics into grafana dashboard
### What changes were proposed in this pull request?
1. Rename the RPC metrics from `${name}_${metric}` to
`Rpc${metric}{name=$name}` so that they are easy to add to the Grafana dashboard.
2. Use the MASTER/WORKER/CLIENT role for the RPC env.
3. Add the RPC metrics to the Grafana dashboard.
### Why are the changes needed?
For monitoring: the RPC metrics can now be viewed in the Grafana dashboard.
### Does this PR introduce _any_ user-facing change?
No, the RPC metrics have not been released yet.
### How was this patch tested?
Unit test (UT) verifying that the metrics, including the new RPC metrics, carry the `instance` label.
<img width="1456" alt="image"
src="https://github.com/user-attachments/assets/90284390-54ad-49ef-a868-fa537d2301b8">
<img width="1880" alt="image"
src="https://github.com/user-attachments/assets/e8101e47-d649-4c66-9978-1efb4faa047f">
Closes #2990 from turboFei/rpc_metrics.
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 654 +++++++++++++++++++++
.../apache/celeborn/client/ShuffleClientImpl.java | 2 +
.../apache/celeborn/client/LifecycleManager.scala | 4 +
.../celeborn/common/metrics/source/Role.scala | 1 +
.../org/apache/celeborn/common/rpc/RpcEnv.scala | 8 +-
.../celeborn/common/rpc/RpcMetricsTracker.scala | 18 +-
.../org/apache/celeborn/common/rpc/RpcSource.scala | 12 +-
.../celeborn/common/rpc/netty/NettyRpcEnv.scala | 6 +-
.../celeborn/common/meta/WorkerInfoSuite.scala | 2 +
.../celeborn/common/rpc/netty/InboxSuite.scala | 3 +-
.../common/rpc/netty/NettyRpcEnvSuite.scala | 4 +
.../celeborn/service/deploy/master/Master.scala | 3 +
.../server/common/http/ApiBaseResourceSuite.scala | 10 +-
.../celeborn/service/deploy/worker/Worker.scala | 3 +
14 files changed, 707 insertions(+), 23 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 2fa98c1ec..fb1f00577 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -13137,6 +13137,660 @@
],
"title": "UserConsumption",
"type": "row"
+ },
+ {
+ "collapsed": true,
+ "gridPos": {
+ "h": 1,
+ "w": 24,
+ "x": 0,
+ "y": 13
+ },
+ "id": 226,
+ "panels": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 1
+ },
+ "id": 227,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"metrics_RpcQueueLength_Value{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcQueueLength_Value",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 1
+ },
+ "id": 228,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr": "metrics_RpcQueueTime_Count{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcQueueTime_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 9
+ },
+ "id": 229,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_RpcQueueTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcQueueTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 9
+ },
+ "id": 230,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_RpcQueueTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcQueueTime_Mean",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "description": "",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 17
+ },
+ "id": 231,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "editorMode": "code",
+ "expr":
"metrics_RpcProcessTime_Count{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcProcessTime_Count",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 12,
+ "y": 17
+ },
+ "id": 232,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_RpcProcessTime_Max{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcProcessTime_Max",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green"
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ms"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 8,
+ "w": 12,
+ "x": 0,
+ "y": 25
+ },
+ "id": 233,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "maxHeight": 600,
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "${DS_PROMETHEUS}"
+ },
+ "expr": "metrics_RpcProcessTime_Mean{instance=~\"${instance}\"}",
+ "legendFormat": "${baseLegend}",
+ "refId": "A"
+ }
+ ],
+ "title": "metrics_RpcProcessTime_Mean",
+ "type": "timeseries"
+ }
+ ],
+ "title": "Rpc",
+ "type": "row"
}
],
"refresh": "5s",
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index e50e6093c..cde9fc043 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -44,6 +44,7 @@ import org.apache.celeborn.client.read.MetricsCallback;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.metrics.source.Role;
import org.apache.celeborn.common.network.TransportContext;
import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -201,6 +202,7 @@ public class ShuffleClientImpl extends ShuffleClient {
Utils.localHostName(conf),
0,
conf,
+ Role.CLIENT(),
scala.None$.empty());
String module = TransportModuleConstants.DATA_MODULE;
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ed803071e..4f69a05bc 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -44,6 +44,7 @@ import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta,
ShufflePartitionLocationInfo, WorkerInfo}
+import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
@@ -170,6 +171,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
lifecycleHost,
conf.shuffleManagerPort,
conf,
+ Role.CLIENT,
None)
rpcEnv.setupEndpoint(RpcNameConstants.LIFECYCLE_MANAGER_EP, this)
@@ -189,6 +191,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
lifecycleHost,
0,
conf,
+ Role.CLIENT,
createRpcSecurityContext(
appSecret,
addClientRegistrationBootstrap = true,
@@ -200,6 +203,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
lifecycleHost,
0,
conf,
+ Role.CLIENT,
createRpcSecurityContext(appSecret))
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
index 50b509643..95b03f256 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
@@ -20,4 +20,5 @@ package org.apache.celeborn.common.metrics.source
object Role {
val MASTER = "master"
val WORKER = "worker"
+ val CLIENT = "client"
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 7a44d8b63..89973a936 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -39,8 +39,9 @@ object RpcEnv {
host: String,
port: Int,
conf: CelebornConf,
+ role: String,
securityContext: Option[RpcSecurityContext]): RpcEnv = {
- create(name, transportModule, host, host, port, conf, 0, securityContext)
+ create(name, transportModule, host, host, port, conf, 0, role,
securityContext)
}
def create(
@@ -50,6 +51,7 @@ object RpcEnv {
port: Int,
conf: CelebornConf,
numUsableCores: Int,
+ role: String,
securityContext: Option[RpcSecurityContext],
source: Option[AbstractSource]): RpcEnv = {
val bindAddress =
@@ -63,6 +65,7 @@ object RpcEnv {
port,
conf,
numUsableCores,
+ role,
securityContext,
source)
}
@@ -75,6 +78,7 @@ object RpcEnv {
port: Int,
conf: CelebornConf,
numUsableCores: Int,
+ role: String,
securityContext: Option[RpcSecurityContext] = None,
source: Option[AbstractSource] = None): RpcEnv = {
val config =
@@ -86,6 +90,7 @@ object RpcEnv {
advertiseAddress,
port,
numUsableCores,
+ role,
securityContext,
source)
new NettyRpcEnvFactory().create(config)
@@ -222,6 +227,7 @@ private[celeborn] case class RpcEnvConfig(
advertiseAddress: String,
port: Int,
numUsableCores: Int,
+ role: String,
securityContext: Option[RpcSecurityContext],
source: Option[AbstractSource]) {
assert(RpcEnvConfig.VALID_TRANSPORT_MODULES.contains(transportModule))
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
index 22a715c34..5774b7a99 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -53,19 +53,17 @@ private[celeborn] class RpcMetricsTracker(
} else {
false
}
- final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
- final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
- final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"
+ final private val NAME_LABEL = Map("name" -> name)
private var queueLengthFunc: () => Long = _
def init(lengthFunc: () => Long): Unit = {
queueLengthFunc = lengthFunc
if (name != null) {
- rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)
+ rpcSource.addGauge(RpcSource.QUEUE_LENGTH, NAME_LABEL)(queueLengthFunc)
- rpcSource.addTimer(QUEUE_TIME_METRIC)
- rpcSource.addTimer(PROCESS_TIME_METRIC)
+ rpcSource.addTimer(RpcSource.QUEUE_TIME, NAME_LABEL)
+ rpcSource.addTimer(RpcSource.PROCESS_TIME, NAME_LABEL)
}
}
@@ -115,12 +113,12 @@ private[celeborn] class RpcMetricsTracker(
val msgName = messageName(message)
if (useHistogram) {
- updateHistogram(QUEUE_TIME_METRIC, queueTime)
- updateHistogram(PROCESS_TIME_METRIC, processTime)
+ updateHistogram(RpcSource.QUEUE_TIME, queueTime)
+ updateHistogram(RpcSource.PROCESS_TIME, processTime)
updateHistogram(msgName, processTime)
} else {
- rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
- rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
+ rpcSource.updateTimer(RpcSource.QUEUE_TIME, queueTime, NAME_LABEL)
+ rpcSource.updateTimer(RpcSource.PROCESS_TIME, processTime, NAME_LABEL)
rpcSource.updateTimer(msgName, processTime)
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
index 3b1da4385..67ddd7eaa 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -20,16 +20,14 @@ package org.apache.celeborn.common.rpc
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource
-class RpcSource(conf: CelebornConf) extends AbstractSource(conf,
RpcSource.ROLE_RPC) {
- override def sourceName: String = RpcSource.ROLE_RPC
+class RpcSource(conf: CelebornConf, role: String) extends AbstractSource(conf,
role) {
+ override def sourceName: String = "RPC"
startCleaner()
}
object RpcSource {
- val ROLE_RPC = "RPC"
-
- val QUEUE_LENGTH = "QueueLength"
- val QUEUE_TIME = "QueueTime"
- val PROCESS_TIME = "ProcessTime"
+ val QUEUE_LENGTH = "RpcQueueLength"
+ val QUEUE_TIME = "RpcQueueTime"
+ val PROCESS_TIME = "RpcProcessTime"
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 648393786..517b62fe3 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -55,9 +55,9 @@ class NettyRpcEnv(
config.transportModule,
celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))
- private val source: RpcSource = new RpcSource(celebornConf)
+ private val _rpcSource: RpcSource = new RpcSource(celebornConf, config.role)
- private val dispatcher: Dispatcher = new Dispatcher(this, source)
+ private val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
private var worker: RpcEndpoint = null
@@ -364,7 +364,7 @@ class NettyRpcEnv(
}
}
- override def rpcSource(): RpcSource = source
+ override def rpcSource(): RpcSource = _rpcSource
}
private[celeborn] object NettyRpcEnv extends Logging {
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index aa72dce04..e7d211205 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -31,6 +31,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals,
assertNotNull}
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
@@ -283,6 +284,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
12345,
conf,
64,
+ Role.WORKER,
None,
None)
val worker4 = new WorkerInfo(
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index 83b5b0a1b..a0c23368b 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker,
RpcSource, TestRpcEndpoint}
import org.apache.celeborn.common.util.ThreadUtils
@@ -38,7 +39,7 @@ class InboxSuite extends CelebornFunSuite with BeforeAndAfter
{
onDropOverride: Option[InboxMessage => T]): Inbox = {
val rpcEnvRef = mock(classOf[NettyRpcEndpointRef])
val conf = new CelebornConf()
- val source: RpcSource = new RpcSource(conf)
+ val source: RpcSource = new RpcSource(conf, Role.CLIENT)
if (onDropOverride.isEmpty) {
new Inbox(
rpcEnvRef,
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index 8b0ba7bcd..bd469591c 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler,
TimeLimits}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.client.TransportClient
import org.apache.celeborn.common.protocol.TransportModuleConstants
import org.apache.celeborn.common.rpc._
@@ -49,6 +50,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
port,
0,
+ Role.CLIENT,
None,
None)
new NettyRpcEnvFactory().create(config)
@@ -73,6 +75,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"example.com",
0,
0,
+ Role.CLIENT,
None,
None)
val env = new NettyRpcEnvFactory().create(config)
@@ -123,6 +126,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
"localhost",
0,
numUsableCores,
+ Role.CLIENT,
None,
None)
val anotherEnv = new NettyRpcEnvFactory().create(config)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 5eb674428..31f10f9d4 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -101,6 +101,7 @@ private[celeborn] class Master(
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
+ Role.MASTER,
None,
None)
} else {
@@ -118,6 +119,7 @@ private[celeborn] class Master(
masterArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
+ Role.MASTER,
Some(externalSecurityContext),
None)
}
@@ -137,6 +139,7 @@ private[celeborn] class Master(
masterArgs.internalPort,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
+ Role.MASTER,
None,
None)
}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
index 09945217b..49d513851 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -86,7 +86,15 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
test("metrics") {
var response =
webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_OK == response.getStatus)
-
assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+ val metricLines = response.readEntity(classOf[String]).split("\n")
+ Seq(
+ "metrics_jvm_memory_heap_max_Value",
+ "metrics_RpcQueueLength_Value",
+ "metrics_RpcQueueTime_Max",
+ "metrics_RpcProcessTime_Max").foreach { metric =>
+ assert(metricLines.exists(l =>
+ l.contains(metric) &&
l.contains(s"""instance="${httpService.connectionUrl}"""")))
+ }
response =
webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_OK == response.getStatus)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 274ad0b82..7c85712ec 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -104,6 +104,7 @@ private[celeborn] class Worker(
workerArgs.port,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+ Role.WORKER,
None,
Some(workerSource))
} else {
@@ -121,6 +122,7 @@ private[celeborn] class Worker(
workerArgs.port,
conf,
Math.max(64, Runtime.getRuntime.availableProcessors()),
+ Role.WORKER,
Some(externalSecurityContext),
Some(workerSource))
}
@@ -137,6 +139,7 @@ private[celeborn] class Worker(
workerArgs.internalPort,
conf,
Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+ Role.WORKER,
None,
Some(workerSource))
}