This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b7e4dc433 [CELEBORN-1114] Remove allocationBuckets from WorkerInfo and 
refactor SLOTS_ALLOCATED metrics
b7e4dc433 is described below

commit b7e4dc4339411a0f164e4bfbef51db53189167b3
Author: onebox-li <[email protected]>
AuthorDate: Wed Nov 8 19:45:47 2023 +0800

    [CELEBORN-1114] Remove allocationBuckets from WorkerInfo and refactor 
SLOTS_ALLOCATED metrics
    
    ### What changes were proposed in this pull request?
    Currently, `WorkerInfo` is used in many places, but `allocationBuckets` is 
only used when a worker collects its own `SLOTS_ALLOCATED` metric. In an RSS 
cluster with many workers this wastes memory, because every `WorkerInfo` 
maintains an `Array[Int](61)`. This PR therefore removes `allocationBuckets` 
from `WorkerInfo`.
    It also refactors the `SLOTS_ALLOCATED` metric from a gauge to a counter. 
Previously, the metric approximated the total for the last hour, and only when 
tasks ran continuously. Making it a counter removes the cost of maintaining 
time windows (storage, timely expiration of old data, etc.), and lets users 
transform it more flexibly on the Prometheus side.
    
    ### Why are the changes needed?
    Ditto.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The 1-hour gauge `metrics_SlotsAllocated_Value` is replaced by 
`metrics_SlotsAllocated_Count`, an increasing counter.
    
    ### How was this patch tested?
    Cluster test.
    
    Closes #2078 from onebox-li/improve-SlotsAllocated.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             |  4 +--
 .../apache/celeborn/common/meta/WorkerInfo.scala   | 38 ++++------------------
 .../org/apache/celeborn/common/util/Utils.scala    |  8 ++---
 .../service/deploy/worker/Controller.scala         |  1 +
 .../celeborn/service/deploy/worker/Worker.scala    |  3 --
 .../service/deploy/worker/WorkerSource.scala       |  2 ++
 6 files changed, 16 insertions(+), 40 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 0f70d8e1b..22fd1c19b 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1184,12 +1184,12 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_SlotsAllocated_Value",
+              "expr": "metrics_SlotsAllocated_Count",
               "legendFormat": "${baseLegend}",
               "refId": "A"
             }
           ],
-          "title": "metrics_SlotsAllocated_Value",
+          "title": "metrics_SlotsAllocated_Count",
           "type": "timeseries"
         },
         {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 7ff3bfa20..597784816 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -59,14 +59,6 @@ class WorkerInfo(
       new util.HashMap[UserIdentifier, ResourceConsumption]())
   }
 
-  val allocationBuckets = new Array[Int](61)
-  var bucketIndex = 0
-  var bucketTime = System.currentTimeMillis()
-  var bucketAllocations = 0
-  0 until allocationBuckets.length foreach { case idx =>
-    allocationBuckets(idx) = 0
-  }
-
   def isActive: Boolean = {
     endpoint.asInstanceOf[NettyRpcEndpointRef].client.isActive
   }
@@ -76,27 +68,19 @@ class WorkerInfo(
     diskInfos.asScala.map(_._2.activeSlots).sum
   }
 
-  def allocateSlots(shuffleKey: String, slotsPerDisk: util.Map[String, 
Integer]): Unit =
+  def allocateSlots(shuffleKey: String, slotsPerDisk: util.Map[String, 
Integer]): Unit = {
+    logDebug(s"Shuffle $shuffleKey, allocations $slotsPerDisk")
     this.synchronized {
-      logDebug(s"shuffle $shuffleKey allocations $slotsPerDisk")
-      var totalSlots = 0
       slotsPerDisk.asScala.foreach { case (disk, slots) =>
-        if (!diskInfos.containsKey(disk)) {
-          logDebug(s"Unknown disk $disk")
+        val diskInfo = diskInfos.get(disk)
+        if (diskInfo == null) {
+          logDebug(s"Unknown disk $disk when allocateSlots")
         } else {
-          diskInfos.get(disk).allocateSlots(shuffleKey, slots)
+          diskInfo.allocateSlots(shuffleKey, slots)
         }
-        totalSlots += slots
-      }
-
-      val current = System.currentTimeMillis()
-      if (current - bucketTime > 60 * 1000) {
-        bucketIndex = (bucketIndex + 1) % allocationBuckets.length
-        allocationBuckets(bucketIndex) = 0
-        bucketTime = current
       }
-      allocationBuckets(bucketIndex) = allocationBuckets(bucketIndex) + 
totalSlots
     }
+  }
 
   def releaseSlots(shuffleKey: String, slots: util.Map[String, Integer]): Unit 
= this.synchronized {
     slots.asScala.foreach { case (disk, slot) =>
@@ -118,14 +102,6 @@ class WorkerInfo(
     shuffleKeySet
   }
 
-  def allocationsInLastHour(): Int = this.synchronized {
-    var total = 0
-    1 to 60 foreach { case delta =>
-      total += allocationBuckets((bucketIndex + delta) % 
allocationBuckets.length)
-    }
-    total
-  }
-
   def hasSameInfoWith(other: WorkerInfo): Boolean = {
     rpcPort == other.rpcPort &&
     pushPort == other.pushPort &&
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 2e6c0af67..5704f3f88 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -815,7 +815,7 @@ object Utils extends Logging {
     val slotDistributions = new util.HashMap[String, Integer]()
     (masterLocations.asScala ++ workerLocations.asScala)
       .foreach {
-        case location =>
+        location =>
           val mountPoint = location.getStorageInfo.getMountPoint
           if (slotDistributions.containsKey(mountPoint)) {
             slotDistributions.put(mountPoint, 
slotDistributions.get(mountPoint) + 1)
@@ -823,10 +823,10 @@ object Utils extends Logging {
             slotDistributions.put(mountPoint, 1)
           }
       }
-    logDebug(s"locations to distribution ," +
-      s" ${masterLocations.asScala.map(_.toString).mkString(",")} " +
+    logDebug(s"locations to distribution, " +
+      s"${masterLocations.asScala.map(_.toString).mkString(",")} " +
       s"${workerLocations.asScala.map(_.toString).mkString(",")} " +
-      s"to ${slotDistributions} ")
+      s"to $slotDistributions ")
     slotDistributions
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f65b7f36c..8418d69d6 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -247,6 +247,7 @@ private[deploy] class Controller(
     workerInfo.allocateSlots(
       shuffleKey,
       Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
+    workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() + 
replicaLocs.size())
 
     logInfo(s"Reserved ${primaryLocs.size()} primary location" +
       s" and ${replicaLocs.size()} replica location for $shuffleKey ")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d3850f8f1..80e207309 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -278,9 +278,6 @@ private[celeborn] class Worker(
   workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
     workerInfo.getShuffleKeySet.size
   }
-  workerSource.addGauge(WorkerSource.SLOTS_ALLOCATED) { () =>
-    workerInfo.allocationsInLastHour()
-  }
   workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
     memoryManager.getSortMemoryCounter.get()
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index f46b0ff90..7b3d1e51e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -38,6 +38,8 @@ class WorkerSource(conf: CelebornConf) extends 
AbstractSource(conf, MetricsSyste
   addCounter(REGION_FINISH_FAIL_COUNT)
   addCounter(ACTIVE_CONNECTION_COUNT)
 
+  addCounter(SLOTS_ALLOCATED)
+
   // add timers
   addTimer(COMMIT_FILES_TIME)
   addTimer(RESERVE_SLOTS_TIME)

Reply via email to