This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch b2045
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit bc5dd9b375b085de9405c877514268d562f70789
Author: mingji <[email protected]>
AuthorDate: Tue Jun 24 13:57:35 2025 +0800

    [CELEBORN-2045] Fix OOM caused by worker source if the worker's metrics is not captured for a long time
    
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/common/metrics/source/AbstractSource.scala       | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 705344381..5d85a841a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -100,6 +100,9 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   def addTimerMetrics(namedTimer: NamedTimer): Unit = {
     val timerMetricsString = getTimerMetrics(namedTimer)
+    if (timerMetrics.size() > metricsCapacity) {
+      timerMetrics.poll()
+    }
     timerMetrics.add(timerMetricsString)
   }
 
@@ -224,11 +227,11 @@ abstract class AbstractSource(conf: CelebornConf, role: String)
 
   def getAndClearTimerMetrics(): List[String] = {
     timerMetrics.synchronized {
-      var timerMetricsSize = timerMetrics.size()
       val timerMetricsList = ArrayBuffer[String]()
-      while (timerMetricsSize > 0) {
-        timerMetricsList.append(timerMetrics.poll())
-        timerMetricsSize = timerMetricsSize - 1
+      var elem = timerMetrics.poll()
+      while (elem != null) {
+        timerMetricsList.append(elem)
+        elem = timerMetrics.poll()
       }
       timerMetricsList.toList
     }

Reply via email to