This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a547cdaef [CELEBORN-1974] ApplicationId as metrics label should be 
behind a config flag
a547cdaef is described below

commit a547cdaeff852d1050b97e452eb2f0eec98fbd3a
Author: Sanskar Modi <[email protected]>
AuthorDate: Mon May 12 21:05:45 2025 -0700

    [CELEBORN-1974] ApplicationId as metrics label should be behind a config 
flag
    
    ### What changes were proposed in this pull request?
    
    Push applicationId as a metrics label only if 
`celeborn.metrics.worker.appLevel.enabled` is true.
    
    ### Why are the changes needed?
    
    At Uber, we use M3 for monitoring, and it creates a new time series for 
every distinct combination of metric labels. Having applicationId as a metrics 
label introduces too much cardinality in `activeconnectioncount`, which makes 
the metric unusable for us, even though it is a useful metric with or without 
applicationId as a label. Similarly, for resourceConsumption, userIdentifier 
alone can be used.
    
    ### Does this PR introduce _any_ user-facing change?
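    Yes. The default value of 
`celeborn.metrics.worker.app.topResourceConsumption.count` changes from 50 to 
0, and a new config `celeborn.metrics.worker.appLevel.enabled` (default `true`) 
is added.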
    
    ### How was this patch tested?
    NA
    
    Closes #3221 from s0nskar/application_tag.
    
    Lead-authored-by: Sanskar Modi <[email protected]>
    Co-authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 17 ++++++++-
 docs/configuration/metrics.md                      |  3 +-
 .../service/deploy/worker/WorkerSource.scala       | 43 +++++++++++++---------
 .../service/deploy/worker/WorkerSuite.scala        |  1 +
 4 files changed, 44 insertions(+), 20 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6455e4051..d33028a9a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -894,6 +894,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
     get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
   def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)
+  def metricsWorkerAppLevelEnabled: Boolean = 
get(METRICS_WORKER_APP_LEVEL_ENABLED)
 
   // //////////////////////////////////////////////////////
   //                      Quota                         //
@@ -5520,10 +5521,12 @@ object CelebornConf extends Logging {
       .categories("metrics")
       .doc("Size for top items about top resource consumption applications 
list of worker. " +
         "The top resource consumption is determined by sum of diskBytesWritten 
and hdfsBytesWritten. " +
-        "The top resource consumption count prevents the total number of 
metrics from exceeding the metrics capacity.")
+        "The top resource consumption count prevents the total number of 
metrics from exceeding the metrics capacity. " +
+        "Note: This will add applicationId as label which is considered as a 
high cardinality label, " +
+        "be careful enabling it on metrics systems that are not optimized for 
high cardinality columns.")
       .version("0.6.0")
       .intConf
-      .createWithDefault(50)
+      .createWithDefault(0)
 
   val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_BYTES_WRITTEN_THRESHOLD: 
ConfigEntry[Long] =
     
buildConf("celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold")
@@ -5569,6 +5572,16 @@ object CelebornConf extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val METRICS_WORKER_APP_LEVEL_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.metrics.worker.appLevel.enabled")
+      .categories("metrics")
+      .doc("When true, enable worker application level metrics. Note: 
applicationId is " +
+        "considered as a high cardinality label, be careful enabling it on 
metrics systems " +
+        "that are not optimized for high cardinality columns.")
+      .version("0.6.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val IDENTITY_PROVIDER: ConfigEntry[String] =
     buildConf("celeborn.identity.provider")
       .withAlternative("celeborn.quota.identity.provider")
diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md
index b590f5628..77c7d6d59 100644
--- a/docs/configuration/metrics.md
+++ b/docs/configuration/metrics.md
@@ -30,6 +30,7 @@ license: |
 | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect 
timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | 
 | 
 | celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding 
window size of timer metric. | 0.2.0 |  | 
 | celeborn.metrics.worker.app.topResourceConsumption.bytesWrittenThreshold | 
0b | false | Threshold of bytes written for top resource consumption 
applications list of worker. The application which has bytes written less than 
this threshold will not be included in the top resource consumption list, 
including diskBytesWritten and hdfsBytesWritten. | 0.6.0 |  | 
-| celeborn.metrics.worker.app.topResourceConsumption.count | 50 | false | Size 
for top items about top resource consumption applications list of worker. The 
top resource consumption is determined by sum of diskBytesWritten and 
hdfsBytesWritten. The top resource consumption count prevents the total number 
of metrics from exceeding the metrics capacity. | 0.6.0 |  | 
+| celeborn.metrics.worker.app.topResourceConsumption.count | 0 | false | Size 
for top items about top resource consumption applications list of worker. The 
top resource consumption is determined by sum of diskBytesWritten and 
hdfsBytesWritten. The top resource consumption count prevents the total number 
of metrics from exceeding the metrics capacity. Note: This will add 
applicationId as label which is considered as a high cardinality label, be 
careful enabling it on metrics systems that  [...]
+| celeborn.metrics.worker.appLevel.enabled | true | false | When true, enable 
worker application level metrics. Note: applicationId is considered as a high 
cardinality label, be careful enabling it on metrics systems that are not 
optimized for high cardinality columns. | 0.6.0 |  | 
 | celeborn.metrics.worker.pauseSpentTime.forceAppend.threshold | 10 | false | 
Force append worker pause spent time even if worker still in pause serving 
state. Help user can find worker pause spent time increase, when worker always 
been pause state. |  |  | 
 <!--end-include-->
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index c4a82225c..01fd138b5 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -34,6 +34,7 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, Role.WORKER)
 
   val appActiveConnections: ConcurrentHashMap[String, util.Set[String]] =
     JavaUtils.newConcurrentHashMap[String, util.Set[String]]
+  private val metricsAppLevelEnabled = conf.metricsWorkerAppLevelEnabled
 
   import WorkerSource._
   // add counters
@@ -92,36 +93,44 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, Role.WORKER)
   }
 
   def connectionActive(client: TransportClient): Unit = {
-    appActiveConnections.putIfAbsent(
-      client.getChannel.id().asLongText(),
-      Sets.newConcurrentHashSet[String]())
+    if (metricsAppLevelEnabled) {
+      appActiveConnections.putIfAbsent(
+        client.getChannel.id().asLongText(),
+        Sets.newConcurrentHashSet[String]())
+    }
     incCounter(ACTIVE_CONNECTION_COUNT, 1)
   }
 
   def connectionInactive(client: TransportClient): Unit = {
-    val applicationIds = 
appActiveConnections.remove(client.getChannel.id().asLongText())
     incCounter(ACTIVE_CONNECTION_COUNT, -1)
-    if (null != applicationIds) {
-      applicationIds.asScala.foreach(applicationId =>
-        incCounter(ACTIVE_CONNECTION_COUNT, -1, Map(applicationLabel -> 
applicationId)))
+    if (metricsAppLevelEnabled) {
+      val applicationIds = 
appActiveConnections.remove(client.getChannel.id().asLongText())
+      if (null != applicationIds) {
+        applicationIds.asScala.foreach(applicationId =>
+          incCounter(ACTIVE_CONNECTION_COUNT, -1, Map(applicationLabel -> 
applicationId)))
+      }
     }
   }
 
   def recordAppActiveConnection(client: TransportClient, shuffleKey: String): 
Unit = {
-    val applicationIds = 
appActiveConnections.get(client.getChannel.id().asLongText())
-    val applicationId = Utils.splitShuffleKey(shuffleKey)._1
-    if (applicationIds != null && !applicationIds.contains(applicationId)) {
-      addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> 
applicationId))
-      incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> 
applicationId))
-      applicationIds.add(applicationId)
+    if (metricsAppLevelEnabled) {
+      val applicationIds = 
appActiveConnections.get(client.getChannel.id().asLongText())
+      val applicationId = Utils.splitShuffleKey(shuffleKey)._1
+      if (applicationIds != null && !applicationIds.contains(applicationId)) {
+        addCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> 
applicationId))
+        incCounter(ACTIVE_CONNECTION_COUNT, 1, Map(applicationLabel -> 
applicationId))
+        applicationIds.add(applicationId)
+      }
     }
   }
 
   def removeAppActiveConnection(applicationIds: util.Set[String]): Unit = {
-    applicationIds.asScala.foreach(applicationId =>
-      removeCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> 
applicationId)))
-    appActiveConnections.values().asScala.foreach(connectionAppIds =>
-      applicationIds.asScala.foreach(applicationId => 
connectionAppIds.remove(applicationId)))
+    if (metricsAppLevelEnabled) {
+      applicationIds.asScala.foreach(applicationId =>
+        removeCounter(ACTIVE_CONNECTION_COUNT, Map(applicationLabel -> 
applicationId)))
+      appActiveConnections.values().asScala.foreach(connectionAppIds =>
+        applicationIds.asScala.foreach(applicationId => 
connectionAppIds.remove(applicationId)))
+    }
   }
 
   // start cleaner thread
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
index 491f6f5b2..2e13ef1d6 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerSuite.scala
@@ -131,6 +131,7 @@ class WorkerSuite extends AnyFunSuite with 
BeforeAndAfterEach {
 
   test("handle top resource consumption") {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/tmp")
+    conf.set(CelebornConf.METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT, 5)
     worker = new Worker(conf, workerArgs)
     val userIdentifier = new UserIdentifier("default", "celeborn")
     worker.handleTopAppResourceConsumption(Map(userIdentifier -> 
ResourceConsumption(

Reply via email to