This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e266c0cf [CELEBORN-852] Adding new metrics to record the number of registered …
3e266c0cf is described below

commit 3e266c0cf6247245ab7bcd3e1abb49cf4ece0bca
Author: caojiaqing <[email protected]>
AuthorDate: Tue Aug 1 21:21:13 2023 +0800

    [CELEBORN-852] Adding new metrics to record the number of registered …
    
    ### What changes were proposed in this pull request?
    Adding new metrics to record the number of registered connections
    
    ### Why are the changes needed?
    Monitor the number of active connections on worker nodes
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    no
    
    Closes #1773 from JQ-Cao/852.
    
    Authored-by: caojiaqing <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/service/deploy/worker/FetchHandler.scala   |  7 +++++++
 .../service/deploy/worker/PushDataHandler.scala         | 17 +++++++++++++++++
 .../celeborn/service/deploy/worker/WorkerSource.scala   |  3 +++
 3 files changed, 27 insertions(+)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 9b4d08f8c..66e0ecf11 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -265,7 +265,14 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf)
 
   override def checkRegistered: Boolean = registered.get
 
+  /** Invoked when the channel associated with the given client is active. */
+  override def channelActive(client: TransportClient): Unit = {
+    workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
+    super.channelActive(client)
+  }
+
   override def channelInactive(client: TransportClient): Unit = {
+    workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
     creditStreamManager.connectionTerminated(client.getChannel)
     logDebug(s"channel inactive ${client.getSocketAddress}")
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index f93d46d60..97deb7f6b 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1100,6 +1100,23 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       pushClientFactory.createClient(host, port, partitionId)
     }
   }
+
+  /**
+   * Invoked when the channel associated with the given client is active.
+   */
+  override def channelActive(client: TransportClient): Unit = {
+    workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT)
+    super.channelActive(client)
+  }
+
+  /**
+   * Invoked when the channel associated with the given client is inactive.
+   * No further requests will come from this client.
+   */
+  override def channelInactive(client: TransportClient): Unit = {
+    workerSource.incCounter(WorkerSource.ACTIVE_CONNECTION_COUNT, -1)
+    super.channelInactive(client)
+  }
 }
 
 object PushDataHandler {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 87c755517..edcebbb2e 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -36,6 +36,7 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
   addCounter(PUSH_DATA_HANDSHAKE_FAIL_COUNT)
   addCounter(REGION_START_FAIL_COUNT)
   addCounter(REGION_FINISH_FAIL_COUNT)
+  addCounter(ACTIVE_CONNECTION_COUNT)
 
   // add Timers
   addTimer(COMMIT_FILES_TIME)
@@ -94,6 +95,8 @@ object WorkerSource {
   // slots
   val SLOTS_ALLOCATED = "SlotsAllocated"
 
+  val ACTIVE_CONNECTION_COUNT = "ActiveConnectionCount"
+
   // memory
   val NETTY_MEMORY = "NettyMemory"
   val SORT_TIME = "SortTime"

Reply via email to