This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6c4f6c7b6 [CELEBORN-1758] Remove the empty user resource consumption
from worker heartbeat
6c4f6c7b6 is described below
commit 6c4f6c7b6c206c38c246539873b764db51a6ff03
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Dec 6 08:20:54 2024 +0800
[CELEBORN-1758] Remove the empty user resource consumption from worker
heartbeat
### What changes were proposed in this pull request?
1. Report the `resourceConsumptionSnapshot` from `storageManager` directly
in the worker heartbeat; this snapshot does not contain empty user resource
consumptions.
2. Likewise, do not return empty user resource consumptions from the
RESTful API.
### Why are the changes needed?
https://github.com/apache/celeborn/blob/878a83cfa7129ab9cb27b83889b1ff1e29b9c3ba/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala#L239-L248
Currently, we never remove a user's resource consumption entry, even when its
sub resource consumptions are empty; instead, we create a
`ResourceConsumption(0, 0, 0, 0)` for it.
I am concerned that the worker will report more and more empty user resource
consumptions to the master: once any of a user's slots has been assigned to
this worker, that user's entry is never dropped.
For example:
<img width="813" alt="image"
src="https://github.com/user-attachments/assets/64932552-dc29-4a43-aed4-557419628b23">
So I think we only need to report the `resourceConsumptionSnapshot` from
`storageManager` directly, since that snapshot does not contain empty user
resource consumptions.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2967 from turboFei/reduce_report.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../server/common/http/api/v1/ApiUtils.scala | 32 +++++++++++-----------
.../celeborn/service/deploy/worker/Worker.scala | 11 ++++----
2 files changed, 21 insertions(+), 22 deletions(-)
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index d3209027b..799fec86e 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -63,28 +63,28 @@ object ApiUtils {
: JMap[String, WorkerResourceConsumption] = {
val workerResourceConsumptions = new util.HashMap[String,
WorkerResourceConsumption]()
if (CollectionUtils.isNotEmpty(workerInfo.userResourceConsumption)) {
- workerInfo.userResourceConsumption.asScala.foreach {
+ // filter out user resource consumption with empty sub resource
consumptions
+ workerInfo.userResourceConsumption.asScala.filter(ur =>
+ CollectionUtils.isNotEmpty(ur._2.subResourceConsumptions)).foreach {
case (userIdentifier, resourceConsumption) =>
+ val subConsumptions = new util.HashMap[String,
WorkerResourceConsumption]()
+ resourceConsumption.subResourceConsumptions.asScala.foreach {
+ case (subIdentifier, subConsumption) =>
+ subConsumptions.put(
+ subIdentifier,
+ new WorkerResourceConsumption()
+ .diskBytesWritten(subConsumption.diskBytesWritten)
+ .diskFileCount(subConsumption.diskFileCount)
+ .hdfsBytesWritten(subConsumption.hdfsBytesWritten)
+ .hdfsFileCount(subConsumption.hdfsFileCount))
+ }
+
val workerConsumption = new WorkerResourceConsumption()
.diskBytesWritten(resourceConsumption.diskBytesWritten)
.diskFileCount(resourceConsumption.diskFileCount)
.hdfsBytesWritten(resourceConsumption.hdfsBytesWritten)
.hdfsFileCount(resourceConsumption.hdfsFileCount)
-
- if
(CollectionUtils.isNotEmpty(resourceConsumption.subResourceConsumptions)) {
- val subConsumptions = new util.HashMap[String,
WorkerResourceConsumption]()
- resourceConsumption.subResourceConsumptions.asScala.foreach {
- case (subIdentifier, subConsumption) =>
- subConsumptions.put(
- subIdentifier,
- new WorkerResourceConsumption()
- .diskBytesWritten(subConsumption.diskBytesWritten)
- .diskFileCount(subConsumption.diskFileCount)
- .hdfsBytesWritten(subConsumption.hdfsBytesWritten)
- .hdfsFileCount(subConsumption.hdfsFileCount))
- }
- workerConsumption.subResourceConsumption(subConsumptions)
- }
+ .subResourceConsumption(subConsumptions)
workerResourceConsumptions.put(userIdentifier.toString,
workerConsumption)
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 3439a2e86..de7190404 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -660,14 +660,13 @@ private[celeborn] class Worker(
}
private def handleResourceConsumption(): util.Map[UserIdentifier,
ResourceConsumption] = {
- val resourceConsumptionSnapshot =
storageManager.userResourceConsumptionSnapshot()
- val userResourceConsumptions =
-
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
- resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
+ val resourceConsumptionSnapshot =
storageManager.userResourceConsumptionSnapshot().asJava
+
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot)
+ resourceConsumptionSnapshot.asScala.foreach { case (userIdentifier, _) =>
gaugeResourceConsumption(userIdentifier)
}
- handleTopResourceConsumption(userResourceConsumptions)
- userResourceConsumptions
+ handleTopResourceConsumption(resourceConsumptionSnapshot)
+ resourceConsumptionSnapshot
}
def handleTopResourceConsumption(userResourceConsumptions: util.Map[