This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 475293663 [CELEBORN-1577][FOLLOWUP] Fix master resource consumption metrics
475293663 is described below

commit 475293663c7519c4f4e4fee9a37d4fd1900003a1
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Feb 4 19:34:26 2026 +0800

    [CELEBORN-1577][FOLLOWUP] Fix master resource consumption metrics
    
    ### What changes were proposed in this pull request?
    
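    Fix master resource consumption metrics.
    https://github.com/apache/celeborn/pull/2819/ introduced a bug in the master's
    user resource consumption metrics: each gauge supplier captured a local
    `ResourceConsumption` value, so the gauges kept reporting that captured
    snapshot instead of the latest consumption for the user. This change keeps
    the latest consumption per user in `userResourceConsumptionCache`, has the
    gauges read from that cache when they are evaluated, and evicts inactive
    users from the cache in `clearQuotaStatus`.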
    
    ### Why are the changes needed?
    
    Without this fix, the user resource consumption gauges report a static value
    that never reflects the user's current resource consumption.
    
    ### Does this PR resolve a correctness bug?
    
    No
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    GA in our cluster.
    
    Closes #3591 from s0nskar/CELEBORN-1577.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../service/deploy/master/quota/QuotaManager.scala | 30 ++++++++++++++++------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
index c6cb70d11..9b2f4d881 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/quota/QuotaManager.scala
@@ -50,6 +50,8 @@ class QuotaManager(
   @volatile
   var clusterQuotaStatus: QuotaStatus = new QuotaStatus()
   val appQuotaStatus: JMap[String, QuotaStatus] = 
JavaUtils.newConcurrentHashMap()
+  val userResourceConsumptionCache: JMap[UserIdentifier, ResourceConsumption] =
+    JavaUtils.newConcurrentHashMap()
   private val quotaChecker =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-quota-checker")
   quotaChecker.scheduleWithFixedDelay(
@@ -219,7 +221,8 @@ class QuotaManager(
 
                 // Step 2: Update user resource consumption metrics.
                 // For extract metrics
-                registerUserResourceConsumptionMetrics(userIdentifier, 
userResourceConsumption)
+                userResourceConsumptionCache.put(userIdentifier, 
userResourceConsumption)
+                registerUserResourceConsumptionMetrics(userIdentifier)
 
                 // Step 3: Expire user level exceeded app except already 
expired app
                 clusterResourceConsumption = 
clusterResourceConsumption.add(userResourceConsumption)
@@ -368,26 +371,37 @@ class QuotaManager(
     }
   }
 
-  private def registerUserResourceConsumptionMetrics(
-      userIdentifier: UserIdentifier,
-      resourceConsumption: ResourceConsumption): Unit = {
+  private def registerUserResourceConsumptionMetrics(userIdentifier: 
UserIdentifier): Unit = {
     if (resourceConsumptionMetricsEnabled) {
       resourceConsumptionSource.addGauge(DISK_FILE_COUNT, 
userIdentifier.toMap) { () =>
-        resourceConsumption.diskFileCount
+        Option(userResourceConsumptionCache.get(userIdentifier))
+          .map(_.diskFileCount)
+          .getOrElse(0L)
       }
       resourceConsumptionSource.addGauge(DISK_BYTES_WRITTEN, 
userIdentifier.toMap) { () =>
-        resourceConsumption.diskBytesWritten
+        Option(userResourceConsumptionCache.get(userIdentifier))
+          .map(_.diskBytesWritten)
+          .getOrElse(0L)
       }
       resourceConsumptionSource.addGauge(HDFS_FILE_COUNT, 
userIdentifier.toMap) { () =>
-        resourceConsumption.hdfsFileCount
+        Option(userResourceConsumptionCache.get(userIdentifier))
+          .map(_.hdfsFileCount)
+          .getOrElse(0L)
       }
       resourceConsumptionSource.addGauge(HDFS_BYTES_WRITTEN, 
userIdentifier.toMap) { () =>
-        resourceConsumption.hdfsBytesWritten
+        Option(userResourceConsumptionCache.get(userIdentifier))
+          .map(_.hdfsBytesWritten)
+          .getOrElse(0L)
       }
     }
   }
 
   private def clearQuotaStatus(activeUsers: mutable.Set[UserIdentifier]): Unit 
= {
+    userResourceConsumptionCache.keySet().removeIf(new 
Predicate[UserIdentifier] {
+      override def test(userIdentifier: UserIdentifier): Boolean =
+        !activeUsers.contains(userIdentifier)
+    })
+
     userQuotaStatus.keySet().removeIf(new Predicate[UserIdentifier] {
       override def test(userIdentifier: UserIdentifier): Boolean =
         !activeUsers.contains(userIdentifier)

Reply via email to