This is an automated email from the ASF dual-hosted git repository.
fchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new be4f1ac30 [CELEBORN-1634][FOLLOWUP] Simplify the logic of the
`RpcSource.addTimer` and `RpcSource.updateTimer`
be4f1ac30 is described below
commit be4f1ac309f11f257e4219d9406c2d56909059b0
Author: Fu Chen <[email protected]>
AuthorDate: Sat Nov 23 12:01:36 2024 +0800
[CELEBORN-1634][FOLLOWUP] Simplify the logic of the `RpcSource.addTimer`
and `RpcSource.updateTimer`
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
As title
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2935 from cfmcgrady/CELEBORN-1634-followup.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Fu Chen <[email protected]>
---
.../common/metrics/source/AbstractSource.scala | 3 +++
.../org/apache/celeborn/common/rpc/RpcSource.scala | 19 -------------------
2 files changed, 3 insertions(+), 19 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index dc01c65be..56b7dd6d3 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -298,6 +298,9 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
def updateTimer(metricsName: String, value: Long, labels: Map[String,
String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName,
labels)
+ if (!namedTimers.containsKey(metricNameWithLabel)) {
+ addTimer(metricsName, labels)
+ }
val (namedTimer, _) = namedTimers.get(metricNameWithLabel)
namedTimer.timer.update(value, TimeUnit.NANOSECONDS)
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
index 1acccf062..3b1da4385 100644
--- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcSource.scala
@@ -17,31 +17,12 @@
package org.apache.celeborn.common.rpc
-import java.util.concurrent.ConcurrentHashMap
-
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.source.AbstractSource
class RpcSource(conf: CelebornConf) extends AbstractSource(conf,
RpcSource.ROLE_RPC) {
override def sourceName: String = RpcSource.ROLE_RPC
- private val msgNameSet = ConcurrentHashMap.newKeySet[String]()
-
- override def updateTimer(name: String, value: Long): Unit = {
- if (!msgNameSet.contains(name)) {
- super.addTimer(name)
- msgNameSet.add(name)
- }
- super.updateTimer(name, value)
- }
-
- override def addTimer(name: String): Unit = {
- if (!msgNameSet.contains(name)) {
- super.addTimer(name)
- msgNameSet.add(name)
- }
- }
-
startCleaner()
}