This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new e5afe91dc [CELEBORN-1252] Fix resource consumption of worker does not
update when update interval is greater than heartbeat interval
e5afe91dc is described below
commit e5afe91dc85b17e4ca93134ac3d03c2ecb91855d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jan 25 20:28:19 2024 +0800
[CELEBORN-1252] Fix resource consumption of worker does not update when
update interval is greater than heartbeat interval
### What changes were proposed in this pull request?
The worker's resource consumption does not update when the resource
consumption update interval is greater than the heartbeat interval.
<img width="1741" alt="Screenshot 2024-01-24 14 49 50"
src="https://github.com/apache/incubator-celeborn/assets/46485123/21cfd412-c69e-4955-8bc8-155ee470697d">
This pull request introduces the following changes:
1. Avoid the master repeatedly adding gauges for the same user.
2. For the worker, user resource consumption is read directly from the worker's
snapshot, so no update interval is needed.
### Why are the changes needed?
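To fix the worker's user resource consumption not being updated when the
update interval is greater than the heartbeat interval (see above).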
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2260 from AngersZhuuuu/CELEBORN-1252.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 9 ---
.../apache/celeborn/common/meta/WorkerInfo.scala | 9 ++-
docs/configuration/worker.md | 1 -
docs/migration.md | 2 +
.../celeborn/service/deploy/worker/Worker.scala | 69 +++++++---------------
5 files changed, 29 insertions(+), 61 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 51266165d..711355bf4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -543,7 +543,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def estimatedPartitionSizeForEstimationUpdateInterval: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
def masterResourceConsumptionInterval: Long =
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
- def workerResourceConsumptionInterval: Long =
get(WORKER_RESOURCE_CONSUMPTION_INTERVAL)
// //////////////////////////////////////////////////////
// Address && HA && RATIS //
@@ -2177,14 +2176,6 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")
- val WORKER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
- buildConf("celeborn.worker.userResourceConsumption.update.interval")
- .categories("worker")
- .doc("Time length for a window about compute user resource consumption.")
- .version("0.3.2")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("30s")
-
val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
buildConf("celeborn.shuffle.chunk.size")
.categories("worker")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 20561eebf..683a19557 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -188,11 +188,14 @@ class WorkerInfo(
JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
}
- def updateThenGetUserResourceConsumption(consumption: util.Map[
+ def updateThenGetUserResourceConsumption(resourceConsumptions: util.Map[
UserIdentifier,
ResourceConsumption]): util.Map[UserIdentifier, ResourceConsumption] = {
- userResourceConsumption.clear()
- userResourceConsumption.putAll(consumption)
+
userResourceConsumption.keys().asScala.filterNot(resourceConsumptions.containsKey).foreach
{
+ identifier =>
+ userResourceConsumption.put(identifier, ResourceConsumption(0, 0, 0,
0))
+ }
+ userResourceConsumption.putAll(resourceConsumptions)
userResourceConsumption
}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 24340be0b..1f7f48a24 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -122,7 +122,6 @@ license: |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved
space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size |
| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire
dirs to be deleted on disk. | 0.3.2 | |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's
working dir path name. | 0.3.0 | celeborn.worker.workingDir |
-| celeborn.worker.userResourceConsumption.update.interval | 30s | Time length
for a window about compute user resource consumption. | 0.3.2 | |
| celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to
close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file
writer to create if its creation was failed. | 0.2.0 | |
<!--end-include-->
diff --git a/docs/migration.md b/docs/migration.md
index 538b7ada0..ae8715a76 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -44,6 +44,8 @@ license: |
- Since 0.4.0, Celeborn deprecate `celeborn.storage.activeTypes`. Please use
`celeborn.storage.availableTypes` instead.
+- Since 0.4.0, Celeborn worker removes configuration
`celeborn.worker.userResourceConsumption.update.interval`.
+
- Since 0.4.0, Celeborn master metrics `PartitionWritten` is renamed as
`ActiveShuffleSize`.
- Since 0.4.0, Celeborn master metrics `PartitionFileCount` is renamed as
`ActiveShuffleFileCount`.
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index c438ff8fe..b237f21ee 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -213,11 +213,6 @@ private[celeborn] class Worker(
diskInfos.put(diskInfo.mountPoint, diskInfo)
}
- // need to ensure storageManager has recovered fileinfos data if enable
graceful shutdown before retrieve consumption
- val userResourceConsumption: ConcurrentHashMap[UserIdentifier,
ResourceConsumption] =
- JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](
- storageManager.userResourceConsumptionSnapshot().asJava)
-
val workerInfo =
new WorkerInfo(
host,
@@ -226,7 +221,7 @@ private[celeborn] class Worker(
fetchPort,
replicatePort,
diskInfos,
- userResourceConsumption)
+ JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption])
// whether this Worker registered to Master successfully
val registered = new AtomicBoolean(false)
@@ -273,10 +268,6 @@ private[celeborn] class Worker(
var cleaner: ExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")
- private val workerResourceConsumptionInterval =
conf.workerResourceConsumptionInterval
- private val userResourceConsumptions =
- JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption,
Long)]()
-
private var jvmQuake: JVMQuake = _
if (conf.workerJvmQuakeEnabled) {
jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
@@ -534,47 +525,29 @@ private[celeborn] class Worker(
private def handleResourceConsumption(): util.Map[UserIdentifier,
ResourceConsumption] = {
val resourceConsumptionSnapshot =
storageManager.userResourceConsumptionSnapshot()
- resourceConsumptionSnapshot.foreach { resourceConsumption =>
- {
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.DISK_FILE_COUNT,
- resourceConsumption._1.toMap) { () =>
- computeUserResourceConsumption(resourceConsumption).diskFileCount
- }
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.DISK_BYTES_WRITTEN,
- resourceConsumption._1.toMap) { () =>
- computeUserResourceConsumption(resourceConsumption).diskBytesWritten
- }
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_FILE_COUNT,
- resourceConsumption._1.toMap) { () =>
- computeUserResourceConsumption(resourceConsumption).hdfsFileCount
- }
- resourceConsumptionSource.addGauge(
- ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
- resourceConsumption._1.toMap) { () =>
- computeUserResourceConsumption(resourceConsumption).hdfsBytesWritten
- }
+ resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_FILE_COUNT,
+ userIdentifier.toMap) { () =>
+ workerInfo.userResourceConsumption.get(userIdentifier).diskFileCount
}
- }
-
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
- }
-
- private def computeUserResourceConsumption(userResourceConsumption: (
- UserIdentifier,
- ResourceConsumption)): ResourceConsumption = {
- val userIdentifier = userResourceConsumption._1
- val resourceConsumption = userResourceConsumption._2
- val current = System.currentTimeMillis()
- if (userResourceConsumptions.containsKey(userIdentifier)) {
- val resourceConsumptionAndUpdateTime =
userResourceConsumptions.get(userIdentifier)
- if (current - resourceConsumptionAndUpdateTime._2 <=
workerResourceConsumptionInterval) {
- return resourceConsumptionAndUpdateTime._1
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+ userIdentifier.toMap) { () =>
+ workerInfo.userResourceConsumption.get(userIdentifier).diskBytesWritten
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_FILE_COUNT,
+ userIdentifier.toMap) { () =>
+ workerInfo.userResourceConsumption.get(userIdentifier).hdfsFileCount
+ }
+ resourceConsumptionSource.addGauge(
+ ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+ userIdentifier.toMap) { () =>
+ workerInfo.userResourceConsumption.get(userIdentifier).hdfsBytesWritten
}
}
- userResourceConsumptions.put(userIdentifier, (resourceConsumption,
current))
- resourceConsumption
+
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
}
@VisibleForTesting