This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 475293663 [CELEBORN-1577][FOLLOWUP] Fix master resource consumption
metrics
475293663 is described below
commit 475293663c7519c4f4e4fee9a37d4fd1900003a1
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Feb 4 19:34:26 2026 +0800
[CELEBORN-1577][FOLLOWUP] Fix master resource consumption metrics
### What changes were proposed in this pull request?
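Fix master resource consumption metrics by caching the latest per-user
`ResourceConsumption` in `userResourceConsumptionCache` and having the gauges
read from that cache on every evaluation.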
https://github.com/apache/celeborn/pull/2819/ introduced a bug in the master
resource consumption metrics: each gauge supplier captured the local
`ResourceConsumption` value passed in at registration time, leading to static
values for user resource consumption.
### Why are the changes needed?
Currently the user resource consumption gauges are buggy: they report a static
value instead of the latest consumption.
### Does this PR resolve a correctness bug?
No
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA in our cluster.
Closes #3591 from s0nskar/CELEBORN-1577.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../service/deploy/master/quota/QuotaManager.scala | 30 ++++++++++++++++------
1 file changed, 22 insertions(+), 8 deletions(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index c6cb70d11..9b2f4d881 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -50,6 +50,8 @@ class QuotaManager(
@volatile
var clusterQuotaStatus: QuotaStatus = new QuotaStatus()
val appQuotaStatus: JMap[String, QuotaStatus] =
JavaUtils.newConcurrentHashMap()
+ val userResourceConsumptionCache: JMap[UserIdentifier, ResourceConsumption] =
+ JavaUtils.newConcurrentHashMap()
private val quotaChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker")
quotaChecker.scheduleWithFixedDelay(
@@ -219,7 +221,8 @@ class QuotaManager(
// Step 2: Update user resource consumption metrics.
// For extract metrics
- registerUserResourceConsumptionMetrics(userIdentifier,
userResourceConsumption)
+ userResourceConsumptionCache.put(userIdentifier,
userResourceConsumption)
+ registerUserResourceConsumptionMetrics(userIdentifier)
// Step 3: Expire user level exceeded app except already
expired app
clusterResourceConsumption =
clusterResourceConsumption.add(userResourceConsumption)
@@ -368,26 +371,37 @@ class QuotaManager(
}
}
- private def registerUserResourceConsumptionMetrics(
- userIdentifier: UserIdentifier,
- resourceConsumption: ResourceConsumption): Unit = {
+ private def registerUserResourceConsumptionMetrics(userIdentifier:
UserIdentifier): Unit = {
if (resourceConsumptionMetricsEnabled) {
resourceConsumptionSource.addGauge(DISK_FILE_COUNT,
userIdentifier.toMap) { () =>
- resourceConsumption.diskFileCount
+ Option(userResourceConsumptionCache.get(userIdentifier))
+ .map(_.diskFileCount)
+ .getOrElse(0L)
}
resourceConsumptionSource.addGauge(DISK_BYTES_WRITTEN,
userIdentifier.toMap) { () =>
- resourceConsumption.diskBytesWritten
+ Option(userResourceConsumptionCache.get(userIdentifier))
+ .map(_.diskBytesWritten)
+ .getOrElse(0L)
}
resourceConsumptionSource.addGauge(HDFS_FILE_COUNT,
userIdentifier.toMap) { () =>
- resourceConsumption.hdfsFileCount
+ Option(userResourceConsumptionCache.get(userIdentifier))
+ .map(_.hdfsFileCount)
+ .getOrElse(0L)
}
resourceConsumptionSource.addGauge(HDFS_BYTES_WRITTEN,
userIdentifier.toMap) { () =>
- resourceConsumption.hdfsBytesWritten
+ Option(userResourceConsumptionCache.get(userIdentifier))
+ .map(_.hdfsBytesWritten)
+ .getOrElse(0L)
}
}
}
private def clearQuotaStatus(activeUsers: mutable.Set[UserIdentifier]): Unit
= {
+ userResourceConsumptionCache.keySet().removeIf(new
Predicate[UserIdentifier] {
+ override def test(userIdentifier: UserIdentifier): Boolean =
+ !activeUsers.contains(userIdentifier)
+ })
+
userQuotaStatus.keySet().removeIf(new Predicate[UserIdentifier] {
override def test(userIdentifier: UserIdentifier): Boolean =
!activeUsers.contains(userIdentifier)