This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new cc04d1315 [CELEBORN-1743] Resolve the metrics data interruption and the job failure caused by locked resources
cc04d1315 is described below

commit cc04d1315e123f42c2e5b50179bd9f8f21742a57
Author: zhengtao <[email protected]>
AuthorDate: Wed Dec 4 10:11:09 2024 +0800

    [CELEBORN-1743] Resolve the metrics data interruption and the job failure caused by locked resources
    
    ### What changes were proposed in this pull request?
    Remove the `innerMetrics` ConcurrentLinkedQueue and its lock in AbstractSource, which could cause metrics data interruption and job failures.
    
    ### Why are the changes needed?
    Current problems: see [JIRA CELEBORN-1743](https://issues.apache.org/jira/browse/CELEBORN-1743).
    The lock introduced in [CELEBORN-1453](https://github.com/apache/celeborn/pull/2548) might block the thread.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test.
    The result is the same as with CELEBORN-1453:
    
![image](https://github.com/user-attachments/assets/3e3a4c53-1cf6-48f6-8c37-67d875d675af)
    
    Closes #2956 from zaynt4606/clb1743.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../common/metrics/source/AbstractSource.scala     | 134 +++++++++++++--------
 1 file changed, 85 insertions(+), 49 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 56b7dd6d3..e9c4cfa3b 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, 
ConcurrentLinkedQueue, Scheduled
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import com.codahale.metrics._
@@ -59,8 +60,6 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   val metricsCapacity: Int = conf.metricsCapacity
 
-  val innerMetrics: ConcurrentLinkedQueue[String] = new 
ConcurrentLinkedQueue[String]()
-
   val timerSupplier = new TimerSupplier(metricsSlidingWindowSize)
 
   val metricsCleaner: ScheduledExecutorService =
@@ -79,12 +78,26 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
 
   val applicationLabel = "applicationId"
 
+  val timerMetrics: ConcurrentLinkedQueue[String] = new 
ConcurrentLinkedQueue[String]()
+
   protected val namedGauges: ConcurrentHashMap[String, NamedGauge[_]] =
     JavaUtils.newConcurrentHashMap[String, NamedGauge[_]]()
 
+  protected val namedTimers
+      : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, 
Long])] =
+    JavaUtils.newConcurrentHashMap[String, (NamedTimer, 
ConcurrentHashMap[String, Long])]()
+
+  protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
+    JavaUtils.newConcurrentHashMap[String, NamedCounter]()
+
   protected val namedMeters: ConcurrentHashMap[String, NamedMeter] =
     JavaUtils.newConcurrentHashMap[String, NamedMeter]()
 
+  def addTimerMetrics(namedTimer: NamedTimer): Unit = {
+    val timerMetricsString = getTimerMetrics(namedTimer)
+    timerMetrics.add(timerMetricsString)
+  }
+
   def addGauge[T](
       name: String,
       labels: Map[String, String],
@@ -145,10 +158,6 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     addMeter(name, Map.empty[String, String], meter)
   }
 
-  protected val namedTimers
-      : ConcurrentHashMap[String, (NamedTimer, ConcurrentHashMap[String, 
Long])] =
-    JavaUtils.newConcurrentHashMap[String, (NamedTimer, 
ConcurrentHashMap[String, Long])]()
-
   def addTimer(name: String): Unit = addTimer(name, Map.empty[String, String])
 
   def addTimer(name: String, labels: Map[String, String]): Unit = {
@@ -165,9 +174,6 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       })
   }
 
-  protected val namedCounters: ConcurrentHashMap[String, NamedCounter] =
-    JavaUtils.newConcurrentHashMap[String, NamedCounter]()
-
   def addCounter(name: String): Unit = addCounter(name, Map.empty[String, 
String])
 
   def addCounter(name: String, labels: Map[String, String]): Unit = {
@@ -197,6 +203,18 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedTimers.values().asScala.toList.map(_._1)
   }
 
+  def getAndClearTimerMetrics(): List[String] = {
+    timerMetrics.synchronized {
+      var timerMetricsSize = timerMetrics.size()
+      val timerMetricsList = ArrayBuffer[String]()
+      while (timerMetricsSize > 0) {
+        timerMetricsList.append(timerMetrics.poll())
+        timerMetricsSize = timerMetricsSize - 1
+      }
+      timerMetricsList.toList
+    }
+  }
+
   def gaugeExists(name: String, labels: Map[String, String]): Boolean = {
     namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels))
   }
@@ -282,7 +300,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
         case Some(t) =>
           namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
           if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
-            recordTimer(namedTimer)
+            addTimerMetrics(namedTimer)
           }
         case None =>
       }
@@ -347,31 +365,22 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES)
   }
 
