This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch b2045 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit bc5dd9b375b085de9405c877514268d562f70789 Author: mingji <[email protected]> AuthorDate: Tue Jun 24 13:57:35 2025 +0800 [CELEBORN-2045] Fix OOM caused by worker source if the worker's metrics is not captured for a long time Signed-off-by: mingji <[email protected]> --- .../celeborn/common/metrics/source/AbstractSource.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 705344381..5d85a841a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -100,6 +100,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def addTimerMetrics(namedTimer: NamedTimer): Unit = { val timerMetricsString = getTimerMetrics(namedTimer) + if (timerMetrics.size() > metricsCapacity) { + timerMetrics.poll() + } timerMetrics.add(timerMetricsString) } @@ -224,11 +227,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String) def getAndClearTimerMetrics(): List[String] = { timerMetrics.synchronized { - var timerMetricsSize = timerMetrics.size() val timerMetricsList = ArrayBuffer[String]() - while (timerMetricsSize > 0) { - timerMetricsList.append(timerMetrics.poll()) - timerMetricsSize = timerMetricsSize - 1 + var elem = timerMetrics.poll() + while (elem != null) { + timerMetricsList.append(elem) + elem = timerMetrics.poll() } timerMetricsList.toList }
