This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new ed94d8476 [CELEBORN-1075] Refactor `MetricsSystem` and 
`AbstractSource` to use synchronized blocks
ed94d8476 is described below

commit ed94d8476645d837ebaae6cbf200d0f76bc0880a
Author: Fu Chen <[email protected]>
AuthorDate: Tue Oct 24 21:57:00 2023 +0800

    [CELEBORN-1075] Refactor `MetricsSystem` and `AbstractSource` to use 
synchronized blocks
    
    ### What changes were proposed in this pull request?
    
    As title
    
    ### Why are the changes needed?
    
    Recently, while testing on the main branch, I encountered a 
`java.util.ConcurrentModificationException`. This PR addresses thread-safety 
issues in the `MetricsSystem` and `AbstractSource` classes by switching their 
shared collections to thread-safe concurrent implementations.
    
    1. The `MetricsSystem#sources` collection has been changed from 
`mutable.ArrayBuffer` to `CopyOnWriteArrayList` to prevent potential 
thread-safety issues.
    2. The `AbstractSource#namedGauges` collection has been changed from 
`ArrayList` to `ConcurrentLinkedQueue` so that gauges can be added safely while 
the collection is being iterated. This fixes the following exception:
    
    ```
    java.util.ConcurrentModificationException
            at 
java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
            at java.util.ArrayList$Itr.next(ArrayList.java:861)
            at 
scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at 
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
            at 
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
            at 
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
            at 
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
            at scala.collection.TraversableLike.to(TraversableLike.scala:786)
            at scala.collection.TraversableLike.to$(TraversableLike.scala:783)
            at scala.collection.AbstractTraversable.to(Traversable.scala:108)
            at 
scala.collection.TraversableOnce.toList(TraversableOnce.scala:350)
            at 
scala.collection.TraversableOnce.toList$(TraversableOnce.scala:350)
            at 
scala.collection.AbstractTraversable.toList(Traversable.scala:108)
            at 
org.apache.celeborn.common.metrics.source.AbstractSource.gauges(AbstractSource.scala:146)
            at 
org.apache.celeborn.common.metrics.source.AbstractSource.getMetrics(AbstractSource.scala:401)
            at 
org.apache.celeborn.common.metrics.sink.PrometheusServlet.$anonfun$getMetricsSnapshot$1(PrometheusServlet.scala:42)
            at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
            at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
            at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
            at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
            at scala.collection.TraversableLike.map(TraversableLike.scala:286)
            at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
            at scala.collection.AbstractTraversable.map(Traversable.scala:108)
            at 
org.apache.celeborn.common.metrics.sink.PrometheusServlet.getMetricsSnapshot(PrometheusServlet.scala:42)
            at 
org.apache.celeborn.common.metrics.sink.PrometheusHttpRequestHandler.handleRequest(PrometheusServlet.scala:59)
            at 
org.apache.celeborn.server.common.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:53)
            at 
org.apache.celeborn.server.common.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:37)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is only a bug fix.
    
    ### How was this patch tested?
    
    Pass GA
    
    Closes #2023 from cfmcgrady/synchronized-metrics.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 447c243601def2fec2578047e0efc43733495c8b)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/common/metrics/MetricsSystem.scala | 18 +++++++++---------
 .../common/metrics/sink/PrometheusServlet.scala        |  4 +---
 .../common/metrics/source/AbstractSource.scala         |  4 ++--
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index fdc1fef0b..fbc6f0fca 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -18,9 +18,9 @@
 package org.apache.celeborn.common.metrics
 
 import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
 
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.matching.Regex
 
@@ -38,8 +38,8 @@ class MetricsSystem(
     val servletPath: String) extends Logging {
   private[this] val metricsConfig = new MetricsConfig(conf)
 
-  private val sinks = new mutable.ArrayBuffer[Sink]
-  private val sources = new mutable.ArrayBuffer[Source]
+  private val sinks = new ArrayBuffer[Sink]
+  private val sources = new CopyOnWriteArrayList[Source]
   private val registry = new MetricRegistry()
 
   private var prometheusServlet: Option[PrometheusServlet] = None
@@ -81,10 +81,10 @@ class MetricsSystem(
   }
 
   def getSourcesByName(sourceName: String): Seq[Source] =
-    sources.filter(_.sourceName == sourceName).toSeq
+    sources.asScala.filter(_.sourceName == sourceName).toSeq
 
   def registerSource(source: Source) {
-    sources += source
+    sources.add(source)
     try {
       val regName = buildRegistryName(source)
       registry.register(regName, source.metricRegistry)
@@ -94,7 +94,7 @@ class MetricsSystem(
   }
 
   def removeSource(source: Source) {
-    sources -= source
+    sources.remove(source)
     val regName = buildRegistryName(source)
     registry.removeMatching(new MetricFilter {
       def matches(name: String, metric: Metric): Boolean = 
name.startsWith(regName)
@@ -130,9 +130,9 @@ class MetricsSystem(
               .getConstructor(
                 classOf[Properties],
                 classOf[MetricRegistry],
-                classOf[ArrayBuffer[Source]],
+                classOf[Seq[Source]],
                 classOf[String])
-              .newInstance(kv._2, registry, sources, servletPath)
+              .newInstance(kv._2, registry, sources.asScala, servletPath)
             prometheusServlet = Some(servlet.asInstanceOf[PrometheusServlet])
           } else {
             val sink = Utils.classForName(classPath)
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index 6913457af..a77a8375e 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -19,8 +19,6 @@ package org.apache.celeborn.common.metrics.sink
 
 import java.util.Properties
 
-import scala.collection.mutable
-
 import com.codahale.metrics.MetricRegistry
 import io.netty.channel.ChannelHandler.Sharable
 
@@ -31,7 +29,7 @@ import org.apache.celeborn.common.metrics.source.Source
 class PrometheusServlet(
     val property: Properties,
     val registry: MetricRegistry,
-    val sources: mutable.ArrayBuffer[Source],
+    val sources: Seq[Source],
     val servletPath: String) extends Sink with Logging {
 
   def getHandler(conf: CelebornConf): PrometheusHttpRequestHandler = {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index bfa01b428..66e6fd660 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.common.metrics.source
 
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.{Map => JMap, Queue => JQueue}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
ScheduledExecutorService, TimeUnit}
 
 import scala.collection.JavaConverters._
@@ -67,7 +67,7 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel
   val staticLabelsString: String = MetricLabels.labelString(staticLabels)
 
-  protected val namedGauges: JList[NamedGauge[_]] = new 
JArrayList[NamedGauge[_]]()
+  protected val namedGauges: JQueue[NamedGauge[_]] = new 
ConcurrentLinkedQueue[NamedGauge[_]]()
 
   def addGauge[T](
       name: String,

Reply via email to