This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 03656b5b1 [CELEBORN-1634][FOLLOWUP] Add rpc metrics into grafana dashboard
03656b5b1 is described below

commit 03656b5b1cc84a3119d85ce57b7cb905b6c291d6
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Dec 24 11:13:49 2024 +0800

    [CELEBORN-1634][FOLLOWUP] Add rpc metrics into grafana dashboard
    
    ### What changes were proposed in this pull request?
    
    1. Rename the RPC metrics from `${name}_${metric}` to `Rpc${metric}{name=$name}` so that they are easy to add to the Grafana dashboard.
    2. Use the MASTER/WORKER/CLIENT role for the RPC env.
    3. Add the RPC metrics to the Grafana dashboard.
    
    ### Why are the changes needed?
    
    For monitoring
    
    ### Does this PR introduce _any_ user-facing change?
    No, the RPC metrics have not been released yet.
    
    ### How was this patch tested?
    UT for the metrics source `instance`.
    
    <img width="1456" alt="image" src="https://github.com/user-attachments/assets/90284390-54ad-49ef-a868-fa537d2301b8">
    
    <img width="1880" alt="image" src="https://github.com/user-attachments/assets/e8101e47-d649-4c66-9978-1efb4faa047f">
    
    Closes #2990 from turboFei/rpc_metrics.
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 654 +++++++++++++++++++++
 .../apache/celeborn/client/ShuffleClientImpl.java  |   2 +
 .../apache/celeborn/client/LifecycleManager.scala  |   4 +
 .../celeborn/common/metrics/source/Role.scala      |   1 +
 .../org/apache/celeborn/common/rpc/RpcEnv.scala    |   8 +-
 .../celeborn/common/rpc/RpcMetricsTracker.scala    |  18 +-
 .../org/apache/celeborn/common/rpc/RpcSource.scala |  12 +-
 .../celeborn/common/rpc/netty/NettyRpcEnv.scala    |   6 +-
 .../celeborn/common/meta/WorkerInfoSuite.scala     |   2 +
 .../celeborn/common/rpc/netty/InboxSuite.scala     |   3 +-
 .../common/rpc/netty/NettyRpcEnvSuite.scala        |   4 +
 .../celeborn/service/deploy/master/Master.scala    |   3 +
 .../server/common/http/ApiBaseResourceSuite.scala  |  10 +-
 .../celeborn/service/deploy/worker/Worker.scala    |   3 +
 14 files changed, 707 insertions(+), 23 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 2fa98c1ec..fb1f00577 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -13137,6 +13137,660 @@
       ],
       "title": "UserConsumption",
       "type": "row"
+    },
+    {
+      "collapsed": true,
+      "gridPos": {
+        "h": 1,
+        "w": 24,
+        "x": 0,
+        "y": 13
+      },
+      "id": 226,
+      "panels": [
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 1
+          },
+          "id": 227,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": 
"metrics_RpcQueueLength_Value{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcQueueLength_Value",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 1
+          },
+          "id": 228,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": "metrics_RpcQueueTime_Count{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcQueueTime_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 9
+          },
+          "id": 229,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_RpcQueueTime_Max{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcQueueTime_Max",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 9
+          },
+          "id": 230,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_RpcQueueTime_Mean{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcQueueTime_Mean",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "description": "",
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              }
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 17
+          },
+          "id": 231,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "editorMode": "code",
+              "expr": 
"metrics_RpcProcessTime_Count{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "range": true,
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcProcessTime_Count",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 12,
+            "y": 17
+          },
+          "id": 232,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_RpcProcessTime_Max{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcProcessTime_Max",
+          "type": "timeseries"
+        },
+        {
+          "datasource": {
+            "type": "prometheus",
+            "uid": "${DS_PROMETHEUS}"
+          },
+          "fieldConfig": {
+            "defaults": {
+              "color": {
+                "mode": "palette-classic"
+              },
+              "custom": {
+                "axisCenteredZero": false,
+                "axisColorMode": "text",
+                "axisLabel": "",
+                "axisPlacement": "auto",
+                "barAlignment": 0,
+                "drawStyle": "line",
+                "fillOpacity": 0,
+                "gradientMode": "none",
+                "hideFrom": {
+                  "legend": false,
+                  "tooltip": false,
+                  "viz": false
+                },
+                "lineInterpolation": "linear",
+                "lineWidth": 1,
+                "pointSize": 5,
+                "scaleDistribution": {
+                  "type": "linear"
+                },
+                "showPoints": "auto",
+                "spanNulls": false,
+                "stacking": {
+                  "group": "A",
+                  "mode": "none"
+                },
+                "thresholdsStyle": {
+                  "mode": "off"
+                }
+              },
+              "mappings": [],
+              "thresholds": {
+                "mode": "absolute",
+                "steps": [
+                  {
+                    "color": "green"
+                  },
+                  {
+                    "color": "red",
+                    "value": 80
+                  }
+                ]
+              },
+              "unit": "ms"
+            },
+            "overrides": []
+          },
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 25
+          },
+          "id": 233,
+          "options": {
+            "legend": {
+              "calcs": [],
+              "displayMode": "list",
+              "placement": "bottom",
+              "showLegend": true
+            },
+            "tooltip": {
+              "maxHeight": 600,
+              "mode": "single",
+              "sort": "none"
+            }
+          },
+          "targets": [
+            {
+              "datasource": {
+                "type": "prometheus",
+                "uid": "${DS_PROMETHEUS}"
+              },
+              "expr": "metrics_RpcProcessTime_Mean{instance=~\"${instance}\"}",
+              "legendFormat": "${baseLegend}",
+              "refId": "A"
+            }
+          ],
+          "title": "metrics_RpcProcessTime_Mean",
+          "type": "timeseries"
+        }
+      ],
+      "title": "Rpc",
+      "type": "row"
     }
   ],
   "refresh": "5s",
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index e50e6093c..cde9fc043 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -44,6 +44,7 @@ import org.apache.celeborn.client.read.MetricsCallback;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
 import org.apache.celeborn.common.identity.UserIdentifier;
