This is an automated email from the ASF dual-hosted git repository.

fchen pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 8976ba26b [CELEBORN-1485][0.4] Refactor addCounter, addGauge and 
addTimer of AbstractSource to reduce CPU utilization
8976ba26b is described below

commit 8976ba26b0d2bb05945711885864f3e822c1fd08
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 20 11:50:44 2024 +0800

    [CELEBORN-1485][0.4] Refactor addCounter, addGauge and addTimer of 
AbstractSource to reduce CPU utilization
    
    backport https://github.com/apache/celeborn/pull/2593 to branch-0.4
    
    ### What changes were proposed in this pull request?
    
    Refactor `addCounter`, `addGauge` and `addTimer` of `AbstractSource` to 
reduce CPU utilization.
    
    ### Why are the changes needed?
    
    `addCounter`, `addGauge` and `addTimer` of `AbstractSource` currently check 
whether the metric key exists via `MetricRegistry#getMetrics`, which iterates 
over all metrics and copies the matching ones into a new map on every call. As 
a result, adding the active connection count counter for the application 
dimension causes high CPU utilization when there are many active connections:
    
    <img width="1350" alt="image" 
src="https://github.com/apache/celeborn/assets/10048174/cc882fac-eec1-417b-ba17-f3012053c6c7">
    
    The implementation of `MetricRegistry#getMetrics` is as follows:
    
    ```
    private <T extends Metric> SortedMap<String, T> getMetrics(Class<T> klass, 
MetricFilter filter) {
        final TreeMap<String, T> timers = new TreeMap<>();
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            if (klass.isInstance(entry.getValue()) && 
filter.matches(entry.getKey(), entry.getValue())) {
                timers.put(entry.getKey(), (T) entry.getValue());
            }
        }
        return Collections.unmodifiableSortedMap(timers);
    }
    ```
    
    Refactor `addCounter`, `addGauge` and `addTimer` of `AbstractSource` to 
reduce CPU utilization.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Cluster test.
    
    <img width="1345" alt="image" 
src="https://github.com/apache/celeborn/assets/10048174/4c0a7f92-3cc5-45f8-941f-e1d0166043e1">
    
    Closes #2593 from SteNicholas/CELEBORN-1485.
    
    Authored-by: SteNicholas <programgeek163.com>
    
    Closes #2928 from cfmcgrady/CELEBORN-1485-0.4.
    
    Lead-authored-by: SteNicholas <[email protected]>
    Co-authored-by: Fu Chen <[email protected]>
    Signed-off-by: Fu Chen <[email protected]>
---
 .../common/metrics/source/AbstractSource.scala     | 95 +++++++++-------------
 .../congestcontrol/TestCongestionController.java   | 40 +--------
 2 files changed, 40 insertions(+), 95 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index e77dd0685..bba98777f 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.common.metrics.source
 
-import java.util.{Map => JMap, Queue => JQueue}
+import java.util.{Map => JMap}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -69,7 +69,8 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   val applicationLabel = "applicationId"
 
-  protected val namedGauges: JQueue[NamedGauge[_]] = new 
ConcurrentLinkedQueue[NamedGauge[_]]()
+  protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
+    JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
 
   def addGauge[T](
       name: String,
@@ -77,7 +78,9 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       gauge: Gauge[T]): Unit = {
     // filter out non-number type gauges
     if (gauge.getValue.isInstanceOf[Number]) {
-      namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+      namedGauges.putIfAbsent(
+        metricNameWithCustomizedLabels(name, labels),
+        NamedGauge(name, gauge, labels ++ staticLabels))
     } else {
       logWarning(
         s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is 
not a number")
@@ -92,12 +95,10 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   }
 
   def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: () 
=> T): Unit = {
-    val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
-    if (!metricRegistry.getGauges.containsKey(metricNameWithLabel)) {
-      val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new 
GaugeSupplier[T](f)
-      val gauge = metricRegistry.gauge(metricNameWithLabel, supplier)
-      addGauge(name, labels, gauge)
-    }
+    addGauge(
+      name,
+      labels,
+      metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new 
GaugeSupplier[T](f)))
   }
 
   def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
@@ -112,16 +113,16 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   def addTimer(name: String, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
-    if (!metricRegistry.getTimers.containsKey(metricNameWithLabel)) {
-      val timer = metricRegistry.timer(metricNameWithLabel, timerSupplier)
-      namedTimers.computeIfAbsent(
-        metricNameWithLabel,
-        (_: String) => {
-          val namedTimer = NamedTimer(name, timer, labels ++ staticLabels)
-          val values = JavaUtils.newConcurrentHashMap[String, Long]()
-          (namedTimer, values)
-        })
-    }
+    namedTimers.computeIfAbsent(
+      metricNameWithLabel,
+      (_: String) => {
+        val namedTimer = NamedTimer(
+          name,
+          metricRegistry.timer(metricNameWithLabel, timerSupplier),
+          labels ++ staticLabels)
+        val values = JavaUtils.newConcurrentHashMap[String, Long]()
+        (namedTimer, values)
+      })
   }
 
   protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
