This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b45b63f9a [CELEBORN-247][FOLLOWUP] Add metrics for each user's quota
usage of Celeborn Worker
b45b63f9a is described below
commit b45b63f9a56b20c4ca81d5c1cb8429e34a211ef4
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 1 15:48:31 2023 +0800
[CELEBORN-247][FOLLOWUP] Add metrics for each user's quota usage of
Celeborn Worker
### What changes were proposed in this pull request?
Add the metric `ResourceConsumption` to monitor each user's quota usage of
Celeborn Worker.
### Why are the changes needed?
The metric `ResourceConsumption` currently supports monitoring each user's quota
usage of Celeborn Master only. The usage of Celeborn Worker also needs to be
monitored.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes #2059 from SteNicholas/CELEBORN-247.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
METRICS.md | 8 +--
.../org/apache/celeborn/common/CelebornConf.scala | 9 +++
.../metrics/source/ResourceConsumptionSource.scala | 15 ++++-
docs/configuration/worker.md | 1 +
docs/monitoring.md | 8 +++
.../celeborn/service/deploy/master/Master.scala | 19 +++++--
.../service/deploy/master/MasterSource.scala | 9 ++-
.../celeborn/service/deploy/worker/Worker.scala | 64 +++++++++++++++++++---
.../service/deploy/worker/WorkerSource.scala | 4 +-
9 files changed, 111 insertions(+), 26 deletions(-)
diff --git a/METRICS.md b/METRICS.md
index e764371da..b4f10cdea 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -83,10 +83,10 @@ Here is an example of Grafana dashboard importing.
| PartitionSize | master | The
estimated partition size of last 20 flush window whose length is 15 seconds by
defaults. |
| PartitionWritten | master |
The active shuffle size.
|
| PartitionFileCount | master |
The active shuffle partition count.
|
-| diskFileCount | master |
The count of disk files consumption by each user.
|
-| diskBytesWritten | master |
The amount of disk files consumption by each user.
|
-| hdfsFileCount | master |
The count of hdfs files consumption by each user.
|
-| hdfsBytesWritten | master |
The amount of hdfs files consumption by each user.
|
+| diskFileCount | master and worker |
The count of disk files consumption by each user.
|
+| diskBytesWritten | master and worker |
The amount of disk files consumption by each user.
|
+| hdfsFileCount | master and worker |
The count of hdfs files consumption by each user.
|
+| hdfsBytesWritten | master and worker |
The amount of hdfs files consumption by each user.
|
| RegisteredShuffleCount | master and worker |
The value means count of registered shuffle.
|
| CommitFilesTime | worker |
CommitFiles means flush and close a shuffle partition file.
|
| ReserveSlotsTime | worker |
ReserveSlots means acquire a disk buffer and record partition location.
|
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 3c717fa5a..52e2a414b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -521,6 +521,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def estimatedPartitionSizeForEstimationUpdateInterval: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
def masterResourceConsumptionInterval: Long =
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
+ def workerResourceConsumptionInterval: Long =
get(WORKER_RESOURCE_CONSUMPTION_INTERVAL)
// //////////////////////////////////////////////////////
// Address && HA && RATIS //
@@ -2018,6 +2019,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
+ val WORKER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
+ buildConf("celeborn.worker.userResourceConsumption.update.interval")
+ .categories("worker")
+ .doc("Time length for a window about compute user resource consumption.")
+ .version("0.3.2")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("30s")
+
val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.shuffle.chunk.size")
.categories("worker")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
index f705f121d..df33310bb 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/ResourceConsumptionSource.scala
@@ -19,9 +19,18 @@ package org.apache.celeborn.common.metrics.source
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.metrics.MetricsSystem
-class ResourceConsumptionSource(conf: CelebornConf)
- extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
+class ResourceConsumptionSource(conf: CelebornConf, role: String)
+ extends AbstractSource(conf, role) with Logging {
override val sourceName = "ResourceConsumption"
}
+
+object ResourceConsumptionSource {
+ val DISK_FILE_COUNT = "diskFileCount"
+
+ val DISK_BYTES_WRITTEN = "diskBytesWritten"
+
+ val HDFS_FILE_COUNT = "hdfsFileCount"
+
+ val HDFS_BYTES_WRITTEN = "hdfsBytesWritten"
+}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 8154101b0..984be9f4f 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -105,6 +105,7 @@ license: |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved
space for each disk. | 0.3.0 |
| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire
dirs to be deleted on disk. | 0.3.2 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's
working dir path name. | 0.3.0 |
+| celeborn.worker.userResourceConsumption.update.interval | 30s | Time length
for a window about compute user resource consumption. | 0.3.2 |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to
close | 0.2.0 |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file
writer to create if its creation was failed. | 0.2.0 |
<!--end-include-->
diff --git a/docs/monitoring.md b/docs/monitoring.md
index e5745ee56..1c82c1c1e 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -266,6 +266,14 @@ These metrics are exposed by Celeborn worker.
[Dropwizard/Codahale Metric Sets for JVM
instrumentation](https://metrics.dropwizard.io/4.2.0/manual/jvm.html)
and in particular the metric sets BufferPoolMetricSet,
GarbageCollectorMetricSet and MemoryUsageGaugeSet.
+ - namespace=ResourceConsumption
+ - **notes:**
+ - This metrics data is generated for each user and they are identified
using a metric tag.
+ - diskFileCount
+ - diskBytesWritten
+ - hdfsFileCount
+ - hdfsBytesWritten
+
**Note:**
The Netty DirectArenaMetrics named like `push/fetch/replicate_server_numXX`
are not exposed by default, nor in Grafana dashboard.
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1e9c813e6..ead05b375 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -152,7 +152,8 @@ private[celeborn] class Master(
private val slotsAssignPolicy = conf.masterSlotAssignPolicy
// init and register master metrics
- val resourceConsumptionSource = new ResourceConsumptionSource(conf)
+ private val resourceConsumptionSource =
+ new ResourceConsumptionSource(conf, MetricsSystem.ROLE_MASTER)
private val masterSource = new MasterSource(conf)
private var hadoopFs: FileSystem = _
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
@@ -840,16 +841,24 @@ private[celeborn] class Master(
userIdentifier: UserIdentifier,
context: RpcCallContext): Unit = {
- resourceConsumptionSource.addGauge("diskFileCount", userIdentifier.toMap)
{ () =>
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_FILE_COUNT,
+ userIdentifier.toMap) { () =>
computeUserResourceConsumption(userIdentifier).diskFileCount
}
- resourceConsumptionSource.addGauge("diskBytesWritten",
userIdentifier.toMap) { () =>
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+ userIdentifier.toMap) { () =>
computeUserResourceConsumption(userIdentifier).diskBytesWritten
}
- resourceConsumptionSource.addGauge("hdfsFileCount", userIdentifier.toMap)
{ () =>
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ userIdentifier.toMap) { () =>
computeUserResourceConsumption(userIdentifier).hdfsFileCount
}
- resourceConsumptionSource.addGauge("hdfsBytesWritten",
userIdentifier.toMap) { () =>
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ userIdentifier.toMap) { () =>
computeUserResourceConsumption(userIdentifier).hdfsBytesWritten
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
index 05c14b7df..8010f0a74 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/MasterSource.scala
@@ -18,15 +18,14 @@
package org.apache.celeborn.service.deploy.master
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.service.deploy.master.MasterSource.OFFER_SLOTS_TIME
-class MasterSource(conf: CelebornConf)
- extends AbstractSource(conf, MetricsSystem.ROLE_MASTER) with Logging {
- override val sourceName = s"master"
+class MasterSource(conf: CelebornConf) extends AbstractSource(conf,
MetricsSystem.ROLE_MASTER) {
+ override val sourceName = "master"
+ import MasterSource._
+ // add timers
addTimer(OFFER_SLOTS_TIME)
// start cleaner
startCleaner()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 6c0f981ce..b3bef5688 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker
import java.io.File
import java.lang.{Long => JLong}
+import java.util
import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
@@ -36,7 +37,7 @@ import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo,
WorkerPartitionLocationInfo}
import org.apache.celeborn.common.metrics.MetricsSystem
-import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
SystemMiscSource}
+import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource,
ResourceConsumptionSource, SystemMiscSource}
import org.apache.celeborn.common.network.TransportContext
import org.apache.celeborn.common.protocol.{PartitionType,
PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants,
TransportModuleConstants}
import org.apache.celeborn.common.protocol.message.ControlMessages._
@@ -104,7 +105,10 @@ private[celeborn] class Worker(
}
}
+ private val resourceConsumptionSource =
+ new ResourceConsumptionSource(conf, MetricsSystem.ROLE_WORKER)
val workerSource = new WorkerSource(conf)
+ metricsSystem.registerSource(resourceConsumptionSource)
metricsSystem.registerSource(workerSource)
metricsSystem.registerSource(new JVMSource(conf, MetricsSystem.ROLE_WORKER))
metricsSystem.registerSource(new JVMCPUSource(conf,
MetricsSystem.ROLE_WORKER))
@@ -263,6 +267,10 @@ private[celeborn] class Worker(
private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]]
var cleaner: Thread = _
+ private val workerResourceConsumptionInterval =
conf.workerResourceConsumptionInterval
+ private val userResourceConsumptions =
+ JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption,
Long)]()
+
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
@@ -335,9 +343,6 @@ private[celeborn] class Worker(
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map {
disk =>
disk.mountPoint -> disk
}.toMap.asJava).values().asScala.toSeq
- val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
- storageManager.userResourceConsumptionSnapshot().asJava)
-
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
host,
@@ -346,7 +351,7 @@ private[celeborn] class Worker(
fetchPort,
replicatePort,
diskInfos,
- resourceConsumption,
+ handleResourceConsumption(),
activeShuffleKeys,
estimatedAppDiskUsage,
highWorkload),
@@ -486,8 +491,7 @@ private[celeborn] class Worker(
// Use WorkerInfo's diskInfo since re-register when heartbeat
return not-registered,
// StorageManager have update the disk info.
workerInfo.diskInfos.asScala.toMap,
- workerInfo.updateThenGetUserResourceConsumption(
-
storageManager.userResourceConsumptionSnapshot().asJava).asScala.toMap,
+ handleResourceConsumption().asScala.toMap,
MasterClient.genRequestId()),
classOf[PbRegisterWorkerResponse])
} catch {
@@ -511,6 +515,52 @@ private[celeborn] class Worker(
// If worker register still failed after retry, throw exception to stop
worker process
throw new CelebornException("Register worker failed.", exception)
}
+
+ private def handleResourceConsumption(): util.Map[UserIdentifier,
ResourceConsumption] = {
+ val resourceConsumptionSnapshot =
storageManager.userResourceConsumptionSnapshot()
+ resourceConsumptionSnapshot.foreach { resourceConsumption =>
+ {
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_FILE_COUNT,
+ resourceConsumption._1.toMap) { () =>
+ computeUserResourceConsumption(resourceConsumption).diskFileCount
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+ resourceConsumption._1.toMap) { () =>
+ computeUserResourceConsumption(resourceConsumption).diskBytesWritten
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ resourceConsumption._1.toMap) { () =>
+ computeUserResourceConsumption(resourceConsumption).hdfsFileCount
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ resourceConsumption._1.toMap) { () =>
+ computeUserResourceConsumption(resourceConsumption).hdfsBytesWritten
+ }
+ }
+ }
+
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
+ }
+
+ private def computeUserResourceConsumption(userResourceConsumption: (
+ UserIdentifier,
+ ResourceConsumption)): ResourceConsumption = {
+ val userIdentifier = userResourceConsumption._1
+ val resourceConsumption = userResourceConsumption._2
+ val current = System.currentTimeMillis()
+ if (userResourceConsumptions.containsKey(userIdentifier)) {
+ val resourceConsumptionAndUpdateTime =
userResourceConsumptions.get(userIdentifier)
+ if (current - resourceConsumptionAndUpdateTime._2 <=
workerResourceConsumptionInterval) {
+ return resourceConsumptionAndUpdateTime._1
+ }
+ }
+ userResourceConsumptions.put(userIdentifier, (resourceConsumption,
current))
+ resourceConsumption
+ }
+
@VisibleForTesting
def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
expiredShuffleKeys.asScala.foreach { shuffleKey =>
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index df07970ab..f46b0ff90 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -38,7 +38,7 @@ class WorkerSource(conf: CelebornConf) extends
AbstractSource(conf, MetricsSyste
addCounter(REGION_FINISH_FAIL_COUNT)
addCounter(ACTIVE_CONNECTION_COUNT)
- // add Timers
+ // add timers
addTimer(COMMIT_FILES_TIME)
addTimer(RESERVE_SLOTS_TIME)
addTimer(FLUSH_DATA_TIME)
@@ -130,7 +130,7 @@ object WorkerSource {
val USER_PRODUCE_SPEED = "UserProduceSpeed"
val WORKER_CONSUME_SPEED = "WorkerConsumeSpeed"
- // active shuffle size
+ // active shuffle
val ACTIVE_SHUFFLE_SIZE = "ActiveShuffleSize"
val ACTIVE_SHUFFLE_FILE_COUNT = "ActiveShuffleFileCount"
}