+import org.apache.celeborn.common.metrics.source.Role;
 import org.apache.celeborn.common.network.TransportContext;
 import org.apache.celeborn.common.network.buffer.NettyManagedBuffer;
 import org.apache.celeborn.common.network.client.RpcResponseCallback;
@@ -201,6 +202,7 @@ public class ShuffleClientImpl extends ShuffleClient {
             Utils.localHostName(conf),
             0,
             conf,
+            Role.CLIENT(),
             scala.None$.empty());
 
     String module = TransportModuleConstants.DATA_MODULE;
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ed803071e..4f69a05bc 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -44,6 +44,7 @@ import org.apache.celeborn.common.client.MasterClient
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ApplicationMeta, 
ShufflePartitionLocationInfo, WorkerInfo}
+import org.apache.celeborn.common.metrics.source.Role
 import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
 import org.apache.celeborn.common.protocol._
 import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
@@ -170,6 +171,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
     lifecycleHost,
     conf.shuffleManagerPort,
     conf,
+    Role.CLIENT,
     None)
   rpcEnv.setupEndpoint(RpcNameConstants.LIFECYCLE_MANAGER_EP, this)
 
@@ -189,6 +191,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         lifecycleHost,
         0,
         conf,
+        Role.CLIENT,
         createRpcSecurityContext(
           appSecret,
           addClientRegistrationBootstrap = true,
@@ -200,6 +203,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         lifecycleHost,
         0,
         conf,
+        Role.CLIENT,
         createRpcSecurityContext(appSecret))
   }
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
index 50b509643..95b03f256 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/Role.scala
@@ -20,4 +20,5 @@ package org.apache.celeborn.common.metrics.source
 object Role {
   val MASTER = "master"
   val WORKER = "worker"
+  val CLIENT = "client"
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 7a44d8b63..89973a936 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -39,8 +39,9 @@ object RpcEnv {
       host: String,
       port: Int,
       conf: CelebornConf,
+      role: String,
       securityContext: Option[RpcSecurityContext]): RpcEnv = {
-    create(name, transportModule, host, host, port, conf, 0, securityContext)
+    create(name, transportModule, host, host, port, conf, 0, role, 
securityContext)
   }
 
   def create(
@@ -50,6 +51,7 @@ object RpcEnv {
       port: Int,
       conf: CelebornConf,
       numUsableCores: Int,
+      role: String,
       securityContext: Option[RpcSecurityContext],
       source: Option[AbstractSource]): RpcEnv = {
     val bindAddress =
@@ -63,6 +65,7 @@ object RpcEnv {
       port,
       conf,
       numUsableCores,
+      role,
       securityContext,
       source)
   }
@@ -75,6 +78,7 @@ object RpcEnv {
       port: Int,
       conf: CelebornConf,
       numUsableCores: Int,
+      role: String,
       securityContext: Option[RpcSecurityContext] = None,
       source: Option[AbstractSource] = None): RpcEnv = {
     val config =
@@ -86,6 +90,7 @@ object RpcEnv {
         advertiseAddress,
         port,
         numUsableCores,
+        role,
         securityContext,
         source)
     new NettyRpcEnvFactory().create(config)
@@ -222,6 +227,7 @@ private[celeborn] case class RpcEnvConfig(
     advertiseAddress: String,
     port: Int,
     numUsableCores: Int,
+    role: String,
     securityContext: Option[RpcSecurityContext],
     source: Option[AbstractSource]) {
   assert(RpcEnvConfig.VALID_TRANSPORT_MODULES.contains(transportModule))
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
index 22a715c34..5774b7a99 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -53,19 +53,17 @@ private[celeborn] class RpcMetricsTracker(
     } else {
       false
     }
-  final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
-  final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
-  final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"
+  final private val NAME_LABEL = Map("name" -> name)
 
   private var queueLengthFunc: () => Long = _
 
   def init(lengthFunc: () => Long): Unit = {
     queueLengthFunc = lengthFunc
     if (name != null) {
-      rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)
+      rpcSource.addGauge(RpcSource.QUEUE_LENGTH, NAME_LABEL)(queueLengthFunc)
 
-      rpcSource.addTimer(QUEUE_TIME_METRIC)
-      rpcSource.addTimer(PROCESS_TIME_METRIC)
+      rpcSource.addTimer(RpcSource.QUEUE_TIME, NAME_LABEL)
+      rpcSource.addTimer(RpcSource.PROCESS_TIME, NAME_LABEL)
     }
   }
 
@@ -115,12 +113,12 @@ private[celeborn] class RpcMetricsTracker(
     val msgName = messageName(message)
 
     if (useHistogram) {
-      updateHistogram(QUEUE_TIME_METRIC, queueTime)
-      updateHistogram(PROCESS_TIME_METRIC, processTime)
+      updateHistogram(RpcSource.QUEUE_TIME, queueTime)
+      updateHistogram(RpcSource.PROCESS_TIME, processTime)
       updateHistogram(msgName, processTime)
     } else {
-      rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
-      rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
+      rpcSource.updateTimer(RpcSource.QUEUE_TIME, queueTime, NAME_LABEL)
+      rpcSource.updateTimer(RpcSource.PROCESS_TIME, processTime, NAME_LABEL)
       rpcSource.updateTimer(msgName, processTime)
     }
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
index 3b1da4385..67ddd7eaa 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -20,16 +20,14 @@ package org.apache.celeborn.common.rpc
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.metrics.source.AbstractSource
 
-class RpcSource(conf: CelebornConf) extends AbstractSource(conf, 
RpcSource.ROLE_RPC) {
-  override def sourceName: String = RpcSource.ROLE_RPC
+class RpcSource(conf: CelebornConf, role: String) extends AbstractSource(conf, 
role) {
+  override def sourceName: String = "RPC"
 
   startCleaner()
 }
 
 object RpcSource {
-  val ROLE_RPC = "RPC"
-
-  val QUEUE_LENGTH = "QueueLength"
-  val QUEUE_TIME = "QueueTime"
-  val PROCESS_TIME = "ProcessTime"
+  val QUEUE_LENGTH = "RpcQueueLength"
+  val QUEUE_TIME = "RpcQueueTime"
+  val PROCESS_TIME = "RpcProcessTime"
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 648393786..517b62fe3 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -55,9 +55,9 @@ class NettyRpcEnv(
     config.transportModule,
     celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))
 
-  private val source: RpcSource = new RpcSource(celebornConf)
+  private val _rpcSource: RpcSource = new RpcSource(celebornConf, config.role)
 
-  private val dispatcher: Dispatcher = new Dispatcher(this, source)
+  private val dispatcher: Dispatcher = new Dispatcher(this, _rpcSource)
 
   private var worker: RpcEndpoint = null
 
@@ -364,7 +364,7 @@ class NettyRpcEnv(
     }
   }
 
-  override def rpcSource(): RpcSource = source
+  override def rpcSource(): RpcSource = _rpcSource
 }
 
 private[celeborn] object NettyRpcEnv extends Logging {
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index aa72dce04..e7d211205 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -31,6 +31,7 @@ import org.junit.Assert.{assertEquals, assertNotEquals, 
assertNotNull}
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
+import org.apache.celeborn.common.metrics.source.Role
 import org.apache.celeborn.common.protocol.TransportModuleConstants
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
@@ -283,6 +284,7 @@ class WorkerInfoSuite extends CelebornFunSuite {
         12345,
         conf,
         64,
+        Role.WORKER,
         None,
         None)
       val worker4 = new WorkerInfo(
diff --git a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index 83b5b0a1b..a0c23368b 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.Role
 import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker, RpcSource, TestRpcEndpoint}
 import org.apache.celeborn.common.util.ThreadUtils
 
@@ -38,7 +39,7 @@ class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
       onDropOverride: Option[InboxMessage => T]): Inbox = {
     val rpcEnvRef = mock(classOf[NettyRpcEndpointRef])
     val conf = new CelebornConf()
-    val source: RpcSource = new RpcSource(conf)
+    val source: RpcSource = new RpcSource(conf, Role.CLIENT)
     if (onDropOverride.isEmpty) {
       new Inbox(
         rpcEnvRef,
diff --git a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
index 8b0ba7bcd..bd469591c 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnvSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
+import org.apache.celeborn.common.metrics.source.Role
 import org.apache.celeborn.common.network.client.TransportClient
 import org.apache.celeborn.common.protocol.TransportModuleConstants
 import org.apache.celeborn.common.rpc._
@@ -49,6 +50,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "localhost",
       port,
       0,
+      Role.CLIENT,
       None,
       None)
     new NettyRpcEnvFactory().create(config)
@@ -73,6 +75,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "example.com",
       0,
       0,
+      Role.CLIENT,
       None,
       None)
     val env = new NettyRpcEnvFactory().create(config)
@@ -123,6 +126,7 @@ class NettyRpcEnvSuite extends RpcEnvSuite with TimeLimits {
       "localhost",
       0,
       numUsableCores,
+      Role.CLIENT,
       None,
       None)
     val anotherEnv = new NettyRpcEnvFactory().create(config)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 5eb674428..31f10f9d4 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -101,6 +101,7 @@ private[celeborn] class Master(
         masterArgs.port,
         conf,
         Math.max(64, Runtime.getRuntime.availableProcessors()),
+        Role.MASTER,
         None,
         None)
     } else {
@@ -118,6 +119,7 @@ private[celeborn] class Master(
         masterArgs.port,
         conf,
         Math.max(64, Runtime.getRuntime.availableProcessors()),
+        Role.MASTER,
         Some(externalSecurityContext),
         None)
     }
@@ -137,6 +139,7 @@ private[celeborn] class Master(
         masterArgs.internalPort,
         conf,
         Math.max(64, Runtime.getRuntime.availableProcessors()),
+        Role.MASTER,
         None,
         None)
     }
diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
index 09945217b..49d513851 100644
--- a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
+++ b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala
@@ -86,7 +86,15 @@ abstract class ApiBaseResourceSuite extends HttpTestHelper {
   test("metrics") {
     var response = webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get()
     assert(HttpServletResponse.SC_OK == response.getStatus)
-    assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
+    val metricLines = response.readEntity(classOf[String]).split("\n")
+    Seq(
+      "metrics_jvm_memory_heap_max_Value",
+      "metrics_RpcQueueLength_Value",
+      "metrics_RpcQueueTime_Max",
+      "metrics_RpcProcessTime_Max").foreach { metric =>
+      assert(metricLines.exists(l =>
+        l.contains(metric) && l.contains(s"""instance="${httpService.connectionUrl}"""")))
+    }
 
     response = webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get()
     assert(HttpServletResponse.SC_OK == response.getStatus)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 274ad0b82..7c85712ec 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -104,6 +104,7 @@ private[celeborn] class Worker(
         workerArgs.port,
         conf,
         Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+        Role.WORKER,
         None,
         Some(workerSource))
     } else {
@@ -121,6 +122,7 @@ private[celeborn] class Worker(
         workerArgs.port,
         conf,
         Math.max(64, Runtime.getRuntime.availableProcessors()),
+        Role.WORKER,
         Some(externalSecurityContext),
         Some(workerSource))
     }
@@ -137,6 +139,7 @@ private[celeborn] class Worker(
         workerArgs.internalPort,
         conf,
         Math.min(64, Math.max(4, Runtime.getRuntime.availableProcessors())),
+        Role.WORKER,
         None,
         Some(workerSource))
     }


Reply via email to