This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1e30f159b [CELEBORN-1577][FOLLOWUP] Add UpdateResourceConsumptionTime
timer and prevent NPE if metrics not found
1e30f159b is described below
commit 1e30f159b9e97277d13f80c0a4edea377bc5c022
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Apr 1 19:24:23 2025 +0800
[CELEBORN-1577][FOLLOWUP] Add UpdateResourceConsumptionTime timer and
prevent NPE if metrics not found
### What changes were proposed in this pull request?
Follow up for https://github.com/apache/celeborn/pull/2819
1. Add a timer for UpdateResourceConsumptionTime.
2. Prevent an NPE if the metric is not found.
### Why are the changes needed?
The timer was not added, so stopping it fails with a null-related error (`scala.MatchError: null`):
```
25/03/31 13:18:48,219 WARN [master-quota-checker] MasterSource: Metric
UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"}
not found!
25/03/31 13:18:48,220 WARN [master-quota-checker] MasterSource: Exception
encountered during stop timer of metric
UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"}
scala.MatchError: null
at
org.apache.celeborn.common.metrics.source.AbstractSource.doStopTimer(AbstractSource.scala:316)
at
org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:279)
at
org.apache.celeborn.service.deploy.master.quota.QuotaManager.updateResourceConsumption(QuotaManager.scala:201)
at
org.apache.celeborn.service.deploy.master.quota.QuotaManager$$anon$1.run(QuotaManager.scala:59)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
Closes #3190 from turboFei/fix_npe.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../common/metrics/source/AbstractSource.scala | 23 +++++++++++++---------
.../service/deploy/master/MasterSource.scala | 1 +
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 684aba222..d9aa630d1 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -313,15 +313,20 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
protected def doStopTimer(metricsName: String, key: String, labels:
Map[String, String]): Unit = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName,
labels)
try {
- val (namedTimer, map) = namedTimers.get(metricNameWithLabel)
- val startTime = Option(map.remove(key))
- startTime match {
- case Some(t) =>
- namedTimer.timer.update(System.nanoTime() - t, TimeUnit.NANOSECONDS)
- if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
- addTimerMetrics(namedTimer)
- }
- case None =>
+ val pair = namedTimers.get(metricNameWithLabel)
+ if (pair != null) {
+ val (namedTimer, map) = pair
+ val startTime = Option(map.remove(key))
+ startTime match {
+ case Some(t) =>
+ namedTimer.timer.update(System.nanoTime() - t,
TimeUnit.NANOSECONDS)
+ if (namedTimer.timer.getCount % metricsSlidingWindowSize == 0) {
+ addTimerMetrics(namedTimer)
+ }
+ case None =>
+ }
+ } else {
+ logWarning(s"Metric $metricNameWithLabel not found!")
}
} catch {
case e: Exception =>
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index c27ff0949..dcc764ebb 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -26,6 +26,7 @@ class MasterSource(conf: CelebornConf) extends
AbstractSource(conf, Role.MASTER)
import MasterSource._
// add timers
addTimer(OFFER_SLOTS_TIME)
+ addTimer(UPDATE_RESOURCE_CONSUMPTION_TIME)
// start cleaner
startCleaner()
}