This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e642197b5 [CELEBORN-1749] Fix incorrect application diskBytesWritten 
metrics
e642197b5 is described below

commit e642197b5653757b5f7703e286d9623b07c71e22
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Nov 27 07:24:47 2024 +0800

    [CELEBORN-1749] Fix incorrect application diskBytesWritten metrics
    
    ### What changes were proposed in this pull request?
    Fix the incorrect application diskBytesWritten metrics, also make some code 
refactor to easy understand the logic.
    
    ### Why are the changes needed?
    
    There is typo, before when updating the top app usage metrics, it removes 
the gauge from `workerSrouce`, then the gauge in the 
`resourceConsumptionSource` will not be refreshed, and the old metrics would be 
never cleaned.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
    
    Before this PR:
    The application metrics is incorrect. The application resource consumption 
is 0 and will not update.
    <img width="1917" alt="image" 
src="https://github.com/user-attachments/assets/035b9590-1b31-4d58-af0f-ce2fd3e71d0e";>
    
    After this PR:
    The application `diskBytesWritten` equals the user `diskBytesWritten` and 
update with the app consumption change, it is expected.
    
    <img width="1918" alt="image" 
src="https://github.com/user-attachments/assets/11bf6037-f696-4ab1-9685-f89f4dd1c692";>
    
    Closes #2954 from turboFei/fix_metrics_wrong.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/service/deploy/worker/Worker.scala    | 47 +++++++++++++---------
 .../deploy/worker/storage/WorkerSuite.scala        | 23 +++++++++++
 2 files changed, 51 insertions(+), 19 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 15f89555a..7b8d5731a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -72,7 +72,7 @@ private[celeborn] class Worker(
   override val metricsSystem: MetricsSystem =
     MetricsSystem.createMetricsSystem(serviceName, conf)
   val workerSource = new WorkerSource(conf)
-  private val resourceConsumptionSource =
+  val resourceConsumptionSource =
     new ResourceConsumptionSource(conf, Role.WORKER)
   private val threadPoolSource = ThreadPoolSource(conf, Role.WORKER)
   metricsSystem.registerSource(workerSource)
@@ -674,26 +674,27 @@ private[celeborn] class Worker(
     userResourceConsumptions
   }
 
-  private def handleTopResourceConsumption(userResourceConsumptions: util.Map[
+  def handleTopResourceConsumption(userResourceConsumptions: util.Map[
     UserIdentifier,
     ResourceConsumption]): Unit = {
     // Remove application top resource consumption gauges to refresh top 
resource consumption metrics.
     
removeAppResourceConsumption(topApplicationUserIdentifiers.keySet().asScala)
     // Top resource consumption is determined by 
diskBytesWritten+hdfsBytesWritten.
-    userResourceConsumptions.asScala.filter(userResourceConsumption =>
-      
CollectionUtils.isNotEmpty(userResourceConsumption._2.subResourceConsumptions))
-      .flatMap(userResourceConsumption =>
-        
userResourceConsumption._2.subResourceConsumptions.asScala.map(subResourceConsumption
 =>
-          (subResourceConsumption._1, (userResourceConsumption._1, 
subResourceConsumption._2))))
-      .toSeq
-      .sortBy(resourceConsumption =>
-        resourceConsumption._2._2.diskBytesWritten + 
resourceConsumption._2._2.hdfsBytesWritten)
+    userResourceConsumptions.asScala.filter { case (_, resourceConsumption) =>
+      CollectionUtils.isNotEmpty(resourceConsumption.subResourceConsumptions)
+    }.flatMap { case (userIdentifier, resourceConsumption) =>
+      resourceConsumption.subResourceConsumptions.asScala.map { case (appId, 
appConsumption) =>
+        (appId, userIdentifier, appConsumption)
+      }
+    }.toSeq
+      .sortBy { case (_, _, appConsumption) =>
+        appConsumption.diskBytesWritten + appConsumption.hdfsBytesWritten
+      }
       .reverse
-      .take(topResourceConsumptionCount).foreach { topResourceConsumption =>
-        val applicationId = topResourceConsumption._1
-        val userIdentifier = topResourceConsumption._2._1
-        topApplicationUserIdentifiers.put(applicationId, userIdentifier)
-        gaugeResourceConsumption(userIdentifier, applicationId, 
topResourceConsumption._2._2)
+      .take(topResourceConsumptionCount).foreach {
+        case (appId, userIdentifier, appConsumption) =>
+          topApplicationUserIdentifiers.put(appId, userIdentifier)
+          gaugeResourceConsumption(userIdentifier, appId, appConsumption)
       }
   }
 
@@ -784,10 +785,18 @@ private[celeborn] class Worker(
   }
 
   private def removeAppResourceConsumption(resourceConsumptionLabel: 
Map[String, String]): Unit = {
-    workerSource.removeGauge(ResourceConsumptionSource.DISK_FILE_COUNT, 
resourceConsumptionLabel)
-    workerSource.removeGauge(ResourceConsumptionSource.DISK_BYTES_WRITTEN, 
resourceConsumptionLabel)
-    workerSource.removeGauge(ResourceConsumptionSource.HDFS_FILE_COUNT, 
resourceConsumptionLabel)
-    workerSource.removeGauge(ResourceConsumptionSource.HDFS_BYTES_WRITTEN, 
resourceConsumptionLabel)
+    resourceConsumptionSource.removeGauge(
+      ResourceConsumptionSource.DISK_FILE_COUNT,
+      resourceConsumptionLabel)
+    resourceConsumptionSource.removeGauge(
+      ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+      resourceConsumptionLabel)
+    resourceConsumptionSource.removeGauge(
+      ResourceConsumptionSource.HDFS_FILE_COUNT,
+      resourceConsumptionLabel)
+    resourceConsumptionSource.removeGauge(
+      ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+      resourceConsumptionLabel)
   }
 
   private def removeAppActiveConnection(applicationIds: JHashSet[String]): 
Unit = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index a8572c371..43d413b14 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.identity.UserIdentifier
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType}
+import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, 
ThreadUtils}
 import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
 
@@ -122,4 +123,26 @@ class WorkerSuite extends AnyFunSuite with 
BeforeAndAfterEach {
     }
     Assert.assertEquals(1, allWriters.size())
   }
+
+  test("handle top resource consumption") {
+    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
+    worker = new Worker(conf, workerArgs)
+    val userIdentifier = new UserIdentifier("default", "celeborn")
+    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+      1024,
+      1,
+      0,
+      0,
+      Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+    assert(worker.resourceConsumptionSource.gauges().size == 4)
+    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+      1024,
+      1,
+      0,
+      0,
+      Map("app2" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+    assert(worker.resourceConsumptionSource.gauges().size == 4)
+    worker.handleTopResourceConsumption(Map.empty[UserIdentifier, 
ResourceConsumption].asJava)
+    assert(worker.resourceConsumptionSource.gauges().size == 0)
+  }
 }

Reply via email to