This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cc04d1315 [CELEBORN-1743] Resolve the metrics data interruption and
the job failure caused by locked resources
cc04d1315 is described below
commit cc04d1315e123f42c2e5b50179bd9f8f21742a57
Author: zhengtao <[email protected]>
AuthorDate: Wed Dec 4 10:11:09 2024 +0800
[CELEBORN-1743] Resolve the metrics data interruption and the job failure
caused by locked resources
### What changes were proposed in this pull request?
Remove the ConcurrentLinkedQueue and lock in AbstractSource, which might
cause metrics data interruption and job failure.
### Why are the changes needed?
Current problem: see [JIRA
CELEBORN-1743](https://issues.apache.org/jira/browse/CELEBORN-1743).
The lock in [[CELEBORN-1453]](https://github.com/apache/celeborn/pull/2548)
might block the thread.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test;
same result as with CELEBORN-1453.

Closes #2956 from zaynt4606/clb1743.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../common/metrics/source/AbstractSource.scala | 134 +++++++++++++--------
1 file changed, 85 insertions(+), 49 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 56b7dd6d3..e9c4cfa3b 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap,
ConcurrentLinkedQueue, Scheduled
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import com.codahale.metrics._
@@ -59,8 +60,6 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
val metricsCapacity: Int = conf.metricsCapacity
- val innerMetrics: ConcurrentLinkedQueue[String] = new
ConcurrentLinkedQueue[String]()
-
val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)
val metricsCleaner: ScheduledExecutorService =
@@ -79,12 +78,26 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
val applicationLabel = "applicationId"
+ val timerMetrics: ConcurrentLinkedQueue[String] = new
ConcurrentLinkedQueue[String]()
+
protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
+ protected val namedTimers
+ : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String,
Long])] =
+ JavaUtils.newConcurrentHashMap[String, (NamedTimer,
ConcurrentHashMap[String, Long])]()
+
+ protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
+ JavaUtils.newConcurrentHashMap[String, NamedCounter]()
+
protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
JavaUtils.newConcurrentHashMap[String, NamedMeter]()
+ def addTimerMetrics(namedTimer: NamedTimer): Unit = {
+ val timerMetricsString = getTimerMetrics(namedTimer)
+ timerMetrics.add(timerMetricsString)
+ }
+
def addGauge[T](
name: String,
labels: Map[String, String],
@@ -145,10 +158,6 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
addMeter(name, Map.empty[String, String], meter)
}
- protected val namedTimers
- : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String,
Long])] =
- JavaUtils.newConcurrentHashMap[String, (NamedTimer,
ConcurrentHashMap[String, Long])]()
-
def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
def addTimer(name: String, labels: Map[String, String]): Unit = {
@@ -165,9 +174,6 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
})
}
- protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
- JavaUtils.newConcurrentHashMap[String, NamedCounter]()
-
def addCounter(name: String): Unit = addCounter(name, Map.empty[String,
String])
def addCounter(name: String, labels: Map[String, String]): Unit = {
@@ -197,6 +203,18 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedTimers.values().asScala.toList.map(_._1)
}
+ def getAndClearTimerMetrics(): List[String] = {
+ timerMetrics.synchronized {
+ var timerMetricsSize = timerMetrics.size()
+ val timerMetricsList = ArrayBuffer[String]()
+ while (timerMetricsSize > 0) {
+ timerMetricsList.append(timerMetrics.poll())
+ timerMetricsSize = timerMetricsSize - 1
+ }
+ timerMetricsList.toList
+ }
+ }
+
def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels))
}
@@ -282,7 +300,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
case Some(t) =>
namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
- recordTimer(namedTimer)
+ addTimerMetrics(namedTimer)
}
case None =>
}
@@ -347,31 +365,22 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES)
}
- private def updateInnerMetrics(str: String): Unit = {
- innerMetrics.synchronized {
- if (innerMetrics.size() >= metricsCapacity) {
- innerMetrics.remove()
- }
- innerMetrics.offer(str)
- }
- }
-
- def recordCounter(nc: NamedCounter): Unit = {
+ def getCounterMetrics(nc: NamedCounter): String = {
val timestamp = System.currentTimeMillis
val label = nc.labelString
- updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label
${nc.counter.getCount} $timestamp\n")
+ val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount}
$timestamp\n"
+ str
}
- def recordGauge(ng: NamedGauge[_]): Unit = {
+ def getGaugeMetrics(ng: NamedGauge[_]): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = ng.labelString
sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue}
$timestamp\n")
-
- updateInnerMetrics(sb.toString())
+ sb.toString()
}
- def recordMeter(nm: NamedMeter): Unit = {
+ def getMeterMetrics(nm: NamedMeter): String = {
val timestamp = System.currentTimeMillis
val sb = new StringBuilder
val label = nm.labelString
@@ -383,11 +392,10 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
s"${normalizeKey(nm.name)}FiveMinuteRate$label
${nm.meter.getFiveMinuteRate} $timestamp\n")
sb.append(
s"${normalizeKey(nm.name)}FifteenMinuteRate$label
${nm.meter.getFifteenMinuteRate} $timestamp\n")
-
- updateInnerMetrics(sb.toString())
+ sb.toString()
}
- def recordHistogram(nh: NamedHistogram): Unit = {
+ def getHistogramMetrics(nh: NamedHistogram): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nh.histogram.getSnapshot
@@ -409,11 +417,10 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
-
- updateInnerMetrics(sb.toString())
+ sb.toString()
}
- def recordTimer(nt: NamedTimer): Unit = {
+ def getTimerMetrics(nt: NamedTimer): String = {
val timestamp = System.currentTimeMillis
val sb = new mutable.StringBuilder
val snapshot = nt.timer.getSnapshot
@@ -435,32 +442,61 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
sb.append(s"${prefix}999thPercentile$label" +
s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
+ sb.toString()
+ }
- updateInnerMetrics(sb.toString())
+ def getAllMetricsNum: Int = {
+ val sum = timerMetrics.size() +
+ namedTimers.size() +
+ namedMeters.size() +
+ namedGauges.size() +
+ namedCounters.size()
+ sum
}
override def getMetrics(): String = {
- innerMetrics.synchronized {
- counters().foreach(c => recordCounter(c))
- gauges().foreach(g => recordGauge(g))
- meters().foreach(m => recordMeter(m))
- histograms().foreach(h => {
- recordHistogram(h)
+ var leftMetricsNum = metricsCapacity
+ val sb = new mutable.StringBuilder
+ leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(),
leftMetricsNum, sb)
+ leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb)
+ leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb)
+ leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb)
+ leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb)
+ leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb)
+ if (leftMetricsNum <= 0) {
+ logWarning(
+ s"The number of metrics exceed the output metrics strings capacity!
All metrics Num: $getAllMetricsNum")
+ }
+ sb.toString()
+ }
+
+ private def fillInnerMetricsSnapshot(
+ metricList: List[AnyRef],
+ leftNum: Int,
+ sb: mutable.StringBuilder): Int = {
+ if (leftNum <= 0) {
+ return 0
+ }
+ val addList = metricList.take(leftNum)
+ addList.foreach {
+ case c: NamedCounter =>
+ sb.append(getCounterMetrics(c))
+ case g: NamedGauge[_] =>
+ sb.append(getGaugeMetrics(g))
+ case m: NamedMeter =>
+ sb.append(getMeterMetrics(m))
+ case h: NamedHistogram =>
+ sb.append(getHistogramMetrics(h))
h.asInstanceOf[CelebornHistogram].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
- })
- timers().foreach(t => {
- recordTimer(t)
+ case t: NamedTimer =>
+ sb.append(getTimerMetrics(t))
t.timer.asInstanceOf[CelebornTimer].reservoir
.asInstanceOf[ResettableSlidingWindowReservoir].reset()
- })
- val sb = new mutable.StringBuilder
- while (!innerMetrics.isEmpty) {
- sb.append(innerMetrics.poll())
- }
- innerMetrics.clear()
- sb.toString()
+ case s =>
+ sb.append(s.toString)
}
+ leftNum - addList.size
}
override def destroy(): Unit = {
@@ -469,7 +505,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
namedGauges.clear()
namedMeters.clear()
namedTimers.clear()
- innerMetrics.clear()
+ timerMetrics.clear()
metricRegistry.removeMatching(new MetricFilter {
override def matches(s: String, metric: Metric): Boolean = true
})