This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 586556b34 [CELEBORN-1114] Remove allocationBuckets from WorkerInfo and
refactor SLOTS_ALLOCATED metrics
586556b34 is described below
commit 586556b340a9b905f4fa3ce9cceb286ef950e9ed
Author: onebox-li <[email protected]>
AuthorDate: Wed Nov 8 19:45:47 2023 +0800
[CELEBORN-1114] Remove allocationBuckets from WorkerInfo and refactor
SLOTS_ALLOCATED metrics
Currently, `WorkerInfo` is used in many places, but `allocationBuckets` is
only used when a worker collects the `SLOTS_ALLOCATED` metric for itself. If
there are lots of workers in the RSS cluster, this wastes a certain amount of
memory, because each `WorkerInfo` maintains an `Array[Int](61)`. So remove it
from `WorkerInfo`.
Also refactor the `SLOTS_ALLOCATED` metric from a gauge to a counter.
Originally, this metric approximated the total for the last hour only if there
were continuous tasks, because the time buckets were rotated only when slots
were allocated. Refactoring it into a counter removes the cost of maintaining
time windows, including storing the data and expiring it in a timely manner. It
can also be transformed more flexibly on the Prometheus side according to user
needs.
Ditto.
Yes. The SlotsAllocated metric changes from a gauge covering the last hour
(`metrics_SlotsAllocated_Value`) to an increasing counter
(`metrics_SlotsAllocated_Count`).
Cluster test.
Closes #2078 from onebox-li/improve-SlotsAllocated.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit b7e4dc4339411a0f164e4bfbef51db53189167b3)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 4 +--
.../apache/celeborn/common/meta/WorkerInfo.scala | 38 ++++------------------
.../org/apache/celeborn/common/util/Utils.scala | 8 ++---
.../service/deploy/worker/Controller.scala | 1 +
.../celeborn/service/deploy/worker/Worker.scala | 3 --
.../service/deploy/worker/WorkerSource.scala | 4 ++-
6 files changed, 17 insertions(+), 41 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 1f534335a..a8fc98571 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -1184,12 +1184,12 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_SlotsAllocated_Value",
+ "expr": "metrics_SlotsAllocated_Count",
"legendFormat": "$baseLegend",
"refId": "A"
}
],
- "title": "metrics_SlotsAllocated_Value",
+ "title": "metrics_SlotsAllocated_Count",
"type": "timeseries"
},
{
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 7ff3bfa20..597784816 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -59,14 +59,6 @@ class WorkerInfo(
new util.HashMap[UserIdentifier, ResourceConsumption]())
}
- val allocationBuckets = new Array[Int](61)
- var bucketIndex = 0
- var bucketTime = System.currentTimeMillis()
- var bucketAllocations = 0
- 0 until allocationBuckets.length foreach { case idx =>
- allocationBuckets(idx) = 0
- }
-
def isActive: Boolean = {
endpoint.asInstanceOf[NettyRpcEndpointRef].client.isActive
}
@@ -76,27 +68,19 @@ class WorkerInfo(
diskInfos.asScala.map(_._2.activeSlots).sum
}
- def allocateSlots(shuffleKey: String, slotsPerDisk: util.Map[String,
Integer]): Unit =
+ def allocateSlots(shuffleKey: String, slotsPerDisk: util.Map[String,
Integer]): Unit = {
+ logDebug(s"Shuffle $shuffleKey, allocations $slotsPerDisk")
this.synchronized {
- logDebug(s"shuffle $shuffleKey allocations $slotsPerDisk")
- var totalSlots = 0
slotsPerDisk.asScala.foreach { case (disk, slots) =>
- if (!diskInfos.containsKey(disk)) {
- logDebug(s"Unknown disk $disk")
+ val diskInfo = diskInfos.get(disk)
+ if (diskInfo == null) {
+ logDebug(s"Unknown disk $disk when allocateSlots")
} else {
- diskInfos.get(disk).allocateSlots(shuffleKey, slots)
+ diskInfo.allocateSlots(shuffleKey, slots)
}
- totalSlots += slots
- }
-
- val current = System.currentTimeMillis()
- if (current - bucketTime > 60 * 1000) {
- bucketIndex = (bucketIndex + 1) % allocationBuckets.length
- allocationBuckets(bucketIndex) = 0
- bucketTime = current
}
- allocationBuckets(bucketIndex) = allocationBuckets(bucketIndex) +
totalSlots
}
+ }
def releaseSlots(shuffleKey: String, slots: util.Map[String, Integer]): Unit
= this.synchronized {
slots.asScala.foreach { case (disk, slot) =>
@@ -118,14 +102,6 @@ class WorkerInfo(
shuffleKeySet
}
- def allocationsInLastHour(): Int = this.synchronized {
- var total = 0
- 1 to 60 foreach { case delta =>
- total += allocationBuckets((bucketIndex + delta) %
allocationBuckets.length)
- }
- total
- }
-
def hasSameInfoWith(other: WorkerInfo): Boolean = {
rpcPort == other.rpcPort &&
pushPort == other.pushPort &&
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3c39de638..0f72bd66f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -816,7 +816,7 @@ object Utils extends Logging {
val slotDistributions = new util.HashMap[String, Integer]()
(masterLocations.asScala ++ workerLocations.asScala)
.foreach {
- case location =>
+ location =>
val mountPoint = location.getStorageInfo.getMountPoint
if (slotDistributions.containsKey(mountPoint)) {
slotDistributions.put(mountPoint,
slotDistributions.get(mountPoint) + 1)
@@ -824,10 +824,10 @@ object Utils extends Logging {
slotDistributions.put(mountPoint, 1)
}
}
- logDebug(s"locations to distribution ," +
- s" ${masterLocations.asScala.map(_.toString).mkString(",")} " +
+ logDebug(s"locations to distribution, " +
+ s"${masterLocations.asScala.map(_.toString).mkString(",")} " +
s"${workerLocations.asScala.map(_.toString).mkString(",")} " +
- s"to ${slotDistributions} ")
+ s"to $slotDistributions ")
slotDistributions
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f65b7f36c..8418d69d6 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -247,6 +247,7 @@ private[deploy] class Controller(
workerInfo.allocateSlots(
shuffleKey,
Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
+ workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() +
replicaLocs.size())
logInfo(s"Reserved ${primaryLocs.size()} primary location" +
s" and ${replicaLocs.size()} replica location for $shuffleKey ")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index c8f0f2b98..955378c61 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -270,9 +270,6 @@ private[celeborn] class Worker(
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
- workerSource.addGauge(WorkerSource.SLOTS_ALLOCATED) { () =>
- workerInfo.allocationsInLastHour()
- }
workerSource.addGauge(WorkerSource.SORT_MEMORY) { () =>
memoryManager.getSortMemoryCounter.get()
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index df07970ab..91df21a1b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -38,7 +38,9 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)
- // add Timers
+ addCounter(SLOTS_ALLOCATED)
+
+ // add timers
addTimer(COMMIT_FILES_TIME)
addTimer(RESERVE_SLOTS_TIME)
addTimer(FLUSH_DATA_TIME)