This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new deee41c99 [CELEBORN-1547] Worker#listTopDiskUseApps should return 
celeborn.metrics.app.topDiskUsage.count applications
deee41c99 is described below

commit deee41c992edf1570b3f97e20d92013617e2e332
Author: SteNicholas <[email protected]>
AuthorDate: Wed Aug 7 19:40:33 2024 +0800

    [CELEBORN-1547] Worker#listTopDiskUseApps should return 
celeborn.metrics.app.topDiskUsage.count applications
    
    ### What changes were proposed in this pull request?
    
    `Worker#listTopDiskUseApps` should return 
`celeborn.metrics.app.topDiskUsage.count` applications.
    
    ### Why are the changes needed?
    
    At present, `Worker#listTopDiskUseApps` returns 2x 
`celeborn.metrics.app.topDiskUsage.count` applications, which does not match the 
`celeborn.metrics.app.topDiskUsage.count` configuration. This happens because 
`Worker#listTopDiskUseApps` reuses the `StorageManager#topAppDiskUsage` method, 
which takes twice the configured count to build the top application list used 
for the estimated application disk usage. Therefore, 
`Worker#listTopDiskUseApps` should return only 
`celeborn.metrics.app.topDiskUsage.count` applications.
    
    ```
    $ curl http://celeborn-worker:9096/listTopDiskUsedApps|grep used|wc -l
    100
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    ```
    $ curl http://celeborn-worker:9096/listTopDiskUsedApps|grep used|wc -l
    50
    ```
    
    Closes #2668 from SteNicholas/CELEBORN-1547.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../scala/org/apache/celeborn/service/deploy/worker/Worker.scala   | 4 ++--
 .../service/deploy/worker/http/api/v1/ApplicationResource.scala    | 2 +-
 .../celeborn/service/deploy/worker/storage/StorageManager.scala    | 7 +++++--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d8205aa6f..3bbcb6a62 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -453,7 +453,7 @@ private[celeborn] class Worker(
     val estimatedAppDiskUsage = new JHashMap[String, JLong]()
     activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
     activeShuffleKeys.addAll(storageManager.shuffleKeySet())
-    storageManager.topAppDiskUsage.asScala.foreach { case (shuffleId, usage) =>
+    storageManager.topAppDiskUsage(true).asScala.foreach { case (shuffleId, 
usage) =>
       estimatedAppDiskUsage.put(shuffleId, usage)
     }
     storageManager.updateDiskInfos()
@@ -800,7 +800,7 @@ private[celeborn] class Worker(
   override def listTopDiskUseApps: String = {
     val sb = new StringBuilder
     sb.append("================== Top Disk Usage Applications 
=======================\n")
-    storageManager.topAppDiskUsage.asScala.foreach { case (appId, usage) =>
+    storageManager.topAppDiskUsage().asScala.foreach { case (appId, usage) =>
       sb.append(s"Application $appId used ${Utils.bytesToString(usage)}\n")
     }
     sb.toString()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
index 5bb15c6ed..2f28506ca 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
@@ -63,7 +63,7 @@ class ApplicationResource extends ApiRequestContext {
   def topDiskUsedApplications(): AppDiskUsagesResponse = {
     new AppDiskUsagesResponse()
       .appDiskUsages(
-        storageManager.topAppDiskUsage.asScala.map { case (appId, diskUsage) =>
+        storageManager.topAppDiskUsage().asScala.map { case (appId, diskUsage) 
=>
           new AppDiskUsageData()
             .appId(appId)
             .estimatedUsage(diskUsage)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 8a56aa630..e278041f1 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -73,6 +73,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
   val storagePolicy = new StoragePolicy(conf, this, workerSource)
 
+  val topDiskUsageCount = conf.metricsAppTopDiskUsageCount
+
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
   val (deviceInfos, diskInfos) = {
     val workingDirInfos =
@@ -525,7 +527,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     hashSet
   }
 
-  def topAppDiskUsage: util.Map[String, Long] = {
+  def topAppDiskUsage(reportToMaster: Boolean = false): util.Map[String, Long] 
= {
+    val topCount = if (reportToMaster) topDiskUsageCount * 2 else 
topDiskUsageCount
     diskFileInfos.asScala.map { keyedWriters =>
       {
         keyedWriters._1 -> 
keyedWriters._2.values().asScala.map(_.getFileLength).sum
@@ -534,7 +537,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       Utils.splitShuffleKey(shuffleKey)._1 -> usage
     }.groupBy(_._1).map { case (key, values) =>
       key -> values.map(_._2).sum
-    }.toSeq.sortBy(_._2).reverse.take(conf.metricsAppTopDiskUsageCount * 
2).toMap.asJava
+    }.toSeq.sortBy(_._2).reverse.take(topCount).toMap.asJava
   }
 
   def cleanFile(shuffleKey: String, fileName: String): Unit = {

Reply via email to