This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1e30f159b [CELEBORN-1577][FOLLOWUP] Add UpdateResourceConsumptionTime timer and prevent NPE if metrics not found
1e30f159b is described below

commit 1e30f159b9e97277d13f80c0a4edea377bc5c022
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Apr 1 19:24:23 2025 +0800

    [CELEBORN-1577][FOLLOWUP] Add UpdateResourceConsumptionTime timer and prevent NPE if metrics not found
    
    ### What changes were proposed in this pull request?
    Follow up for https://github.com/apache/celeborn/pull/2819
    1. Add a timer for `UpdateResourceConsumptionTime` in `MasterSource`.
    2. Prevent an NPE in `AbstractSource.doStopTimer` when a metric is not found; log a warning instead.
    
    ### Why are the changes needed?
    
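    The `UpdateResourceConsumptionTime` timer was never registered, so stopping it failed. `namedTimers.get` returned `null`, and destructuring that `null` pair (`val (namedTimer, map) = ...`) threw `scala.MatchError: null`, as shown below: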
    ```
    25/03/31 13:18:48,219 WARN [master-quota-checker] MasterSource: Metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"} not found!
    25/03/31 13:18:48,220 WARN [master-quota-checker] MasterSource: Exception encountered during stop timer of metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"}
    scala.MatchError: null
            at org.apache.celeborn.common.metrics.source.AbstractSource.doStopTimer(AbstractSource.scala:316)
            at org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:279)
            at org.apache.celeborn.service.deploy.master.quota.QuotaManager.updateResourceConsumption(QuotaManager.scala:201)
            at org.apache.celeborn.service.deploy.master.quota.QuotaManager$$anon$1.run(QuotaManager.scala:59)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
            at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
            at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:833)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #3190 from turboFei/fix_npe.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../common/metrics/source/AbstractSource.scala     | 23 +++++++++++++---------
 .../service/deploy/master/MasterSource.scala       |  1 +
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 684aba222..d9aa630d1 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -313,15 +313,20 @@ abstract class AbstractSource(conf: CelebornConf, role: 
String)
   protected def doStopTimer(metricsName: String, key: String, labels: 
Map[String, String]): Unit = {
     val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, 
labels)
     try {
-      val (namedTimer, map) = namedTimers.get(metricNameWithLabel)
-      val startTime = Option(map.remove(key))
-      startTime match {
-        case Some(t) =>
-          namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
-          if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
-            addTimerMetrics(namedTimer)
-          }
-        case None =>
+      val pair = namedTimers.get(metricNameWithLabel)
+      if (pair != null) {
+        val (namedTimer, map) = pair
+        val startTime = Option(map.remove(key))
+        startTime match {
+          case Some(t) =>
+            namedTimer.timer.update(System.nanoTime() - t, 
TimeUnit.NANOSECONDS)
+            if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
+              addTimerMetrics(namedTimer)
+            }
+          case None =>
+        }
+      } else {
+        logWarning(s"Metric $metricNameWithLabel not found!")
       }
     } catch {
       case e: Exception =>
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index c27ff0949..dcc764ebb 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -26,6 +26,7 @@ class MasterSource(conf: CelebornConf) extends 
AbstractSource(conf, Role.MASTER)
   import MasterSource._
   // add timers
   addTimer(OFFER_SLOTS_TIME)
+  addTimer(UPDATE_RESOURCE_CONSUMPTION_TIME)
   // start cleaner
   startCleaner()
 }

Reply via email to