-  private def updateInnerMetrics(str: String): Unit = {
-    innerMetrics.synchronized {
-      if (innerMetrics.size() >= metricsCapacity) {
-        innerMetrics.remove()
-      }
-      innerMetrics.offer(str)
-    }
-  }
-
-  def recordCounter(nc: NamedCounter): Unit = {
+  def getCounterMetrics(nc: NamedCounter): String = {
     val timestamp = System.currentTimeMillis
     val label = nc.labelString
-    updateInnerMetrics(s"${normalizeKey(nc.name)}Count$label 
${nc.counter.getCount} $timestamp\n")
+    val str = s"${normalizeKey(nc.name)}Count$label ${nc.counter.getCount} 
$timestamp\n"
+    str
   }
 
-  def recordGauge(ng: NamedGauge[_]): Unit = {
+  def getGaugeMetrics(ng: NamedGauge[_]): String = {
     val timestamp = System.currentTimeMillis
     val sb = new StringBuilder
     val label = ng.labelString
     sb.append(s"${normalizeKey(ng.name)}Value$label ${ng.gauge.getValue} 
$timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordMeter(nm: NamedMeter): Unit = {
+  def getMeterMetrics(nm: NamedMeter): String = {
     val timestamp = System.currentTimeMillis
     val sb = new StringBuilder
     val label = nm.labelString
@@ -383,11 +392,10 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       s"${normalizeKey(nm.name)}FiveMinuteRate$label 
${nm.meter.getFiveMinuteRate} $timestamp\n")
     sb.append(
       s"${normalizeKey(nm.name)}FifteenMinuteRate$label 
${nm.meter.getFifteenMinuteRate} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordHistogram(nh: NamedHistogram): Unit = {
+  def getHistogramMetrics(nh: NamedHistogram): String = {
     val timestamp = System.currentTimeMillis
     val sb = new mutable.StringBuilder
     val snapshot = nh.histogram.getSnapshot
@@ -409,11 +417,10 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
     sb.append(s"${prefix}999thPercentile$label" +
       s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
-
-    updateInnerMetrics(sb.toString())
+    sb.toString()
   }
 
-  def recordTimer(nt: NamedTimer): Unit = {
+  def getTimerMetrics(nt: NamedTimer): String = {
     val timestamp = System.currentTimeMillis
     val sb = new mutable.StringBuilder
     val snapshot = nt.timer.getSnapshot
@@ -435,32 +442,61 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
       s" ${reportNanosAsMills(snapshot.get99thPercentile)} $timestamp\n")
     sb.append(s"${prefix}999thPercentile$label" +
       s" ${reportNanosAsMills(snapshot.get999thPercentile)} $timestamp\n")
+    sb.toString()
+  }
 
-    updateInnerMetrics(sb.toString())
+  def getAllMetricsNum: Int = {
+    val sum = timerMetrics.size() +
+      namedTimers.size() +
+      namedMeters.size() +
+      namedGauges.size() +
+      namedCounters.size()
+    sum
   }
 
   override def getMetrics(): String = {
-    innerMetrics.synchronized {
-      counters().foreach(c => recordCounter(c))
-      gauges().foreach(g => recordGauge(g))
-      meters().foreach(m => recordMeter(m))
-      histograms().foreach(h => {
-        recordHistogram(h)
+    var leftMetricsNum = metricsCapacity
+    val sb = new mutable.StringBuilder
+    leftMetricsNum = fillInnerMetricsSnapshot(getAndClearTimerMetrics(), 
leftMetricsNum, sb)
+    leftMetricsNum = fillInnerMetricsSnapshot(timers(), leftMetricsNum, sb)
+    leftMetricsNum = fillInnerMetricsSnapshot(histograms(), leftMetricsNum, sb)
+    leftMetricsNum = fillInnerMetricsSnapshot(meters(), leftMetricsNum, sb)
+    leftMetricsNum = fillInnerMetricsSnapshot(gauges(), leftMetricsNum, sb)
+    leftMetricsNum = fillInnerMetricsSnapshot(counters(), leftMetricsNum, sb)
+    if (leftMetricsNum <= 0) {
+      logWarning(
+        s"The number of metrics exceed the output metrics strings capacity! 
All metrics Num: $getAllMetricsNum")
+    }
+    sb.toString()
+  }
+
+  private def fillInnerMetricsSnapshot(
+      metricList: List[AnyRef],
+      leftNum: Int,
+      sb: mutable.StringBuilder): Int = {
+    if (leftNum <= 0) {
+      return 0
+    }
+    val addList = metricList.take(leftNum)
+    addList.foreach {
+      case c: NamedCounter =>
+        sb.append(getCounterMetrics(c))
+      case g: NamedGauge[_] =>
+        sb.append(getGaugeMetrics(g))
+      case m: NamedMeter =>
+        sb.append(getMeterMetrics(m))
+      case h: NamedHistogram =>
+        sb.append(getHistogramMetrics(h))
         h.asInstanceOf[CelebornHistogram].reservoir
           .asInstanceOf[ResettableSlidingWindowReservoir].reset()
-      })
-      timers().foreach(t => {
-        recordTimer(t)
+      case t: NamedTimer =>
+        sb.append(getTimerMetrics(t))
         t.timer.asInstanceOf[CelebornTimer].reservoir
           .asInstanceOf[ResettableSlidingWindowReservoir].reset()
-      })
-      val sb = new mutable.StringBuilder
-      while (!innerMetrics.isEmpty) {
-        sb.append(innerMetrics.poll())
-      }
-      innerMetrics.clear()
-      sb.toString()
+      case s =>
+        sb.append(s.toString)
     }
+    leftNum - addList.size
   }
 
   override def destroy(): Unit = {
@@ -469,7 +505,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
     namedGauges.clear()
     namedMeters.clear()
     namedTimers.clear()
-    innerMetrics.clear()
+    timerMetrics.clear()
     metricRegistry.removeMatching(new MetricFilter {
       override def matches(s: String, metric: Metric): Boolean = true
     })

Reply via email to