@@ -131,13 +132,9 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   def addCounter(name: String, labels: Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
-    if (!metricRegistry.getCounters.containsKey(
-        metricNameWithLabel)) {
-      val counter = metricRegistry.counter(metricNameWithLabel)
-      namedCounters.put(
-        metricNameWithLabel,
-        NamedCounter(name, counter, labels ++ staticLabels))
-    }
+    namedCounters.putIfAbsent(
+      metricNameWithLabel,
+      NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels 
++ staticLabels))
   }
 
   def counters(): List[NamedCounter] = {
@@ -145,7 +142,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   }
 
   def gauges(): List[NamedGauge[_]] = {
-    namedGauges.asScala.toList
+    namedGauges.values().asScala.toList
   }
 
   def histograms(): List[NamedHistogram] = {
@@ -156,6 +153,10 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedTimers.values().asScala.toList.map(_._1)
   }
 
+  def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
+    namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels))
+  }
+
   def needSample(): Boolean = {
     if (metricsSampleRate >= 1) {
       true
@@ -166,36 +167,22 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     }
   }
 
+  def removeCounter(name: String, labels: Map[String, String]): Unit = {
+    namedCounters.remove(removeMetric(name, labels))
+  }
+
   def removeGauge(name: String, labels: Map[String, String]): Unit = {
-    val labelString = MetricLabels.labelString(labels ++ staticLabels)
-
-    val iter = namedGauges.iterator()
-    while (iter.hasNext) {
-      val namedGauge = iter.next()
-      if (namedGauge.name.equals(name) && 
namedGauge.labelString.equals(labelString)) {
-        iter.remove()
-        removeGaugeMetric(name, namedGauge)
-        return
-      }
-    }
+    namedGauges.remove(removeMetric(name, labels))
   }
 
   def removeGauge(name: String, labelKey: String, labelVal: String): Unit = {
-    val labels = Map(labelKey -> labelVal) ++ staticLabels
-
-    val iter = namedGauges.iterator()
-    while (iter.hasNext) {
-      val namedGauge = iter.next()
-      if (namedGauge.name.equals(name) && 
labels.toSet.subsetOf(namedGauge.labels.toSet)) {
-        iter.remove()
-        removeGaugeMetric(name, namedGauge)
-        return
-      }
-    }
+    removeGauge(name, Map(labelKey -> labelVal) ++ staticLabels)
   }
 
-  def removeGaugeMetric(name: String, namedGauge: NamedGauge[_]): Unit = {
-    metricRegistry.remove(metricNameWithCustomizedLabelString(name, 
namedGauge.labelString))
+  def removeMetric(name: String, labels: Map[String, String]): String = {
+    val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
+    metricRegistry.remove(metricNameWithLabel)
+    metricNameWithLabel
   }
 
   override def sample[T](metricsName: String, key: String)(f: => T): T = {
@@ -434,12 +421,6 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       metricsName + MetricLabels.labelString(labels ++ staticLabels)
     }
   }
-
-  protected def metricNameWithCustomizedLabelString(
-      metricsName: String,
-      labelString: String): String = {
-    metricsName + labelString
-  }
 }
 
 class TimerSupplier(val slidingWindowSize: Int)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index b8fb5cadc..d7029cd93 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -17,10 +17,6 @@
 
 package org.apache.celeborn.service.deploy.worker.congestcontrol;
 
-import java.util.Map;
-
-import scala.collection.JavaConverters;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -121,42 +117,10 @@ public class TestCongestionController {
     Assert.assertFalse(controller.isUserCongested(user));
     controller.produceBytes(user, 800);
 
-    Assert.assertTrue(
-        isGaugeExist(
-            WorkerSource.USER_PRODUCE_SPEED(),
-            JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
+    Assert.assertTrue(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(), 
user.toMap()));
 
     Thread.sleep(userInactiveTimeMills * 2);
 
-    Assert.assertFalse(
-        isGaugeExist(
-            WorkerSource.USER_PRODUCE_SPEED(),
-            JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
-  }
-
-  private boolean isGaugeExist(String name, Map<String, String> labels) {
-    return source.namedGauges().stream()
-            .filter(
-                gauge -> {
-                  if (gauge.name().equals(name)) {
-                    return labels.entrySet().stream()
-                        .noneMatch(
-                            entry -> {
-                              // Filter entry not exist in the gauge's labels
-                              if 
(gauge.labels().get(entry.getKey()).nonEmpty()) {
-                                return !gauge
-                                    .labels()
-                                    .get(entry.getKey())
-                                    .get()
-                                    .equals(entry.getValue());
-                              } else {
-                                return true;
-                              }
-                            });
-                  }
-                  return false;
-                })
-            .count()
-        == 1;
+    Assert.assertFalse(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(), 
user.toMap()));
   }
 }

Reply via email to