This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new ed94d8476 [CELEBORN-1075] Refactor `MetricsSystem` and
`AbstractSource` to use synchronized blocks
ed94d8476 is described below
commit ed94d8476645d837ebaae6cbf200d0f76bc0880a
Author: Fu Chen <[email protected]>
AuthorDate: Tue Oct 24 21:57:00 2023 +0800
[CELEBORN-1075] Refactor `MetricsSystem` and `AbstractSource` to use
synchronized blocks
### What changes were proposed in this pull request?
As stated in the title: make `MetricsSystem` and `AbstractSource` safe for concurrent access.
### Why are the changes needed?
Recently, while testing on the main branch, I encountered a
`java.util.ConcurrentModificationException`. This PR addresses thread-safety
issues in the `MetricsSystem` and `AbstractSource` classes by replacing their
unsynchronized collections with thread-safe ones.
1. The `MetricsSystem#sources` collection has been changed from
`mutable.ArrayBuffer` to `CopyOnWriteArrayList` to prevent potential
thread-safety issues.
2. The `AbstractSource#namedGauges` collection has been changed from
`ArrayList` to `ConcurrentLinkedQueue` so that gauges can be added safely while
the collection is being iterated. This fixes the following exception:
```
java.util.ConcurrentModificationException
at
java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at
scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at
scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
at
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
at scala.collection.TraversableLike.to(TraversableLike.scala:786)
at scala.collection.TraversableLike.to$(TraversableLike.scala:783)
at scala.collection.AbstractTraversable.to(Traversable.scala:108)
at
scala.collection.TraversableOnce.toList(TraversableOnce.scala:350)
at
scala.collection.TraversableOnce.toList$(TraversableOnce.scala:350)
at
scala.collection.AbstractTraversable.toList(Traversable.scala:108)
at
org.apache.celeborn.common.metrics.source.AbstractSource.gauges(AbstractSource.scala:146)
at
org.apache.celeborn.common.metrics.source.AbstractSource.getMetrics(AbstractSource.scala:401)
at
org.apache.celeborn.common.metrics.sink.PrometheusServlet.$anonfun$getMetricsSnapshot$1(PrometheusServlet.scala:42)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.celeborn.common.metrics.sink.PrometheusServlet.getMetricsSnapshot(PrometheusServlet.scala:42)
at
org.apache.celeborn.common.metrics.sink.PrometheusHttpRequestHandler.handleRequest(PrometheusServlet.scala:59)
at
org.apache.celeborn.server.common.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:53)
at
org.apache.celeborn.server.common.http.HttpRequestHandler.channelRead0(HttpRequestHandler.scala:37)
```
### Does this PR introduce _any_ user-facing change?
No, this is only a bug fix.
### How was this patch tested?
The GitHub Actions (GA) CI checks pass.
Closes #2023 from cfmcgrady/synchronized-metrics.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 447c243601def2fec2578047e0efc43733495c8b)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/common/metrics/MetricsSystem.scala | 18 +++++++++---------
.../common/metrics/sink/PrometheusServlet.scala | 4 +---
.../common/metrics/source/AbstractSource.scala | 4 ++--
3 files changed, 12 insertions(+), 14 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
index fdc1fef0b..fbc6f0fca 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala
@@ -18,9 +18,9 @@
package org.apache.celeborn.common.metrics
import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
-import scala.collection.mutable
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
@@ -38,8 +38,8 @@ class MetricsSystem(
val servletPath: String) extends Logging {
private[this] val metricsConfig = new MetricsConfig(conf)
- private val sinks = new mutable.ArrayBuffer[Sink]
- private val sources = new mutable.ArrayBuffer[Source]
+ private val sinks = new ArrayBuffer[Sink]
+ private val sources = new CopyOnWriteArrayList[Source]
private val registry = new MetricRegistry()
private var prometheusServlet: Option[PrometheusServlet] = None
@@ -81,10 +81,10 @@ class MetricsSystem(
}
def getSourcesByName(sourceName: String): Seq[Source] =
- sources.filter(_.sourceName == sourceName).toSeq
+ sources.asScala.filter(_.sourceName == sourceName).toSeq
def registerSource(source: Source) {
- sources += source
+ sources.add(source)
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
@@ -94,7 +94,7 @@ class MetricsSystem(
}
def removeSource(source: Source) {
- sources -= source
+ sources.remove(source)
val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean =
name.startsWith(regName)
@@ -130,9 +130,9 @@ class MetricsSystem(
.getConstructor(
classOf[Properties],
classOf[MetricRegistry],
- classOf[ArrayBuffer[Source]],
+ classOf[Seq[Source]],
classOf[String])
- .newInstance(kv._2, registry, sources, servletPath)
+ .newInstance(kv._2, registry, sources.asScala, servletPath)
prometheusServlet = Some(servlet.asInstanceOf[PrometheusServlet])
} else {
val sink = Utils.classForName(classPath)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
index 6913457af..a77a8375e 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala
@@ -19,8 +19,6 @@ package org.apache.celeborn.common.metrics.sink
import java.util.Properties
-import scala.collection.mutable
-
import com.codahale.metrics.MetricRegistry
import io.netty.channel.ChannelHandler.Sharable
@@ -31,7 +29,7 @@ import org.apache.celeborn.common.metrics.source.Source
class PrometheusServlet(
val property: Properties,
val registry: MetricRegistry,
- val sources: mutable.ArrayBuffer[Source],
+ val sources: Seq[Source],
val servletPath: String) extends Sink with Logging {
def getHandler(conf: CelebornConf): PrometheusHttpRequestHandler = {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index bfa01b428..66e6fd660 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -17,7 +17,7 @@
package org.apache.celeborn.common.metrics.source
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.{Map => JMap, Queue => JQueue}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue,
ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -67,7 +67,7 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
val staticLabels: Map[String, String] = conf.metricsExtraLabels + roleLabel
val staticLabelsString: String = MetricLabels.labelString(staticLabels)
- protected val namedGauges: JList[NamedGauge[_]] = new
JArrayList[NamedGauge[_]]()
+ protected val namedGauges: JQueue[NamedGauge[_]] = new
ConcurrentLinkedQueue[NamedGauge[_]]()
def addGauge[T](
name: String,