This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e642197b5 [CELEBORN-1749] Fix incorrect application diskBytesWritten
metrics
e642197b5 is described below
commit e642197b5653757b5f7703e286d9623b07c71e22
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Nov 27 07:24:47 2024 +0800
[CELEBORN-1749] Fix incorrect application diskBytesWritten metrics
### What changes were proposed in this pull request?
Fix the incorrect application `diskBytesWritten` metrics, and refactor some
code to make the logic easier to understand.
### Why are the changes needed?
There was a typo: when updating the top application usage metrics, the old
gauges were removed from `workerSource` instead of `resourceConsumptionSource`.
As a result, the gauges in `resourceConsumptionSource` were never refreshed,
and stale metrics were never cleaned up.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
Before this PR:
The application metrics are incorrect: the application resource consumption
is 0 and never updates.
<img width="1917" alt="image"
src="https://github.com/user-attachments/assets/035b9590-1b31-4d58-af0f-ce2fd3e71d0e">
After this PR:
The application `diskBytesWritten` equals the user `diskBytesWritten` and
updates as the application consumption changes, as expected.
<img width="1918" alt="image"
src="https://github.com/user-attachments/assets/11bf6037-f696-4ab1-9685-f89f4dd1c692">
Closes #2954 from turboFei/fix_metrics_wrong.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/Worker.scala | 47 +++++++++++++---------
.../deploy/worker/storage/WorkerSuite.scala | 23 +++++++++++
2 files changed, 51 insertions(+), 19 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 15f89555a..7b8d5731a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -72,7 +72,7 @@ private[celeborn] class Worker(
override val metricsSystem: MetricsSystem =
MetricsSystem.createMetricsSystem(serviceName, conf)
val workerSource = new WorkerSource(conf)
- private val resourceConsumptionSource =
+ val resourceConsumptionSource =
new ResourceConsumptionSource(conf, Role.WORKER)
private val threadPoolSource = ThreadPoolSource(conf, Role.WORKER)
metricsSystem.registerSource(workerSource)
@@ -674,26 +674,27 @@ private[celeborn] class Worker(
userResourceConsumptions
}
- private def handleTopResourceConsumption(userResourceConsumptions: util.Map[
+ def handleTopResourceConsumption(userResourceConsumptions: util.Map[
UserIdentifier,
ResourceConsumption]): Unit = {
// Remove application top resource consumption gauges to refresh top
resource consumption metrics.
removeAppResourceConsumption(topApplicationUserIdentifiers.keySet().asScala)
// Top resource consumption is determined by
diskBytesWritten+hdfsBytesWritten.
- userResourceConsumptions.asScala.filter(userResourceConsumption =>
-
CollectionUtils.isNotEmpty(userResourceConsumption._2.subResourceConsumptions))
- .flatMap(userResourceConsumption =>
-
userResourceConsumption._2.subResourceConsumptions.asScala.map(subResourceConsumption
=>
- (subResourceConsumption._1, (userResourceConsumption._1,
subResourceConsumption._2))))
- .toSeq
- .sortBy(resourceConsumption =>
- resourceConsumption._2._2.diskBytesWritten +
resourceConsumption._2._2.hdfsBytesWritten)
+ userResourceConsumptions.asScala.filter { case (_, resourceConsumption) =>
+ CollectionUtils.isNotEmpty(resourceConsumption.subResourceConsumptions)
+ }.flatMap { case (userIdentifier, resourceConsumption) =>
+ resourceConsumption.subResourceConsumptions.asScala.map { case (appId,
appConsumption) =>
+ (appId, userIdentifier, appConsumption)
+ }
+ }.toSeq
+ .sortBy { case (_, _, appConsumption) =>
+ appConsumption.diskBytesWritten + appConsumption.hdfsBytesWritten
+ }
.reverse
- .take(topResourceConsumptionCount).foreach { topResourceConsumption =>
- val applicationId = topResourceConsumption._1
- val userIdentifier = topResourceConsumption._2._1
- topApplicationUserIdentifiers.put(applicationId, userIdentifier)
- gaugeResourceConsumption(userIdentifier, applicationId,
topResourceConsumption._2._2)
+ .take(topResourceConsumptionCount).foreach {
+ case (appId, userIdentifier, appConsumption) =>
+ topApplicationUserIdentifiers.put(appId, userIdentifier)
+ gaugeResourceConsumption(userIdentifier, appId, appConsumption)
}
}
@@ -784,10 +785,18 @@ private[celeborn] class Worker(
}
private def removeAppResourceConsumption(resourceConsumptionLabel:
Map[String, String]): Unit = {
- workerSource.removeGauge(ResourceConsumptionSource.DISK_FILE_COUNT,
resourceConsumptionLabel)
- workerSource.removeGauge(ResourceConsumptionSource.DISK_BYTES_WRITTEN,
resourceConsumptionLabel)
- workerSource.removeGauge(ResourceConsumptionSource.HDFS_FILE_COUNT,
resourceConsumptionLabel)
- workerSource.removeGauge(ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
resourceConsumptionLabel)
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.DISK_FILE_COUNT,
+ resourceConsumptionLabel)
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+ resourceConsumptionLabel)
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ resourceConsumptionLabel)
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ resourceConsumptionLabel)
}
private def removeAppActiveConnection(applicationIds: JHashSet[String]):
Unit = {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index a8572c371..43d413b14 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType}
+import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils,
ThreadUtils}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
@@ -122,4 +123,26 @@ class WorkerSuite extends AnyFunSuite with
BeforeAndAfterEach {
}
Assert.assertEquals(1, allWriters.size())
}
+
+ test("handle top resource consumption") {
+ conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
+ worker = new Worker(conf, workerArgs)
+ val userIdentifier = new UserIdentifier("default", "celeborn")
+ worker.handleTopResourceConsumption(Map(userIdentifier ->
ResourceConsumption(
+ 1024,
+ 1,
+ 0,
+ 0,
+ Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+ assert(worker.resourceConsumptionSource.gauges().size == 4)
+ worker.handleTopResourceConsumption(Map(userIdentifier ->
ResourceConsumption(
+ 1024,
+ 1,
+ 0,
+ 0,
+ Map("app2" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+ assert(worker.resourceConsumptionSource.gauges().size == 4)
+ worker.handleTopResourceConsumption(Map.empty[UserIdentifier,
ResourceConsumption].asJava)
+ assert(worker.resourceConsumptionSource.gauges().size == 0)
+ }
}