This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3e266c0cf [CELEBORN-852] Adding new metrics to record the number of
registered connections
3e266c0cf is described below
commit 3e266c0cf6247245ab7bcd3e1abb49cf4ece0bca
Author: caojiaqing <[email protected]>
AuthorDate: Tue Aug 1 21:21:13 2023 +0800
[CELEBORN-852] Adding new metrics to record the number of registered connections
### What changes were proposed in this pull request?
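Add a new worker metric, `ActiveConnectionCount`, that records the number of currently active connections. The counter is incremented in `channelActive` and decremented in `channelInactive` of both `FetchHandler` and `PushDataHandler`.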
### Why are the changes needed?
The new metric makes it possible to monitor the number of active connections on worker nodes.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
no
Closes #1773 from JQ-Cao/852.
Authored-by: caojiaqing <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/service/deploy/worker/FetchHandler.scala | 7 +++++++
.../service/deploy/worker/PushDataHandler.scala | 17 +++++++++++++++++
.../celeborn/service/deploy/worker/WorkerSource.scala | 3 +++
3 files changed, 27 insertions(+)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 9b4d08f8c..66e0ecf11 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val
transportConf: TransportConf)
override def checkRegistered: Boolean = registered.get
+ /** Invoked when the channel associated with the given client is active. */
+ override def channelActive(client: TransportClient): Unit = {
+ workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
+ super.channelActive(client)
+ }
+
override def channelInactive(client: TransportClient): Unit = {
+ workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
creditStreamManager.connectionTerminated(client.getChannel)
logDebug(s"channel inactive ${client.getSocketAddress}")
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index f93d46d60..97deb7f6b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
pushClientFactory.createClient(host, port, partitionId)
}
}
+
+ /**
+ * Invoked when the channel associated with the given client is active.
+ */
+ override def channelActive(client: TransportClient): Unit = {
+ workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
+ super.channelActive(client)
+ }
+
+ /**
+ * Invoked when the channel associated with the given client is inactive.
+ * No further requests will come from this client.
+ */
+ override def channelInactive(client: TransportClient): Unit = {
+ workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
+ super.channelInactive(client)
+ }
}
object PushDataHandler {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 87c755517..edcebbb2e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
addCounter(REGION_START_FAIL_COUNT)
addCounter(REGION_FINISH_FAIL_COUNT)
+ addCounter(ACTIVE_CONNECTION_COUNT)
// add Timers
addTimer(COMMIT_FILES_TIME)
@@ -94,6 +95,8 @@ object WorkerSource {
// slots
val SLOTS_ALLOCATED = "SlotsAllocated"
+ val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount"
+
// memory
val NETTY_MEMORY = "NettyMemory"
val SORT_TIME = "SortTime"