This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a2110568f [CELEBORN-1501][FOLLOWUP] Add bytes written threshold for 
top app consumption metrics
a2110568f is described below

commit a2110568f279c2714600b31e05708b7cb76e9936
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Apr 28 09:59:17 2025 -0700

    [CELEBORN-1501][FOLLOWUP] Add bytes written threshold for top app 
consumption metrics
    
    ### What changes were proposed in this pull request?
    Add bytes written threshold for top app consumption metrics.
    
    ### Why are the changes needed?
    
    The threshold limits and reduces the number of top app consumption metrics:
    applications whose bytes written are below it are not included in the list.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a new config:
    `celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold`.
    
    ### How was this patch tested?
    
    Existing GA.
    
    Closes #3232 from turboFei/top_resource_consump_threashold.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala   | 12 ++++++++++++
 docs/configuration/metrics.md                       |  1 +
 .../celeborn/service/deploy/worker/Worker.scala     | 21 +++++++++++++++------
 .../service/deploy/worker/WorkerSuite.scala         |  8 ++++----
 4 files changed, 32 insertions(+), 10 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 1be7a15e6..59f380fc9 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -889,6 +889,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
   def metricsWorkerAppTopResourceConsumptionCount: Int =
     get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
+  def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
+    get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_BYTES_WRITTEN_THRESHOLD)
   def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
     get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
   def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
@@ -5548,6 +5550,16 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(50)
 
+  val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_BYTES_WRITTEN_THRESHOLD: 
ConfigEntry[Long] =
+    
buildConf("celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold")
+      .categories("metrics")
+      .doc("Threshold of bytes written for top resource consumption 
applications list of worker. " +
+        "The application which has bytes written less than this threshold will 
not be included " +
+        "in the top resource consumption list, including diskBytesWritten and 
hdfsBytesWritten.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(0)
+
   val METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD: ConfigEntry[Int] 
=
     buildConf("celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold")
       .categories("metrics")
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index a5fe17318..b590f5628 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -29,6 +29,7 @@ license: |
 | celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context 
path of prometheus metrics HTTP server. | 0.4.0 |  | 
 | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect 
timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | 
 | 
 | celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding 
window size of timer metric. | 0.2.0 |  | 
+| celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold | 
0b | false | Threshold of bytes written for top resource consumption 
applications list of worker. The application which has bytes written less than 
this threshold will not be included in the top resource consumption list, 
including diskBytesWritten and hdfsBytesWritten. | 0.6.0 |  | 
 | celeborn.metrics.worker.app.topResourceConsumption.count | 50 | false | Size 
for top items about top resource consumption applications list of worker. The 
top resource consumption is determined by sum of diskBytesWritten and 
hdfsBytesWritten. The top resource consumption count prevents the total number 
of metrics from exceeding the metrics capacity. | 0.6.0 |  | 
 | celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | false | 
Force append worker pause spent time even if worker still in pause serving 
state. Help user can find worker pause spent time increase, when worker always 
been pause state. |  |  | 
 <!--end-include-->
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 077adf727..edfb21d21 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -82,7 +82,9 @@ private[celeborn] class Worker(
   metricsSystem.registerSource(new JVMCPUSource(conf, Role.WORKER))
   metricsSystem.registerSource(new SystemMiscSource(conf, Role.WORKER))
 
-  private val topResourceConsumptionCount = 
conf.metricsWorkerAppTopResourceConsumptionCount
+  private val topAppResourceConsumptionCount = 
conf.metricsWorkerAppTopResourceConsumptionCount
+  private val topAppResourceConsumptionBytesWrittenThreshold =
+    conf.metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold
   private val topApplicationUserIdentifiers =
     JavaUtils.newConcurrentHashMap[String, UserIdentifier]()
 
@@ -684,11 +686,13 @@ private[celeborn] class Worker(
     resourceConsumptionSnapshot.asScala.foreach { case (userIdentifier, _) =>
       gaugeResourceConsumption(userIdentifier)
     }
-    handleTopResourceConsumption(resourceConsumptionSnapshot)
+    if (topAppResourceConsumptionCount > 0) {
+      handleTopAppResourceConsumption(resourceConsumptionSnapshot)
+    }
     resourceConsumptionSnapshot
   }
 
-  def handleTopResourceConsumption(userResourceConsumptions: util.Map[
+  def handleTopAppResourceConsumption(userResourceConsumptions: util.Map[
     UserIdentifier,
     ResourceConsumption]): Unit = {
     // Remove application top resource consumption gauges to refresh top 
resource consumption metrics.
@@ -705,10 +709,15 @@ private[celeborn] class Worker(
         appConsumption.diskBytesWritten + appConsumption.hdfsBytesWritten
       }
       .reverse
-      .take(topResourceConsumptionCount).foreach {
+      .take(topAppResourceConsumptionCount).foreach {
         case (appId, userIdentifier, appConsumption) =>
-          topApplicationUserIdentifiers.put(appId, userIdentifier)
-          gaugeResourceConsumption(userIdentifier, appId, appConsumption)
+          if (appConsumption.diskBytesWritten + 
appConsumption.hdfsBytesWritten >=
+              topAppResourceConsumptionBytesWrittenThreshold) {
+            topApplicationUserIdentifiers.put(appId, userIdentifier)
+            gaugeResourceConsumption(userIdentifier, appId, appConsumption)
+          } else {
+            return
+          }
       }
   }
 
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
index c28aa9f58..491f6f5b2 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
@@ -133,21 +133,21 @@ class WorkerSuite extends AnyFunSuite with 
BeforeAndAfterEach {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
     worker = new Worker(conf, workerArgs)
     val userIdentifier = new UserIdentifier("default", "celeborn")
-    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+    worker.handleTopAppResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
       1024,
       1,
       0,
       0,
       Map("app1" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
     assert(worker.resourceConsumptionSource.gauges().size == 2)
-    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+    worker.handleTopAppResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
       1024,
       1,
       0,
       0,
       Map("app2" -> ResourceConsumption(1024, 1, 0, 0)).asJava)).asJava)
     assert(worker.resourceConsumptionSource.gauges().size == 2)
-    worker.handleTopResourceConsumption(Map.empty[UserIdentifier, 
ResourceConsumption].asJava)
+    worker.handleTopAppResourceConsumption(Map.empty[UserIdentifier, 
ResourceConsumption].asJava)
     assert(worker.resourceConsumptionSource.gauges().size == 0)
   }
 
@@ -158,7 +158,7 @@ class WorkerSuite extends AnyFunSuite with 
BeforeAndAfterEach {
 
     worker = new Worker(conf, workerArgs)
     val userIdentifier = new UserIdentifier("default", "celeborn")
-    worker.handleTopResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
+    worker.handleTopAppResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(
       1024,
       1,
       0,

Reply via email to