This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c893287be [CELEBORN-1756] Only gauge hdfs metrics if HDFS storage
enabled to reduce metrics
c893287be is described below
commit c893287bea938bebb0fd4d9288562a1da4253865
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Dec 2 14:11:56 2024 +0800
[CELEBORN-1756] Only gauge hdfs metrics if HDFS storage enabled to reduce
metrics
### What changes were proposed in this pull request?
If `HDFS` is not included in `celeborn.storage.availableTypes`, do not
register the HDFS gauges (`HDFS_FILE_COUNT` and `HDFS_BYTES_WRITTEN`).
### Why are the changes needed?
To reduce the number of metrics, because the metrics system has a capacity limit.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
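GA (GitHub Actions CI), plus a new unit test in `WorkerSuite` verifying that the HDFS gauges are registered when HDFS storage is enabled.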
Closes #2965 from turboFei/user_metrics.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 20 ++++++------
.../celeborn/service/deploy/worker/Worker.scala | 36 +++++++++++++---------
.../deploy/worker/storage/WorkerSuite.scala | 20 ++++++++++--
3 files changed, 50 insertions(+), 26 deletions(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index e0328ac6d..d37b35407 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1173,15 +1173,17 @@ private[celeborn] class Master(
resourceConsumptionLabel) { () =>
computeResourceConsumption(userIdentifier,
applicationId).diskBytesWritten
}
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_FILE_COUNT,
- resourceConsumptionLabel) { () =>
- computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
- }
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
- resourceConsumptionLabel) { () =>
- computeResourceConsumption(userIdentifier,
applicationId).hdfsBytesWritten
+ if (hasHDFSStorage) {
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ resourceConsumptionLabel) { () =>
+ computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ resourceConsumptionLabel) { () =>
+ computeResourceConsumption(userIdentifier,
applicationId).hdfsBytesWritten
+ }
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 4edf64ba0..3439a2e86 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -89,6 +89,8 @@ private[celeborn] class Worker(
private val authEnabled = conf.authEnabled
private val secretRegistry = new
WorkerSecretRegistryImpl(conf.workerApplicationRegistryCacheSize)
+ private val hasHDFSStorage = conf.hasHDFSStorage
+
if (conf.logCelebornConfEnabled) {
logInfo(getConf)
}
@@ -709,15 +711,17 @@ private[celeborn] class Worker(
resourceConsumptionLabel) { () =>
computeResourceConsumption(userIdentifier,
resourceConsumption).diskBytesWritten
}
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_FILE_COUNT,
- resourceConsumptionLabel) { () =>
- computeResourceConsumption(userIdentifier,
resourceConsumption).hdfsFileCount
- }
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
- resourceConsumptionLabel) { () =>
- computeResourceConsumption(userIdentifier,
resourceConsumption).hdfsBytesWritten
+ if (hasHDFSStorage) {
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ resourceConsumptionLabel) { () =>
+ computeResourceConsumption(userIdentifier,
resourceConsumption).hdfsFileCount
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ resourceConsumptionLabel) { () =>
+ computeResourceConsumption(userIdentifier,
resourceConsumption).hdfsBytesWritten
+ }
}
}
@@ -785,12 +789,14 @@ private[celeborn] class Worker(
resourceConsumptionSource.removeGauge(
ResourceConsumptionSource.DISK_BYTES_WRITTEN,
resourceConsumptionLabel)
- resourceConsumptionSource.removeGauge(
- ResourceConsumptionSource.HDFS_FILE_COUNT,
- resourceConsumptionLabel)
- resourceConsumptionSource.removeGauge(
- ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
- resourceConsumptionLabel)
+ if (hasHDFSStorage) {
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ resourceConsumptionLabel)
+ resourceConsumptionSource.removeGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ resourceConsumptionLabel)
+ }
}
private def removeAppActiveConnection(applicationIds: JHashSet[String]):
Unit = {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 43d413b14..6f0521bf5 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -134,15 +134,31 @@ class WorkerSuite extends AnyFunSuite with
BeforeAndAfterEach {
0,
0,
Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
- assert(worker.resourceConsumptionSource.gauges().size == 4)
+ assert(worker.resourceConsumptionSource.gauges().size == 2)
worker.handleTopResourceConsumption(Map(userIdentifier ->
ResourceConsumption(
1024,
1,
0,
0,
Map("app2" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
- assert(worker.resourceConsumptionSource.gauges().size == 4)
+ assert(worker.resourceConsumptionSource.gauges().size == 2)
worker.handleTopResourceConsumption(Map.empty[UserIdentifier,
ResourceConsumption].asJava)
assert(worker.resourceConsumptionSource.gauges().size == 0)
}
+
+ test("only gauge hdfs metrics if HDFS storage enabled") {
+ conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
+ conf.set(CelebornConf.ACTIVE_STORAGE_TYPES.key, "HDD,HDFS")
+ conf.set(CelebornConf.HDFS_DIR.key, "hdfs://localhost:9000/test")
+
+ worker = new Worker(conf, workerArgs)
+ val userIdentifier = new UserIdentifier("default", "celeborn")
+ worker.handleTopResourceConsumption(Map(userIdentifier ->
ResourceConsumption(
+ 1024,
+ 1,
+ 0,
+ 0,
+ Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+ assert(worker.resourceConsumptionSource.gauges().size == 4)
+ }
}