This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new be4f1ac30 [CELEBORN-1634][FOLLOWUP] Simplify the logic of the 
`RpcSource.addTimer` and `RpcSource.updateTimer`
be4f1ac30 is described below

commit be4f1ac309f11f257e4219d9406c2d56909059b0
Author: Fu Chen <[email protected]>
AuthorDate: Sat Nov 23 12:01:36 2024 +0800

    [CELEBORN-1634][FOLLOWUP] Simplify the logic of the `RpcSource.addTimer` 
and `RpcSource.updateTimer`
    
    ### What changes were proposed in this pull request?
    
    As title
    
    ### Why are the changes needed?
    
    As title
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    GA
    
    Closes #2935 from cfmcgrady/CELEBORN-1634-followup.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 .../common/metrics/source/AbstractSource.scala        |  3 +++
 .../org/apache/celeborn/common/rpc/RpcSource.scala    | 19 -------------------
 2 files changed, 3 insertions(+), 19 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index dc01c65be..56b7dd6d3 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -298,6 +298,9 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   def updateTimer(metricsName: String, value: Long, labels: Map[String, 
String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, 
labels)
+    if (!namedTimers.containsKey(metricNameWithLabel)) {
+      addTimer(metricsName, labels)
+    }
     val (namedTimer, _) = namedTimers.get(metricNameWithLabel)
     namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
   }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
index 1acccf062..3b1da4385 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -17,31 +17,12 @@
 
 package org.apache.celeborn.common.rpc
 
-import java.util.concurrent.ConcurrentHashMap
-
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.metrics.source.AbstractSource
 
 class RpcSource(conf: CelebornConf) extends AbstractSource(conf, 
RpcSource.ROLE_RPC) {
   override def sourceName: String = RpcSource.ROLE_RPC
 
-  private val msgNameSet = ConcurrentHashMap.newKeySet[String]()
-
-  override def updateTimer(name: String, value: Long): Unit = {
-    if (!msgNameSet.contains(name)) {
-      super.addTimer(name)
-      msgNameSet.add(name)
-    }
-    super.updateTimer(name, value)
-  }
-
-  override def addTimer(name: String): Unit = {
-    if (!msgNameSet.contains(name)) {
-      super.addTimer(name)
-      msgNameSet.add(name)
-    }
-  }
-
   startCleaner()
 }
 

Reply via email to