This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c311a3551 [CELEBORN-1485] Refactor addCounter, addGauge and addTimer
of AbstractSource to reduce CPU utilization
c311a3551 is described below
commit c311a35516eb624946748999a0f04a17cb76dcb9
Author: SteNicholas <[email protected]>
AuthorDate: Mon Jul 1 20:16:28 2024 +0800
[CELEBORN-1485] Refactor addCounter, addGauge and addTimer of
AbstractSource to reduce CPU utilization
### What changes were proposed in this pull request?
Refactor `addCounter`, `addGauge` and `addTimer` of `AbstractSource` to
reduce CPU utilization.
### Why are the changes needed?
`addCounter`, `addGauge` and `addTimer` of `AbstractSource` currently check
whether the metric key exists via `MetricRegistry#getMetrics`, which iterates
over all registered metrics and copies the matching ones into a new map on
every call. As a result, adding the counter of the active connection count
metric for the application dimension causes high CPU utilization when there
are many active connections:
<img width="1350" alt="image"
src="https://github.com/apache/celeborn/assets/10048174/cc882fac-eec1-417b-ba17-f3012053c6c7">
The implementation of `MetricRegistry#getMetrics` is as follows:
```
private <T extends Metric> SortedMap<String, T> getMetrics(Class<T> klass,
MetricFilter filter) {
final TreeMap<String, T> timers = new TreeMap<>();
for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
if (klass.isInstance(entry.getValue()) &&
filter.matches(entry.getKey(), entry.getValue())) {
timers.put(entry.getKey(), (T) entry.getValue());
}
}
return Collections.unmodifiableSortedMap(timers);
}
```
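Refactor `addCounter`, `addGauge` and `addTimer` of `AbstractSource` to
reduce CPU utilization: instead of scanning all metrics through
`MetricRegistry#getMetrics`, they now register metrics by key via
`putIfAbsent`/`computeIfAbsent` on the source's own concurrent maps and call
`MetricRegistry#counter`, `MetricRegistry#gauge` and `MetricRegistry#timer`
directly.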
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test.
<img width="1345" alt="image"
src="https://github.com/apache/celeborn/assets/10048174/4c0a7f92-3cc5-45f8-941f-e1d0166043e1">
Closes #2593 from SteNicholas/CELEBORN-1485.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../common/metrics/source/AbstractSource.scala | 99 ++++++++--------------
.../congestcontrol/TestCongestionController.java | 40 +--------
2 files changed, 36 insertions(+), 103 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 72611598f..58aa71bee 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.common.metrics.source
-import java.util.{Map => JMap, Queue => JQueue}
+import java.util.{Map => JMap}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue,
ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -69,7 +69,8 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
val applicationLabel = "applicationId"
- protected val namedGauges: JQueue[NamedGauge[_]] = new
ConcurrentLinkedQueue[NamedGauge[_]]()
+ protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
+ JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
def addGauge[T](
name: String,
@@ -77,7 +78,9 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
gauge: Gauge[T]): Unit = {
// filter out non-number type gauges
if (gauge.getValue.isInstanceOf[Number]) {
- namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+ namedGauges.putIfAbsent(
+ metricNameWithCustomizedLabels(name, labels),
+ NamedGauge(name, gauge, labels ++ staticLabels))
} else {
logWarning(
s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is
not a number")
@@ -92,12 +95,10 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
}
def addGauge[T](name: String, labels: Map[String, String] = Map.empty)(f: ()
=> T): Unit = {
- val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
- if (!metricRegistry.getGauges.containsKey(metricNameWithLabel)) {
- val supplier: MetricRegistry.MetricSupplier[Gauge[_]] = new
GaugeSupplier[T](f)
- val gauge = metricRegistry.gauge(metricNameWithLabel, supplier)
- addGauge(name, labels, gauge)
- }
+ addGauge(
+ name,
+ labels,
+ metricRegistry.gauge(metricNameWithCustomizedLabels(name, labels), new
GaugeSupplier[T](f)))
}
def addGauge[T](name: String, gauge: Gauge[T]): Unit = {
@@ -112,16 +113,16 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
def addTimer(name: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
- if (!metricRegistry.getTimers.containsKey(metricNameWithLabel)) {
- val timer = metricRegistry.timer(metricNameWithLabel, timerSupplier)
- namedTimers.computeIfAbsent(
- metricNameWithLabel,
- (_: String) => {
- val namedTimer = NamedTimer(name, timer, labels ++ staticLabels)
- val values = JavaUtils.newConcurrentHashMap[String, Long]()
- (namedTimer, values)
- })
- }
+ namedTimers.computeIfAbsent(
+ metricNameWithLabel,
+ (_: String) => {
+ val namedTimer = NamedTimer(
+ name,
+ metricRegistry.timer(metricNameWithLabel, timerSupplier),
+ labels ++ staticLabels)
+ val values = JavaUtils.newConcurrentHashMap[String, Long]()
+ (namedTimer, values)
+ })
}
protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
@@ -131,13 +132,9 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
def addCounter(name: String, labels: Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
- if (!metricRegistry.getCounters.containsKey(
- metricNameWithLabel)) {
- val counter = metricRegistry.counter(metricNameWithLabel)
- namedCounters.put(
- metricNameWithLabel,
- NamedCounter(name, counter, labels ++ staticLabels))
- }
+ namedCounters.putIfAbsent(
+ metricNameWithLabel,
+ NamedCounter(name, metricRegistry.counter(metricNameWithLabel), labels
++ staticLabels))
}
def counters(): List[NamedCounter] = {
@@ -145,7 +142,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
}
def gauges(): List[NamedGauge[_]] = {
- namedGauges.asScala.toList
+ namedGauges.values().asScala.toList
}
def histograms(): List[NamedHistogram] = {
@@ -156,6 +153,10 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedTimers.values().asScala.toList.map(_._1)
}
+ def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
+ namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels))
+ }
+
def needSample(): Boolean = {
if (metricsSampleRate >= 1) {
true
@@ -167,43 +168,17 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
}
def removeCounter(name: String, labels: Map[String, String]): Unit = {
- val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
- val namedCounter = namedCounters.get(metricNameWithLabel)
- if (namedCounter != null) {
- removeMetric(metricNameWithLabel, namedCounter)
- }
+ namedCounters.remove(removeMetric(name, labels))
}
def removeGauge(name: String, labels: Map[String, String]): Unit = {
- val labelString = MetricLabels.labelString(labels ++ staticLabels)
-
- val iter = namedGauges.iterator()
- while (iter.hasNext) {
- val namedGauge = iter.next()
- if (namedGauge.name.equals(name) &&
namedGauge.labelString.equals(labelString)) {
- iter.remove()
- removeMetric(name, namedGauge)
- return
- }
- }
- }
-
- def removeGauge(name: String, labelKey: String, labelVal: String): Unit = {
- val labels = Map(labelKey -> labelVal) ++ staticLabels
-
- val iter = namedGauges.iterator()
- while (iter.hasNext) {
- val namedGauge = iter.next()
- if (namedGauge.name.equals(name) &&
labels.toSet.subsetOf(namedGauge.labels.toSet)) {
- iter.remove()
- removeMetric(name, namedGauge)
- return
- }
- }
+ namedGauges.remove(removeMetric(name, labels))
}
- def removeMetric(name: String, metric: MetricLabels): Unit = {
- metricRegistry.remove(metricNameWithCustomizedLabelString(name,
metric.labelString))
+ def removeMetric(name: String, labels: Map[String, String]): String = {
+ val metricNameWithLabel = metricNameWithCustomizedLabels(name, labels)
+ metricRegistry.remove(metricNameWithLabel)
+ metricNameWithLabel
}
override def sample[T](metricsName: String, key: String)(f: => T): T = {
@@ -442,12 +417,6 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
metricsName + MetricLabels.labelString(labels ++ staticLabels)
}
}
-
- protected def metricNameWithCustomizedLabelString(
- metricsName: String,
- labelString: String): String = {
- metricsName + labelString
- }
}
class TimerSupplier(val slidingWindowSize: Int)
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
index deb533fa4..a48288325 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java
@@ -17,10 +17,6 @@
package org.apache.celeborn.service.deploy.worker.congestcontrol;
-import java.util.Map;
-
-import scala.collection.JavaConverters;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -121,43 +117,11 @@ public class TestCongestionController {
Assert.assertFalse(controller.isUserCongested(user));
produceBytes(user, 800);
- Assert.assertTrue(
- isGaugeExist(
- WorkerSource.USER_PRODUCE_SPEED(),
- JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
+ Assert.assertTrue(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(),
user.toMap()));
Thread.sleep(userInactiveTimeMills * 2);
- Assert.assertFalse(
- isGaugeExist(
- WorkerSource.USER_PRODUCE_SPEED(),
- JavaConverters.mapAsJavaMapConverter(user.toMap()).asJava()));
- }
-
- private boolean isGaugeExist(String name, Map<String, String> labels) {
- return source.namedGauges().stream()
- .filter(
- gauge -> {
- if (gauge.name().equals(name)) {
- return labels.entrySet().stream()
- .noneMatch(
- entry -> {
- // Filter entry not exist in the gauge's labels
- if
(gauge.labels().get(entry.getKey()).nonEmpty()) {
- return !gauge
- .labels()
- .get(entry.getKey())
- .get()
- .equals(entry.getValue());
- } else {
- return true;
- }
- });
- }
- return false;
- })
- .count()
- == 1;
+ Assert.assertFalse(source.gaugeExists(WorkerSource.USER_PRODUCE_SPEED(),
user.toMap()));
}
private void produceBytes(UserIdentifier userIdentifier, long numBytes) {