This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new deee41c99 [CELEBORN-1547] Worker#listTopDiskUseApps should return
celeborn.metrics.app.topDiskUsage.count applications
deee41c99 is described below
commit deee41c992edf1570b3f97e20d92013617e2e332
Author: SteNicholas <[email protected]>
AuthorDate: Wed Aug 7 19:40:33 2024 +0800
[CELEBORN-1547] Worker#listTopDiskUseApps should return
celeborn.metrics.app.topDiskUsage.count applications
### What changes were proposed in this pull request?
`Worker#listTopDiskUseApps` should return
`celeborn.metrics.app.topDiskUsage.count` applications.
### Why are the changes needed?
`Worker#listTopDiskUseApps` currently returns 2x
`celeborn.metrics.app.topDiskUsage.count` applications, which does not match the
`celeborn.metrics.app.topDiskUsage.count` configuration. The cause is that
`Worker#listTopDiskUseApps` reuses the `StorageManager#topAppDiskUsage` method,
which is also used to get the top application list for the estimated
application disk usage and therefore takes twice the configured count.
Therefore, `Worker#listTopDiskUseApps` should return only
`celeborn.metrics.app.topDiskUsage.count` applications.
```
$ curl http://celeborn-worker:9096/listTopDiskUsedApps|grep used|wc -l
100
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
```
$ curl http://celeborn-worker:9096/listTopDiskUsedApps|grep used|wc -l
50
```
Closes #2668 from SteNicholas/CELEBORN-1547.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 4 ++--
.../service/deploy/worker/http/api/v1/ApplicationResource.scala | 2 +-
.../celeborn/service/deploy/worker/storage/StorageManager.scala | 7 +++++--
3 files changed, 8 insertions(+), 5 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d8205aa6f..3bbcb6a62 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -453,7 +453,7 @@ private[celeborn] class Worker(
val estimatedAppDiskUsage = new JHashMap[String, JLong]()
activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
activeShuffleKeys.addAll(storageManager.shuffleKeySet())
- storageManager.topAppDiskUsage.asScala.foreach { case (shuffleId, usage) =>
+ storageManager.topAppDiskUsage(true).asScala.foreach { case (shuffleId,
usage) =>
estimatedAppDiskUsage.put(shuffleId, usage)
}
storageManager.updateDiskInfos()
@@ -800,7 +800,7 @@ private[celeborn] class Worker(
override def listTopDiskUseApps: String = {
val sb = new StringBuilder
sb.append("================== Top Disk Usage Applications
=======================\n")
- storageManager.topAppDiskUsage.asScala.foreach { case (appId, usage) =>
+ storageManager.topAppDiskUsage().asScala.foreach { case (appId, usage) =>
sb.append(s"Application $appId used ${Utils.bytesToString(usage)}\n")
}
sb.toString()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
index 5bb15c6ed..2f28506ca 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApplicationResource.scala
@@ -63,7 +63,7 @@ class ApplicationResource extends ApiRequestContext {
def topDiskUsedApplications(): AppDiskUsagesResponse = {
new AppDiskUsagesResponse()
.appDiskUsages(
- storageManager.topAppDiskUsage.asScala.map { case (appId, diskUsage) =>
+ storageManager.topAppDiskUsage().asScala.map { case (appId, diskUsage)
=>
new AppDiskUsageData()
.appId(appId)
.estimatedUsage(diskUsage)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 8a56aa630..e278041f1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -73,6 +73,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)
+ val topDiskUsageCount = conf.metricsAppTopDiskUsageCount
+
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
val workingDirInfos =
@@ -525,7 +527,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
hashSet
}
- def topAppDiskUsage: util.Map[String, Long] = {
+ def topAppDiskUsage(reportToMaster: Boolean = false): util.Map[String, Long]
= {
+ val topCount = if (reportToMaster) topDiskUsageCount * 2 else
topDiskUsageCount
diskFileInfos.asScala.map { keyedWriters =>
{
keyedWriters._1 ->
keyedWriters._2.values().asScala.map(_.getFileLength).sum
@@ -534,7 +537,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
Utils.splitShuffleKey(shuffleKey)._1 -> usage
}.groupBy(_._1).map { case (key, values) =>
key -> values.map(_._2).sum
- }.toSeq.sortBy(_._2).reverse.take(conf.metricsAppTopDiskUsageCount *
2).toMap.asJava
+ }.toSeq.sortBy(_._2).reverse.take(topCount).toMap.asJava
}
def cleanFile(shuffleKey: String, fileName: String): Unit = {