This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8fd44b42b [CELEBORN-1634] Support queue time/processing time metrics
for rpc framework
8fd44b42b is described below
commit 8fd44b42ba1b3c02238874826e172fd8df70538a
Author: Erik.fang <[email protected]>
AuthorDate: Mon Nov 18 09:39:56 2024 +0800
[CELEBORN-1634] Support queue time/processing time metrics for rpc framework
### What changes were proposed in this pull request?
implement queue time/processing time metrics for rpc framework
### Why are the changes needed?
To identify RPC processing bottlenecks.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested locally.
Closes #2784 from ErikFang/main-rpc-metrics.
Lead-authored-by: Erik.fang <[email protected]>
Co-authored-by: 仲甫 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 28 ++++
.../common/metrics/source/AbstractSource.scala | 10 ++
.../org/apache/celeborn/common/rpc/RpcEnv.scala | 2 +
.../celeborn/common/rpc/RpcMetricsTracker.scala | 167 +++++++++++++++++++++
.../org/apache/celeborn/common/rpc/RpcSource.scala | 54 +++++++
.../celeborn/common/rpc/netty/Dispatcher.scala | 5 +-
.../apache/celeborn/common/rpc/netty/Inbox.scala | 45 +++++-
.../celeborn/common/rpc/netty/NettyRpcEnv.scala | 6 +-
.../celeborn/common/rpc/netty/InboxSuite.scala | 16 +-
docs/configuration/network.md | 3 +
.../celeborn/service/deploy/master/Master.scala | 1 +
.../celeborn/service/deploy/worker/Worker.scala | 1 +
12 files changed, 327 insertions(+), 11 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 5790b90ce..32b718dc5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -532,6 +532,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
if (num != 0) num else availableCores
}
+  /** Slow-RPC threshold in nanoseconds, taken from [[RPC_SLOW_THRESHOLD]]. */
+  def rpcSlowThresholdNs(): Long = get(RPC_SLOW_THRESHOLD)
+
+  /**
+   * Minimum number of milliseconds between two slow-RPC warnings, or -1 when
+   * [[RPC_SLOW_INTERVAL]] is not configured (a negative value disables rate limiting).
+   */
+  def rpcSlowIntervalMs(): Long = get(RPC_SLOW_INTERVAL).fold(-1L)(identity)
+
+  /** Minimum number of milliseconds between two RPC summary dumps. */
+  def rpcDumpIntervalMs(): Long = get(RPC_SUMMARY_DUMP_INTERVAL)
+
def networkIoMode(module: String): String = {
getTransportConf(module, NETWORK_IO_MODE)
}
@@ -1868,6 +1872,30 @@ object CelebornConf extends Logging {
.doc("Threads number of message dispatcher event loop for roles")
.fallbackConf(RPC_DISPATCHER_THREADS)
+ // Slow-RPC threshold. Stored in nanoseconds (timeConf(NANOSECONDS)); RpcMetricsTracker
+ // compares it with queue time + process time, both measured with System.nanoTime.
+ val RPC_SLOW_THRESHOLD: ConfigEntry[Long] =
+ buildConf("celeborn.rpc.slow.threshold")
+ .categories("network")
+ .doc("threshold for RPC framework to log slow RPC")
+ .version("0.6.0")
+ .timeConf(TimeUnit.NANOSECONDS)
+ .createWithDefaultString("1s")
+
+ // Optional rate limit for slow-RPC warnings, stored in milliseconds. When unset,
+ // CelebornConf.rpcSlowIntervalMs() returns -1 and every slow RPC is logged.
+ val RPC_SLOW_INTERVAL: OptionalConfigEntry[Long] =
+ buildConf("celeborn.rpc.slow.interval")
+ .categories("network")
+ .doc("min interval (ms) for RPC framework to log slow RPC")
+ .version("0.6.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createOptional
+
+ // Minimum gap, in milliseconds, between two histogram summary dumps. Only endpoints
+ // that record histograms (LifecycleManager, endpoint verifier) dump, and the periodic
+ // dump is attempted when a slow RPC is observed.
+ val RPC_SUMMARY_DUMP_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.rpc.dump.interval")
+ .categories("network")
+ .doc("min interval (ms) for RPC framework to dump performance summary")
+ .version("0.6.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("60s")
+
val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index f000c9052..dc01c65be 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -292,6 +292,16 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
}
}
+  /** Records `value` (in nanoseconds) on the unlabelled timer `name`. */
+  def updateTimer(name: String, value: Long): Unit = {
+    updateTimer(name, value, Map.empty[String, String])
+  }
+
+  /**
+   * Records `value`, interpreted as nanoseconds, on the timer `metricsName` with `labels`.
+   *
+   * The timer must have been registered beforehand (e.g. via `addTimer`). An unknown
+   * timer is reported with a warning instead of failing with an opaque `MatchError`
+   * from destructuring the null returned by `namedTimers.get`.
+   */
+  def updateTimer(metricsName: String, value: Long, labels: Map[String, String]): Unit = {
+    val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, labels)
+    namedTimers.get(metricNameWithLabel) match {
+      case null =>
+        logWarning(s"Metric $metricNameWithLabel not found!")
+      case (namedTimer, _) =>
+        namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
+    }
+  }
+
def incCounter(metricsName: String): Unit = {
incCounter(metricsName, 1)
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
index 4e2bc35c9..31f69a377 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
@@ -163,6 +163,8 @@ abstract class RpcEnv(config: RpcEnvConfig) {
* that contains [[RpcEndpointRef]]s, the deserialization codes should be
wrapped by this method.
*/
def deserialize[T](deserializationAction: () => T): T
+
+ /**
+ * Returns the [[RpcSource]] metrics source of this RpcEnv. Callers such as Master and
+ * Worker register it with their metrics system to export RPC queue/processing metrics.
+ */
+ def rpcSource(): RpcSource
}
/**
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
new file mode 100644
index 000000000..22a715c34
--- /dev/null
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcMetricsTracker.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.rpc
+
+import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+
+import com.codahale.metrics.{Histogram, UniformReservoir}
+import com.google.protobuf.GeneratedMessageV3
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.RpcNameConstants
+import org.apache.celeborn.common.protocol.message.Message
+import org.apache.celeborn.common.rpc.netty.{InboxMessage, OneWayMessage,
RpcEndpointVerifier, RpcMessage}
+import org.apache.celeborn.common.util.JavaUtils
+
+/**
+ * Tracks queue length, queue time and processing time of the messages handled by one
+ * endpoint inbox.
+ *
+ * For the LifecycleManager and endpoint-verifier endpoints the samples are kept in local
+ * histograms that are written to the log by [[dump]]. For every other endpoint the samples
+ * are reported as timers of the shared [[RpcSource]].
+ *
+ * @param name      endpoint name; prefix of the per-endpoint metric names
+ * @param rpcSource metrics source receiving gauge and timer updates
+ * @param conf      configuration providing the slow-RPC threshold and log/dump intervals
+ */
+private[celeborn] class RpcMetricsTracker(
+    val name: String,
+    rpcSource: RpcSource,
+    conf: CelebornConf) extends Logging {
+
+  // Histograms are used instead of RpcSource timers for client-side endpoints,
+  // e.g. LifecycleManager.
+  val histogramMap: ConcurrentMap[String, Histogram] =
+    JavaUtils.newConcurrentHashMap[String, Histogram]
+
+  private val maxQueueLength: AtomicLong = new AtomicLong(0)
+  // Nanoseconds; compared against queueTime + processTime (System.nanoTime deltas).
+  private val slowRpcThreshold: Long = conf.rpcSlowThresholdNs()
+  // Milliseconds; a negative value means every slow RPC is logged.
+  private val slowRpcInterval: Long = conf.rpcSlowIntervalMs()
+  private val rpcDumpInterval: Long = conf.rpcDumpIntervalMs()
+  private val lastDumpTime: AtomicLong = new AtomicLong(0)
+  private val lastSlowLogTime: AtomicLong = new AtomicLong(0)
+  final private val useHistogram: Boolean =
+    name == RpcNameConstants.LIFECYCLE_MANAGER_EP || name == RpcEndpointVerifier.NAME
+  final private val QUEUE_LENGTH_METRIC = s"${name}_${RpcSource.QUEUE_LENGTH}"
+  final private val QUEUE_TIME_METRIC = s"${name}_${RpcSource.QUEUE_TIME}"
+  final private val PROCESS_TIME_METRIC = s"${name}_${RpcSource.PROCESS_TIME}"
+
+  // Null until init() is called; every method that reads the queue length needs it.
+  private var queueLengthFunc: () => Long = _
+
+  /**
+   * Installs the queue-length supplier and, when `name` is non-null, registers this
+   * endpoint's queue-length gauge and queue/process timers.
+   */
+  def init(lengthFunc: () => Long): Unit = {
+    queueLengthFunc = lengthFunc
+    if (name != null) {
+      rpcSource.addGauge(QUEUE_LENGTH_METRIC)(queueLengthFunc)
+      rpcSource.addTimer(QUEUE_TIME_METRIC)
+      rpcSource.addTimer(PROCESS_TIME_METRIC)
+    }
+  }
+
+  /** Adds `value` to the histogram `name`, creating the histogram on first use. */
+  def updateHistogram(name: String, value: Long): Unit = {
+    val existing = histogramMap.get(name)
+    val histogram =
+      if (existing != null) {
+        existing
+      } else {
+        // Only allocate a histogram when the name is seen for the first time.
+        val created = new Histogram(new UniformReservoir())
+        val previous = histogramMap.putIfAbsent(name, created)
+        if (previous != null) previous else created
+      }
+    histogram.update(value)
+  }
+
+  /** Raises the recorded maximum queue length to the current length if that is larger. */
+  def updateMaxLength(): Unit = {
+    val len = queueLengthFunc()
+    // CAS loop so that a concurrent caller can never overwrite a larger maximum.
+    var current = maxQueueLength.get()
+    while (len > current && !maxQueueLength.compareAndSet(current, len)) {
+      current = maxQueueLength.get()
+    }
+  }
+
+  // Warns about an RPC whose total latency exceeds the threshold (rate limited by
+  // slowRpcInterval) and, for histogram endpoints, periodically dumps the summary.
+  private def logSlowRpc(message: InboxMessage, queueTime: Long, processTime: Long): Unit = {
+    if (queueTime + processTime > slowRpcThreshold) {
+      val now = System.currentTimeMillis()
+      val lastLogTime = lastSlowLogTime.get()
+      // A negative interval disables rate limiting; otherwise only the CAS winner logs.
+      if (slowRpcInterval < 0 ||
+        (now - lastLogTime > slowRpcInterval &&
+          lastSlowLogTime.compareAndSet(lastLogTime, now))) {
+        logWarning(
+          s"slow rpc detected: currentQueueSize = ${queueLengthFunc()}, " +
+            s"queueTime=$queueTime processTime=$processTime message=$message")
+      }
+
+      val lastTime = lastDumpTime.get
+      if (useHistogram && now - lastTime > rpcDumpInterval &&
+        lastDumpTime.compareAndSet(lastTime, now)) {
+        dump()
+      }
+    }
+  }
+
+  /**
+   * Records one sample: queue and process time under the endpoint-level metric names,
+   * and process time under the name derived from the message type.
+   */
+  def record(message: Any, queueTime: Long, processTime: Long): Unit = {
+    val msgName = messageName(message)
+    if (useHistogram) {
+      updateHistogram(QUEUE_TIME_METRIC, queueTime)
+      updateHistogram(PROCESS_TIME_METRIC, processTime)
+      updateHistogram(msgName, processTime)
+    } else {
+      rpcSource.updateTimer(QUEUE_TIME_METRIC, queueTime)
+      rpcSource.updateTimer(PROCESS_TIME_METRIC, processTime)
+      rpcSource.updateTimer(msgName, processTime)
+    }
+  }
+
+  // Metric name for the per-message-type process time of `message`.
+  private def messageName(message: Any): String = message match {
+    // getName, not getClass.toString, which would yield "class <name>" (with a space).
+    case legacy: Message => legacy.getClass.getName
+    case pb: GeneratedMessageV3 => pb.getDescriptorForType.getFullName
+    case _: RpcEndpointVerifier.CheckExistence => "CheckExistence"
+    case _ => "unknown"
+  }
+
+  /** Records the timings of RPC and one-way messages; other inbox messages are ignored. */
+  def update(message: InboxMessage): Unit = message match {
+    case rpc @ RpcMessage(_, content, _) => recordTimings(rpc, content)
+    case oneWay @ OneWayMessage(_, content) => recordTimings(oneWay, content)
+    case _ =>
+  }
+
+  // Derives queue/process time from the timestamps the inbox stamped on `message`.
+  private def recordTimings(message: InboxMessage, content: Any): Unit = {
+    val queueTime = message.dequeueTime - message.enqueueTime
+    val processTime = message.endProcessTime - message.dequeueTime
+    record(content, queueTime, processTime)
+    logSlowRpc(message, queueTime, processTime)
+  }
+
+  /** Logs queue statistics and a summary of every histogram; no-op without histograms. */
+  def dump(): Unit = {
+    if (useHistogram) {
+      val builder = new StringBuilder()
+      builder.append(s"RPC statistics for $name").append("\n")
+      builder.append(s"current queue size = ${queueLengthFunc()}").append("\n")
+      builder.append(s"max queue length = ${maxQueueLength.get()}").append("\n")
+      histogramMap.entrySet.asScala.foreach { entry =>
+        val histogram = entry.getValue
+        val snapshot = histogram.getSnapshot
+        builder.append(s"histogram for $name RPC metrics: ").append(entry.getKey).append("\n")
+        builder.append("count: ").append(histogram.getCount).append("\n")
+          .append("min: ").append(snapshot.getMin).append("\n")
+          .append("mean: ").append(snapshot.getMean).append("\n")
+          .append("p50: ").append(snapshot.getMedian).append("\n")
+          .append("p75: ").append(snapshot.get75thPercentile).append("\n")
+          .append("p95: ").append(snapshot.get95thPercentile).append("\n")
+          .append("p99: ").append(snapshot.get99thPercentile()).append("\n")
+          .append("max: ").append(snapshot.getMax).append("\n")
+      }
+      logInfo(builder.toString())
+    }
+  }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
new file mode 100644
index 000000000..1acccf062
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.rpc
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.metrics.source.AbstractSource
+
+/**
+ * Metrics source for the RPC framework. One instance is shared by all inboxes of an
+ * RpcEnv (Master and Worker register it with their metrics system). Timers are
+ * registered lazily the first time a metric name is used.
+ */
+class RpcSource(conf: CelebornConf) extends AbstractSource(conf, RpcSource.ROLE_RPC) {
+  override def sourceName: String = RpcSource.ROLE_RPC
+
+  // Names of timers registered through this source. A name is only added after the
+  // parent addTimer has returned, so membership implies the timer exists.
+  private val msgNameSet = ConcurrentHashMap.newKeySet[String]()
+
+  /** Records `value` (nanoseconds) on timer `name`, registering the timer on first use. */
+  override def updateTimer(name: String, value: Long): Unit = {
+    addTimer(name)
+    super.updateTimer(name, value)
+  }
+
+  /**
+   * Registers timer `name` exactly once.
+   *
+   * Several dispatcher threads may meet a new message name concurrently. The
+   * double-checked lock ensures that only one of them calls the parent `addTimer`, and
+   * that the others wait until the timer is registered before updating it.
+   */
+  override def addTimer(name: String): Unit = {
+    if (!msgNameSet.contains(name)) {
+      msgNameSet.synchronized {
+        if (!msgNameSet.contains(name)) {
+          super.addTimer(name)
+          msgNameSet.add(name)
+        }
+      }
+    }
+  }
+
+  startCleaner()
+}
+
+object RpcSource {
+  /** Role and source name of the RPC metrics source. */
+  val ROLE_RPC = "RPC"
+
+  /** Suffixes appended to `<endpointName>_` to form per-endpoint metric names. */
+  val QUEUE_LENGTH = "QueueLength"
+  val QUEUE_TIME = "QueueTime"
+  val PROCESS_TIME = "ProcessTime"
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index b8a93bda0..5b7320726 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -33,14 +33,15 @@ import org.apache.celeborn.common.util.{JavaUtils,
ThreadUtils}
/**
* A message dispatcher, responsible for routing RPC messages to the
appropriate endpoint(s).
*/
-private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
+private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, rpcSource:
RpcSource) extends Logging {
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val celebornConf = nettyEnv.celebornConf
- val inbox = new Inbox(ref, endpoint, celebornConf)
+ val inbox =
+ new Inbox(ref, endpoint, celebornConf, new RpcMetricsTracker(name,
rpcSource, celebornConf))
}
private val endpoints: ConcurrentMap[String, EndpointData] =
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
index 750252456..b179128fb 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Inbox.scala
@@ -26,9 +26,15 @@ import scala.util.control.NonFatal
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint,
ThreadSafeRpcEndpoint}
+import org.apache.celeborn.common.rpc.{RpcAddress, RpcEndpoint,
RpcMetricsTracker, ThreadSafeRpcEndpoint}
-sealed private[celeborn] trait InboxMessage
+// Every inbox message carries its own timing fields via the RpcTimeMetrics mix-in.
+sealed private[celeborn] trait InboxMessage extends RpcTimeMetrics
+
+/**
+ * Mutable timestamps recording when an inbox message was enqueued, dequeued and
+ * finished processing. Values are System.nanoTime readings written by
+ * RpcTimeMetrics.updateTime; a field stays 0 until its stage is reached.
+ * These are body fields, not constructor parameters, so they do not take part in the
+ * equals/hashCode of case classes (such as OneWayMessage) that mix this trait in.
+ */
+private[celeborn] trait RpcTimeMetrics {
+ var enqueueTime: Long = 0
+ var dequeueTime: Long = 0
+ var endProcessTime: Long = 0
+}
private[celeborn] case class OneWayMessage(
senderAddress: RpcAddress,
@@ -68,7 +74,8 @@ private[celeborn] case class RemoteProcessConnectionError(
private[celeborn] class Inbox(
val endpointRef: NettyRpcEndpointRef,
val endpoint: RpcEndpoint,
- val conf: CelebornConf) extends Logging {
+ val conf: CelebornConf,
+ val metrics: RpcMetricsTracker) extends Logging {
inbox => // Give this an alias so we can use it more clearly in closures.
@@ -94,10 +101,14 @@ private[celeborn] class Inbox(
@GuardedBy("this")
private var numActiveThreads = 0
+ metrics.init(() => messageCount.get())
+
// OnStart should be the first message to process
try {
inboxLock.lockInterruptibly()
messages.add(OnStart)
+ RpcTimeMetrics.updateTime(OnStart, RpcTimeMetrics.Enqueue)
+ metrics.updateMaxLength()
messageCount.incrementAndGet()
} finally {
inboxLock.unlock()
@@ -204,6 +215,7 @@ private[celeborn] class Inbox(
message = messages.poll()
if (message != null) {
numActiveThreads += 1
+ RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue)
messageCount.decrementAndGet()
signalNotFull()
} else {
@@ -214,7 +226,7 @@ private[celeborn] class Inbox(
}
while (true) {
- safelyCall(endpoint, endpointRef.name) {
+ safelyCall(endpoint, endpointRef.name, message) {
processInternal(dispatcher, message)
}
try {
@@ -231,6 +243,7 @@ private[celeborn] class Inbox(
numActiveThreads -= 1
return
} else {
+ RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Dequeue)
messageCount.decrementAndGet()
signalNotFull()
}
@@ -248,6 +261,8 @@ private[celeborn] class Inbox(
onDrop(message)
} else {
addMessage(message)
+ RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Enqueue)
+ metrics.updateMaxLength()
}
} finally {
inboxLock.unlock()
@@ -266,6 +281,7 @@ private[celeborn] class Inbox(
enableConcurrent = false
stopped = true
addMessage(OnStop)
+ metrics.dump()
// Note: The concurrent events in messages will be processed one by
one.
}
} finally {
@@ -295,7 +311,8 @@ private[celeborn] class Inbox(
*/
private def safelyCall(
endpoint: RpcEndpoint,
- endpointRefName: String)(action: => Unit): Unit = {
+ endpointRefName: String,
+ message: InboxMessage)(action: => Unit): Unit = {
def dealWithFatalError(fatal: Throwable): Unit = {
try {
inboxLock.lockInterruptibly()
@@ -327,6 +344,9 @@ private[celeborn] class Inbox(
}
case fatal: Throwable =>
dealWithFatalError(fatal)
+ } finally {
+ RpcTimeMetrics.updateTime(message, RpcTimeMetrics.Process)
+ metrics.update(message)
}
}
@@ -340,3 +360,18 @@ private[celeborn] class Inbox(
}
}
}
+
+/** Helpers that stamp [[InboxMessage]]s with System.nanoTime at each inbox stage. */
+private[celeborn] object RpcTimeMetrics {
+  /** Stage of a message in the inbox whose timestamp should be recorded. */
+  sealed trait MetricsType
+  /** The message was added to the inbox queue. */
+  case object Enqueue extends MetricsType
+  /** The message was taken off the queue for processing. */
+  case object Dequeue extends MetricsType
+  /** Processing of the message finished. */
+  case object Process extends MetricsType
+
+  /**
+   * Sets the timestamp field of `message` that corresponds to `op` to System.nanoTime.
+   * A null `message` is ignored. MetricsType is sealed, so the match is checked for
+   * exhaustiveness instead of silently skipping unknown stages.
+   */
+  def updateTime(message: InboxMessage, op: MetricsType): Unit = {
+    if (message != null) {
+      val now = System.nanoTime
+      op match {
+        case Enqueue => message.enqueueTime = now
+        case Dequeue => message.dequeueTime = now
+        case Process => message.endProcessTime = now
+      }
+    }
+  }
+}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 598a589fe..5d2d31f66 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -55,7 +55,9 @@ class NettyRpcEnv(
config.transportModule,
celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))
- private val dispatcher: Dispatcher = new Dispatcher(this)
+ private val source: RpcSource = new RpcSource(celebornConf)
+
+ private val dispatcher: Dispatcher = new Dispatcher(this, source)
private var worker: RpcEndpoint = null
@@ -361,6 +363,8 @@ class NettyRpcEnv(
deserializationAction()
}
}
+
+ override def rpcSource(): RpcSource = source
}
private[celeborn] object NettyRpcEnv extends Logging {
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index 553b791d8..83b5b0a1b 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.rpc.{RpcAddress, TestRpcEndpoint}
+import org.apache.celeborn.common.rpc.{RpcAddress, RpcMetricsTracker,
RpcSource, TestRpcEndpoint}
import org.apache.celeborn.common.util.ThreadUtils
class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
@@ -37,10 +37,20 @@ class InboxSuite extends CelebornFunSuite with
BeforeAndAfter {
testRpcEndpoint: TestRpcEndpoint,
onDropOverride: Option[InboxMessage => T]): Inbox = {
val rpcEnvRef = mock(classOf[NettyRpcEndpointRef])
+ val conf = new CelebornConf()
+ val source: RpcSource = new RpcSource(conf)
if (onDropOverride.isEmpty) {
- new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf())
+ new Inbox(
+ rpcEnvRef,
+ testRpcEndpoint,
+ new CelebornConf(),
+ new RpcMetricsTracker("testRpc", source, conf))
} else {
- new Inbox(rpcEnvRef, testRpcEndpoint, new CelebornConf()) {
+ new Inbox(
+ rpcEnvRef,
+ testRpcEndpoint,
+ new CelebornConf(),
+ new RpcMetricsTracker("testRpc", source, conf)) {
override protected def onDrop(message: InboxMessage): Unit = {
onDropOverride.get(message)
}
diff --git a/docs/configuration/network.md b/docs/configuration/network.md
index a9e0deaf4..e4cd396bc 100644
--- a/docs/configuration/network.md
+++ b/docs/configuration/network.md
@@ -52,9 +52,12 @@ license: |
| celeborn.rpc.askTimeout | 60s | false | Timeout for RPC ask operations. It's
recommended to set at least `240s` when `HDFS` is enabled in
`celeborn.storage.availableTypes` | 0.2.0 | |
| celeborn.rpc.connect.threads | 64 | false | | 0.2.0 | |
| celeborn.rpc.dispatcher.threads | 0 | false | Threads number of message
dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 |
celeborn.rpc.dispatcher.numThreads |
+| celeborn.rpc.dump.interval | 60s | false | min interval (ms) for RPC
framework to dump performance summary | 0.6.0 | |
| celeborn.rpc.inbox.capacity | 0 | false | Specifies size of the in memory
bounded capacity. | 0.5.0 | |
| celeborn.rpc.io.threads | <undefined> | false | Netty IO thread number
of NettyRpcEnv to handle RPC request. The default threads number is the number
of runtime available processors. | 0.2.0 | |
| celeborn.rpc.lookupTimeout | 30s | false | Timeout for RPC lookup
operations. | 0.2.0 | |
+| celeborn.rpc.slow.interval | <undefined> | false | min interval (ms)
for RPC framework to log slow RPC | 0.6.0 | |
+| celeborn.rpc.slow.threshold | 1s | false | threshold for RPC framework to
log slow RPC | 0.6.0 | |
| celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | false |
The max number of chunks allowed to be transferred at the same time on shuffle
service. Note that new incoming connections will be closed when the max number
is hit. The client will retry according to the shuffle retry configs (see
`celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if
those limits are reached the task will fail with fetch failure. | 0.2.0 | |
| celeborn.ssl.<module>.enabled | false | false | Enables SSL for
securing wire traffic. | 0.5.0 | |
| celeborn.ssl.<module>.enabledAlgorithms | <undefined> | false |
A comma-separated list of ciphers. The specified ciphers must be supported by
JVM.<br/>The reference list of protocols can be found in the "JSSE Cipher Suite
Names" section of the Java security guide. The list for Java 11, for example,
can be found at [this
page](https://docs.oracle.com/en/java/javase/11/docs/specs/security/standard-names.html#jsse-cipher-suite-names)<br/>Note:
If not set, the default cipher su [...]
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 310eaf6e1..de749a7b5 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -121,6 +121,7 @@ private[celeborn] class Master(
Some(externalSecurityContext),
None)
}
+ metricsSystem.registerSource(rpcEnv.rpcSource())
// Visible for testing
private[master] var internalRpcEnvInUse: RpcEnv =
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index b9bfff05b..0be341912 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -123,6 +123,7 @@ private[celeborn] class Worker(
Some(externalSecurityContext),
Some(workerSource))
}
+ metricsSystem.registerSource(rpcEnv.rpcSource())
private[worker] var internalRpcEnvInUse =
if (!conf.internalPortEnabled) {