This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c893287be [CELEBORN-1756] Only gauge hdfs metrics if HDFS storage 
enabled to reduce metrics
c893287be is described below

commit c893287bea938bebb0fd4d9288562a1da4253865
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Dec 2 14:11:56 2024 +0800

    [CELEBORN-1756] Only gauge hdfs metrics if HDFS storage enabled to reduce 
metrics
    
    ### What changes were proposed in this pull request?
    
    If `HDFS` is not defined in the `celeborn.storage.availableTypes`, do not 
gauge the HDFS metrics.
    
    ### Why are the changes needed?
    
    To reduce the number of metrics, since there is a limit on metrics capacity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA.
    
    Closes #2965 from turboFei/user_metrics.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala    | 20 ++++++------
 .../celeborn/service/deploy/worker/Worker.scala    | 36 +++++++++++++---------
 .../deploy/worker/storage/WorkerSuite.scala        | 20 ++++++++++--
 3 files changed, 50 insertions(+), 26 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index e0328ac6d..d37b35407 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -1173,15 +1173,17 @@ private[celeborn] class Master(
       resourceConsumptionLabel) { () =>
       computeResourceConsumption(userIdentifier, 
applicationId).diskBytesWritten
     }
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.HDFS_FILE_COUNT,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
-    }
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, 
applicationId).hdfsBytesWritten
+    if (hasHDFSStorage) {
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_FILE_COUNT,
+        resourceConsumptionLabel) { () =>
+        computeResourceConsumption(userIdentifier, applicationId).hdfsFileCount
+      }
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+        resourceConsumptionLabel) { () =>
+        computeResourceConsumption(userIdentifier, 
applicationId).hdfsBytesWritten
+      }
     }
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 4edf64ba0..3439a2e86 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -89,6 +89,8 @@ private[celeborn] class Worker(
   private val authEnabled = conf.authEnabled
   private val secretRegistry = new 
WorkerSecretRegistryImpl(conf.workerApplicationRegistryCacheSize)
 
+  private val hasHDFSStorage = conf.hasHDFSStorage
+
   if (conf.logCelebornConfEnabled) {
     logInfo(getConf)
   }
@@ -709,15 +711,17 @@ private[celeborn] class Worker(
       resourceConsumptionLabel) { () =>
       computeResourceConsumption(userIdentifier, 
resourceConsumption).diskBytesWritten
     }
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.HDFS_FILE_COUNT,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsFileCount
-    }
-    resourceConsumptionSource.addGauge(
-      ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
-      resourceConsumptionLabel) { () =>
-      computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsBytesWritten
+    if (hasHDFSStorage) {
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_FILE_COUNT,
+        resourceConsumptionLabel) { () =>
+        computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsFileCount
+      }
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+        resourceConsumptionLabel) { () =>
+        computeResourceConsumption(userIdentifier, 
resourceConsumption).hdfsBytesWritten
+      }
     }
   }
 
@@ -785,12 +789,14 @@ private[celeborn] class Worker(
     resourceConsumptionSource.removeGauge(
       ResourceConsumptionSource.DISK_BYTES_WRITTEN,
       resourceConsumptionLabel)
-    resourceConsumptionSource.removeGauge(
-      ResourceConsumptionSource.HDFS_FILE_COUNT,
-      resourceConsumptionLabel)
-    resourceConsumptionSource.removeGauge(
-      ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
-      resourceConsumptionLabel)
+    if (hasHDFSStorage) {
+      resourceConsumptionSource.removeGauge(
+        ResourceConsumptionSource.HDFS_FILE_COUNT,
+        resourceConsumptionLabel)
+      resourceConsumptionSource.removeGauge(
+        ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+        resourceConsumptionLabel)
+    }
   }
 
   private def removeAppActiveConnection(applicationIds: JHashSet[String]): 
Unit = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
index 43d413b14..6f0521bf5 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/WorkerSuite.scala
@@ -134,15 +134,31 @@ class WorkerSuite extends AnyFunSuite with 
BeforeAndAfterEach {
       0,
       0,
       Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
-    assert(worker.resourceConsumptionSource.gauges().size == 4)
+    assert(worker.resourceConsumptionSource.gauges().size == 2)
     worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
       1024,
       1,
       0,
       0,
       Map("app2" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
-    assert(worker.resourceConsumptionSource.gauges().size == 4)
+    assert(worker.resourceConsumptionSource.gauges().size == 2)
     worker.handleTopResourceConsumption(Map.empty[UserIdentifier, 
ResourceConsumption].asJava)
     assert(worker.resourceConsumptionSource.gauges().size == 0)
   }
+
+  test("only gauge hdfs metrics if HDFS storage enabled") {
+    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
+    conf.set(CelebornConf.ACTIVE_STORAGE_TYPES.key, "HDD,HDFS")
+    conf.set(CelebornConf.HDFS_DIR.key, "hdfs://localhost:9000/test")
+
+    worker = new Worker(conf, workerArgs)
+    val userIdentifier = new UserIdentifier("default", "celeborn")
+    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+      1024,
+      1,
+      0,
+      0,
+      Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
+    assert(worker.resourceConsumptionSource.gauges().size == 4)
+  }
 }

Reply via email to