This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c4f6c7b6 [CELEBORN-1758] Remove the empty user resource consumption 
from worker heartbeat
6c4f6c7b6 is described below

commit 6c4f6c7b6c206c38c246539873b764db51a6ff03
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Dec 6 08:20:54 2024 +0800

    [CELEBORN-1758] Remove the empty user resource consumption from worker 
heartbeat
    
    ### What changes were proposed in this pull request?
    
    1. Report the `resourceConsumptionSnapshot` from `storageManager` directly 
in the worker heartbeat, which does not contain the empty user resource 
consumption.
    2. For the RESTful API, do not return the empty user resource consumption 
either.
    
    ### Why are the changes needed?
    
https://github.com/apache/celeborn/blob/878a83cfa7129ab9cb27b83889b1ff1e29b9c3ba/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala#L239-L248
    
    Currently, we never remove a user's resource consumption even when its sub 
resource consumptions are empty; a `ResourceConsumption(0, 0, 0, 0)` is created 
instead.
    
    I am afraid that the worker will report more and more empty user resource 
consumptions to the master once any of a user's slots has been assigned to this worker.
    
    For example:
    <img width="813" alt="image" 
src="https://github.com/user-attachments/assets/64932552-dc29-4a43-aed4-557419628b23">
    
    So, I think we just need to report the `resourceConsumptionSnapshot` from 
`storageManager` directly, which does not contain the empty user resource 
consumption.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    GA.
    
    Closes #2967 from turboFei/reduce_report.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../server/common/http/api/v1/ApiUtils.scala       | 32 +++++++++++-----------
 .../celeborn/service/deploy/worker/Worker.scala    | 11 ++++----
 2 files changed, 21 insertions(+), 22 deletions(-)

diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index d3209027b..799fec86e 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -63,28 +63,28 @@ object ApiUtils {
       : JMap[String, WorkerResourceConsumption] = {
     val workerResourceConsumptions = new util.HashMap[String, 
WorkerResourceConsumption]()
     if (CollectionUtils.isNotEmpty(workerInfo.userResourceConsumption)) {
-      workerInfo.userResourceConsumption.asScala.foreach {
+      // filter out user resource consumption with empty sub resource 
consumptions
+      workerInfo.userResourceConsumption.asScala.filter(ur =>
+        CollectionUtils.isNotEmpty(ur._2.subResourceConsumptions)).foreach {
         case (userIdentifier, resourceConsumption) =>
+          val subConsumptions = new util.HashMap[String, 
WorkerResourceConsumption]()
+          resourceConsumption.subResourceConsumptions.asScala.foreach {
+            case (subIdentifier, subConsumption) =>
+              subConsumptions.put(
+                subIdentifier,
+                new WorkerResourceConsumption()
+                  .diskBytesWritten(subConsumption.diskBytesWritten)
+                  .diskFileCount(subConsumption.diskFileCount)
+                  .hdfsBytesWritten(subConsumption.hdfsBytesWritten)
+                  .hdfsFileCount(subConsumption.hdfsFileCount))
+          }
+
           val workerConsumption = new WorkerResourceConsumption()
             .diskBytesWritten(resourceConsumption.diskBytesWritten)
             .diskFileCount(resourceConsumption.diskFileCount)
             .hdfsBytesWritten(resourceConsumption.hdfsBytesWritten)
             .hdfsFileCount(resourceConsumption.hdfsFileCount)
-
-          if 
(CollectionUtils.isNotEmpty(resourceConsumption.subResourceConsumptions)) {
-            val subConsumptions = new util.HashMap[String, 
WorkerResourceConsumption]()
-            resourceConsumption.subResourceConsumptions.asScala.foreach {
-              case (subIdentifier, subConsumption) =>
-                subConsumptions.put(
-                  subIdentifier,
-                  new WorkerResourceConsumption()
-                    .diskBytesWritten(subConsumption.diskBytesWritten)
-                    .diskFileCount(subConsumption.diskFileCount)
-                    .hdfsBytesWritten(subConsumption.hdfsBytesWritten)
-                    .hdfsFileCount(subConsumption.hdfsFileCount))
-            }
-            workerConsumption.subResourceConsumption(subConsumptions)
-          }
+            .subResourceConsumption(subConsumptions)
 
           workerResourceConsumptions.put(userIdentifier.toString, 
workerConsumption)
       }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 3439a2e86..de7190404 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -660,14 +660,13 @@ private[celeborn] class Worker(
   }
 
   private def handleResourceConsumption(): util.Map[UserIdentifier, 
ResourceConsumption] = {
-    val resourceConsumptionSnapshot = 
storageManager.userResourceConsumptionSnapshot()
-    val userResourceConsumptions =
-      
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
-    resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
+    val resourceConsumptionSnapshot = 
storageManager.userResourceConsumptionSnapshot().asJava
+    
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot)
+    resourceConsumptionSnapshot.asScala.foreach { case (userIdentifier, _) =>
       gaugeResourceConsumption(userIdentifier)
     }
-    handleTopResourceConsumption(userResourceConsumptions)
-    userResourceConsumptions
+    handleTopResourceConsumption(resourceConsumptionSnapshot)
+    resourceConsumptionSnapshot
   }
 
   def handleTopResourceConsumption(userResourceConsumptions: util.Map[

Reply via email to