This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fd44b42b [CELEBORN-1634] Support queue time/processing time metrics for rpc framework
8fd44b42b is described below

commit 8fd44b42ba1b3c02238874826e172fd8df70538a
Author: Erik.fang <[email protected]>
AuthorDate: Mon Nov 18 09:39:56 2024 +0800

    [CELEBORN-1634] Support queue time/processing time metrics for rpc framework
    
    ### What changes were proposed in this pull request?
    
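    Implement queue-time and processing-time metrics for the RPC framework.
    Each endpoint inbox now reports a queue-length gauge plus queue-time and
    process-time timers through a new `RPC` metrics source. The LifecycleManager
    and endpoint-verifier endpoints record histograms instead. Slow RPCs are
    logged according to the new `celeborn.rpc.slow.threshold`,
    `celeborn.rpc.slow.interval` and `celeborn.rpc.dump.interval` configs.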
    
    ### Why are the changes needed?
    
    To identify RPC processing bottlenecks.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tested locally.
    
    Closes #2784 from ErikFang/main-rpc-metrics.
    
    Lead-authored-by: Erik.fang <[email protected]>
    Co-authored-by: 仲甫 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  28 ++++
 .../common/metrics/source/AbstractSource.scala     |  10 ++
 .../org/apache/celeborn/common/rpc/RpcEnv.scala    |   2 +
 .../celeborn/common/rpc/RpcMetricsTracker.scala    | 167 +++++++++++++++++++++
 .../org/apache/celeborn/common/rpc/RpcSource.scala |  54 +++++++
 .../celeborn/common/rpc/netty/Dispatcher.scala     |   5 +-
 .../apache/celeborn/common/rpc/netty/Inbox.scala   |  45 +++++-
 .../celeborn/common/rpc/netty/NettyRpcEnv.scala    |   6 +-
 .../celeborn/common/rpc/netty/InboxSuite.scala     |  16 +-
 docs/configuration/network.md                      |   3 +
 .../celeborn/service/deploy/master/Master.scala    |   1 +
 .../celeborn/service/deploy/worker/Worker.scala    |   1 +
 12 files changed, 327 insertions(+), 11 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5790b90ce..32b718dc5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -532,6 +532,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     if (num != 0) num else availableCores
   }
 
+  def rpcSlowThresholdNs(): Long = get(RPC_SLOW_THRESHOLD)
+  def rpcSlowIntervalMs(): Long = get(RPC_SLOW_INTERVAL).getOrElse(-1)
+  def rpcDumpIntervalMs(): Long = get(RPC_SUMMARY_DUMP_INTERVAL)
+
   def networkIoMode(module: String): String = {
     getTransportConf(module, NETWORK_IO_MODE)
   }
@@ -1868,6 +1872,30 @@ object CelebornConf extends Logging {
       .doc("Threads number of message dispatcher event loop for roles")
       .fallbackConf(RPC_DISPATCHER_THREADS)
 
+  val RPC_SLOW_THRESHOLD: ConfigEntry[Long] =
+    buildConf("celeborn.rpc.slow.threshold")
+      .categories("network")
+      .doc("threshold for RPC framework to log slow RPC")
+      .version("0.6.0")
+      .timeConf(TimeUnit.NANOSECONDS)
+      .createWithDefaultString("1s")
+
+  val RPC_SLOW_INTERVAL: OptionalConfigEntry[Long] =
+    buildConf("celeborn.rpc.slow.interval")
+      .categories("network")
+      .doc("min interval (ms) for RPC framework to log slow RPC")
+      .version("0.6.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  val RPC_SUMMARY_DUMP_INTERVAL: ConfigEntry[Long] =
+    buildConf("celeborn.rpc.dump.interval")
+      .categories("network")
+      .doc("min interval (ms) for RPC framework to dump performance summary")
+      .version("0.6.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("60s")
+
   val NETWORK_IO_MODE: ConfigEntry[String] =
     buildConf("celeborn.<module>.io.mode")
       .categories("network")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index f000c9052..dc01c65be 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -292,6 +292,16 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     }
   }
 
+  def updateTimer(name: String, value: Long): Unit = {
+    updateTimer(name, value, Map.empty[String, String])
+  }
+
+  def updateTimer(metricsName: String, value: Long, labels: Map[String, 
String]): Unit = {
+    val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, 
labels)
+    val (namedTimer, _) = namedTimers.get(metricNameWithLabel)
+    namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
+  }
+
   def incCounter(metricsName: String): Unit = {
     incCounter(metricsName, 1)
   }
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 4e2bc35c9..31f69a377 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -163,6 +163,8 @@ abstract class RpcEnv(config: RpcEnvConfig) {
    * that contains [[RpcEndpointRef]]s, the deserialization codes should be 
wrapped by this method.
    */
   def deserialize[T](deserializationAction: () => T): T
+
+  def rpcSource(): RpcSource
 }
 
 /**
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
new file mode 100644
index 000000000..22a715c34
--- /dev/null
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.rpc
+
+import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Histogram, UniformReservoir}
+import com.google.protobuf.GeneratedMessageV3
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.protocol.message.Message
+import org.apache.celeborn.common.rpc.netty.{InboxMessage, OneWayMessage, 
RpcEndpointVerifier, RpcMessage}
+import org.apache.celeborn.common.util.JavaUtils
+
+private[celeborn] class RpcMetricsTracker(
+    val name: String,
+    rpcSource: RpcSource,
+    conf: CelebornConf) extends Logging {
+
+  // Histogram is used for Client, eg LifecycleManager
+  val histogramMap: ConcurrentMap[String, Histogram] =
+    JavaUtils.newConcurrentHashMap[String, Histogram]
+
+  private val maxQueueLength: AtomicLong = new AtomicLong(0)
+  private val slowRpcThreshold: Long = conf.rpcSlowThresholdNs()
+  private val slowRpcInterval: Long = conf.rpcSlowIntervalMs()
+  private val rpcDumpInterval: Long = conf.rpcDumpIntervalMs()
+  private val lastDumpTime: AtomicLong = new AtomicLong(0)
+  private val lastSlowLogTime: AtomicLong = new AtomicLong(0)
+  final private val useHistogram =
+    if (name == RpcNameConstants.LIFECYCLE_MANAGER_EP || name == 
RpcEndpointVerifier.NAME) {
+      true
+    } else {
+      false
+    }
+  final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
+  final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
+  final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"
+
+  private var queueLengthFunc: () => Long = _
+
+  def init(lengthFunc: () => Long): Unit = {
+    queueLengthFunc = lengthFunc
+    if (name != null) {
+      rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)
+
+      rpcSource.addTimer(QUEUE_TIME_METRIC)
+      rpcSource.addTimer(PROCESS_TIME_METRIC)
+    }
+  }
+
+  def updateHistogram(name: String, value: Long): Unit = {
+    histogramMap.putIfAbsent(name, new Histogram(new UniformReservoir()))
+    val histogram = histogramMap.get(name)
+    histogram.update(value)
+  }
+
+  def updateMaxLength(): Unit = {
+    val len = queueLengthFunc()
+    if (len > maxQueueLength.get()) {
+      maxQueueLength.set(len)
+    }
+  }
+
+  private def logSlowRpc(message: InboxMessage, queueTime: Long, processTime: 
Long): Unit = {
+    if (queueTime + processTime > slowRpcThreshold) {
+      val lastLogTime = lastSlowLogTime.get()
+      if (slowRpcInterval < 0 || System.currentTimeMillis() - lastLogTime > 
slowRpcInterval &&
+        lastSlowLogTime.compareAndSet(lastLogTime, 
System.currentTimeMillis())) {
+        logWarning(
+          s"slow rpc detected: currentQueueSize = ${queueLengthFunc()}, 
queueTime=$queueTime processTime=$processTime message=$message")
+      }
+
+      val lastTime = lastDumpTime.get
+      if (useHistogram && System.currentTimeMillis() - lastTime > 
rpcDumpInterval &&
+        lastDumpTime.compareAndSet(lastTime, System.currentTimeMillis())) {
+        dump()
+      }
+    }
+  }
+
+  def record(message: Any, queueTime: Long, processTime: Long): Unit = {
+    def messageName(message: Any): String = {
+      message match {
+        case legacy: Message =>
+          legacy.getClass.toString
+        case pb: GeneratedMessageV3 =>
+          pb.getDescriptorForType.getFullName
+        case _: RpcEndpointVerifier.CheckExistence =>
+          "CheckExistence"
+        case _ =>
+          "unknown"
+      }
+    }
+    val msgName = messageName(message)
+
+    if (useHistogram) {
+      updateHistogram(QUEUE_TIME_METRIC, queueTime)
+      updateHistogram(PROCESS_TIME_METRIC, processTime)
+      updateHistogram(msgName, processTime)
+    } else {
+      rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
+      rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
+      rpcSource.updateTimer(msgName, processTime)
+    }
+  }
+
+  def update(message: InboxMessage): Unit = {
+    message match {
+      case rpc @ RpcMessage(_, content, _) =>
+        val queueTime = rpc.dequeueTime - rpc.enqueueTime
+        val processTime = rpc.endProcessTime - rpc.dequeueTime
+        record(content, queueTime, processTime)
+        logSlowRpc(message, queueTime, processTime)
+      case one @ OneWayMessage(_, content) =>
+        val queueTime = one.dequeueTime - one.enqueueTime
+        val processTime = one.endProcessTime - one.dequeueTime
+        record(content, queueTime, processTime)
+        logSlowRpc(message, queueTime, processTime)
+      case _ =>
+    }
+  }
+
+  def dump(): Unit = {
+    if (!useHistogram)
+      return
+
+    val builder = new StringBuilder();
+    builder.append(s"RPC statistics for $name").append("\n")
+    builder.append(s"current queue size = ${queueLengthFunc()}").append("\n")
+    builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n")
+    histogramMap.entrySet.asScala.foreach(entry => {
+      val histogram = entry.getValue
+      val snapshot = histogram.getSnapshot;
+      builder.append(s"histogram for $name RPC metrics: 
").append(entry.getKey).append("\n")
+      builder.append("count: ").append(histogram.getCount).append("\n")
+        .append("min: ").append(snapshot.getMin).append("\n")
+        .append("mean: ").append(snapshot.getMean).append("\n")
+        .append("p50: ").append(snapshot.getMedian).append("\n")
+        .append("p75: ").append(snapshot.get75thPercentile).append("\n")
+        .append("p95: ").append(snapshot.get95thPercentile).append("\n")
+        .append("p99: ").append(snapshot.get99thPercentile()).append("\n")
+        .append("max: ").append(snapshot.getMax).append("\n")
+    })
+    logInfo(builder.toString())
+  }
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
new file mode 100644
index 000000000..1acccf062
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.rpc
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
+
+class RpcSource(conf: CelebornConf) extends AbstractSource(conf, 
RpcSource.ROLE_RPC) {
+  override def sourceName: String = RpcSource.ROLE_RPC
+
+  private val msgNameSet = ConcurrentHashMap.newKeySet[String]()
+
+  override def updateTimer(name: String, value: Long): Unit = {
+    if (!msgNameSet.contains(name)) {
+      super.addTimer(name)
+      msgNameSet.add(name)
+    }
+    super.updateTimer(name, value)
+  }
+
+  override def addTimer(name: String): Unit = {
+    if (!msgNameSet.contains(name)) {
+      super.addTimer(name)
+      msgNameSet.add(name)
+    }
+  }
+
+  startCleaner()
+}
+
+object RpcSource {
+  val ROLE_RPC = "RPC"
+
+  val QUEUE_LENGTH = "QueueLength"
+  val QUEUE_TIME = "QueueTime"
+  val PROCESS_TIME = "ProcessTime"
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index b8a93bda0..5b7320726 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -33,14 +33,15 @@ import org.apache.celeborn.common.util.{JavaUtils, 
ThreadUtils}
 /**
  * A message dispatcher, responsible for routing RPC messages to the 
appropriate endpoint(s).
  */
-private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
+private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource: 
RpcSource) extends Logging {
 
   private class EndpointData(
       val name: String,
       val endpoint: RpcEndpoint,
       val ref: NettyRpcEndpointRef) {
     val celebornConf = nettyEnv.celebornConf
-    val inbox = new Inbox(ref, endpoint, celebornConf)
+    val inbox =
+      new Inbox(ref, endpoint, celebornConf, new RpcMetricsTracker(name, 
rpcSource, celebornConf))
   }
 
   private val endpoints: ConcurrentMap[String, EndpointData] =
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
index 750252456..b179128fb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
@@ -26,9 +26,15 @@ import scala.util.control.NonFatal
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint, 
ThreadSafeRpcEndpoint}
+import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint, 
RpcMetricsTracker, ThreadSafeRpcEndpoint}
 
-sealed private[celeborn] trait InboxMessage
+sealed private[celeborn] trait InboxMessage extends RpcTimeMetrics
+
+private[celeborn] trait RpcTimeMetrics {
+  var enqueueTime: Long = 0
+  var dequeueTime: Long = 0
+  var endProcessTime: Long = 0
+}
 
 private[celeborn] case class OneWayMessage(
     senderAddress: RpcAddress,
@@ -68,7 +74,8 @@ private[celeborn] case class RemoteProcessConnectionError(
 private[celeborn] class Inbox(
     val endpointRef: NettyRpcEndpointRef,
     val endpoint: RpcEndpoint,
-    val conf: CelebornConf) extends Logging {
+    val conf: CelebornConf,
+    val metrics: RpcMetricsTracker) extends Logging {
 
   inbox => // Give this an alias so we can use it more clearly in closures.
 
@@ -94,10 +101,14 @@ private[celeborn] class Inbox(
   @GuardedBy("this")
   private var numActiveThreads = 0
 
+  metrics.init(() => messageCount.get())
+
   // OnStart should be the first message to process
   try {
     inboxLock.lockInterruptibly()
     messages.add(OnStart)
+    RpcTimeMetrics.updateTime(OnStart, RpcTimeMetrics.Enqueue)
+    metrics.updateMaxLength()
     messageCount.incrementAndGet()
   } finally {
     inboxLock.unlock()
@@ -204,6 +215,7 @@ private[celeborn] class Inbox(
       message = messages.poll()
       if (message != null) {
         numActiveThreads += 1
+        RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue)
         messageCount.decrementAndGet()
         signalNotFull()
       } else {
@@ -214,7 +226,7 @@ private[celeborn] class Inbox(
     }
 
     while (true) {
-      safelyCall(endpoint, endpointRef.name) {
+      safelyCall(endpoint, endpointRef.name, message) {
         processInternal(dispatcher, message)
       }
       try {
@@ -231,6 +243,7 @@ private[celeborn] class Inbox(
           numActiveThreads -= 1
           return
         } else {
+          RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue)
           messageCount.decrementAndGet()
           signalNotFull()
         }
@@ -248,6 +261,8 @@ private[celeborn] class Inbox(
         onDrop(message)
       } else {
         addMessage(message)
+        RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Enqueue)
+        metrics.updateMaxLength()
       }
     } finally {
       inboxLock.unlock()
@@ -266,6 +281,7 @@ private[celeborn] class Inbox(
         enableConcurrent = false
         stopped = true
         addMessage(OnStop)
+        metrics.dump()
         // Note: The concurrent events in messages will be processed one by 
one.
       }
     } finally {
@@ -295,7 +311,8 @@ private[celeborn] class Inbox(
    */
   private def safelyCall(
       endpoint: RpcEndpoint,
-      endpointRefName: String)(action: => Unit): Unit = {
+      endpointRefName: String,
+      message: InboxMessage)(action: => Unit): Unit = {
     def dealWithFatalError(fatal: Throwable): Unit = {
       try {
         inboxLock.lockInterruptibly()
@@ -327,6 +344,9 @@ private[celeborn] class Inbox(
         }
       case fatal: Throwable =>
         dealWithFatalError(fatal)
+    } finally {
+      RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Process)
+      metrics.update(message)
     }
   }
 
@@ -340,3 +360,18 @@ private[celeborn] class Inbox(
     }
   }
 }
+
+private[celeborn] object RpcTimeMetrics {
+  trait MetricsType
+  case object Enqueue extends MetricsType
+  case object Dequeue extends MetricsType
+  case object Process extends MetricsType
+  def updateTime(message: InboxMessage, op: MetricsType): Unit = {
+    (message, op) match {
+      case (msg: RpcTimeMetrics, Enqueue) => msg.enqueueTime = System.nanoTime
+      case (msg: RpcTimeMetrics, Dequeue) => msg.dequeueTime = System.nanoTime
+      case (msg: RpcTimeMetrics, Process) => msg.endProcessTime = 
System.nanoTime
+      case _ =>
+    }
+  }
+}
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 598a589fe..5d2d31f66 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -55,7 +55,9 @@ class NettyRpcEnv(
     config.transportModule,
     celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))
 
-  private val dispatcher: Dispatcher = new Dispatcher(this)
+  private val source: RpcSource = new RpcSource(celebornConf)
+
+  private val dispatcher: Dispatcher = new Dispatcher(this, source)
 
   private var worker: RpcEndpoint = null
 
@@ -361,6 +363,8 @@ class NettyRpcEnv(
       deserializationAction()
     }
   }
+
+  override def rpcSource(): RpcSource = source
 }
 
 private[celeborn] object NettyRpcEnv extends Logging {
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index 553b791d8..83b5b0a1b 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.rpc.{RpcAddress, TestRpcEndpoint}
+import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker, 
RpcSource, TestRpcEndpoint}
 import org.apache.celeborn.common.util.ThreadUtils
 
 class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
@@ -37,10 +37,20 @@ class InboxSuite extends CelebornFunSuite with 
BeforeAndAfter {
       testRpcEndpoint: TestRpcEndpoint,
       onDropOverride: Option[InboxMessage => T]): Inbox = {
     val rpcEnvRef = mock(classOf[NettyRpcEndpointRef])
+    val conf = new CelebornConf()
+    val source: RpcSource = new RpcSource(conf)
     if (onDropOverride.isEmpty) {
-      new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf())
+      new Inbox(
+        rpcEnvRef,
+        testRpcEndpoint,
+        new CelebornConf(),
+        new RpcMetricsTracker("testRpc", source, conf))
     } else {
-      new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf()) {
+      new Inbox(
+        rpcEnvRef,
+        testRpcEndpoint,
+        new CelebornConf(),
+        new RpcMetricsTracker("testRpc", source, conf)) {
         override protected def onDrop(message: InboxMessage): Unit = {
           onDropOverride.get(message)
         }
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index a9e0deaf4..e4cd396bc 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -52,9 +52,12 @@ license: |
 | celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's 
recommended to set at least `240s` when `HDFS` is enabled in 
`celeborn.storage.availableTypes` | 0.2.0 |  | 
 | celeborn.rpc.connect.threads | 64 | false |  | 0.2.0 |  | 
 | celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message 
dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 | 
celeborn.rpc.dispatcher.numThreads | 
+| celeborn.rpc.dump.interval | 60s | false | min interval (ms) for RPC 
framework to dump performance summary | 0.6.0 |  | 
 | celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory 
bounded capacity. | 0.5.0 |  | 
 | celeborn.rpc.io.threads | &lt;undefined&gt; | false | Netty IO thread number 
of NettyRpcEnv to handle RPC request. The default threads number is the number 
of runtime available processors. | 0.2.0 |  | 
 | celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup 
operations. | 0.2.0 |  | 
+| celeborn.rpc.slow.interval | &lt;undefined&gt; | false | min interval (ms) 
for RPC framework to log slow RPC | 0.6.0 |  | 
+| celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to 
log slow RPC | 0.6.0 |  | 
 | celeborn.shuffle.io.maxChunksBeingTransferred | &lt;undefined&gt; | false | 
The max number of chunks allowed to be transferred at the same time on shuffle 
service. Note that new incoming connections will be closed when the max number 
is hit. The client will retry according to the shuffle retry configs (see 
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if 
those limits are reached the task will fail with fetch failure. | 0.2.0 |  | 
 | celeborn.ssl.&lt;module&gt;.enabled | false | false | Enables SSL for 
securing wire traffic. | 0.5.0 |  | 
 | celeborn.ssl.&lt;module&gt;.enabledAlgorithms | &lt;undefined&gt; | false | 
A comma-separated list of ciphers. The specified ciphers must be supported by 
JVM.<br/>The reference list of protocols can be found in the "JSSE Cipher Suite 
Names" section of the Java security guide. The list for Java 11, for example, 
can be found at [this 
page](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#jsse-cipher-suite-names)<br/>Note:
 If not set, the default cipher su [...]
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 310eaf6e1..de749a7b5 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -121,6 +121,7 @@ private[celeborn] class Master(
         Some(externalSecurityContext),
         None)
     }
+  metricsSystem.registerSource(rpcEnv.rpcSource())
 
   // Visible for testing
   private[master] var internalRpcEnvInUse: RpcEnv =
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index b9bfff05b..0be341912 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -123,6 +123,7 @@ private[celeborn] class Worker(
         Some(externalSecurityContext),
         Some(workerSource))
     }
+  metricsSystem.registerSource(rpcEnv.rpcSource())
 
   private[worker] var internalRpcEnvInUse =
     if (!conf.internalPortEnabled) {

Reply via email to