This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new e5afe91dc [CELEBORN-1252] Fix resource consumption of worker does not 
update when update interval is greater than heartbeat interval
e5afe91dc is described below

commit e5afe91dc85b17e4ca93134ac3d03c2ecb91855d
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jan 25 20:28:19 2024 +0800

    [CELEBORN-1252] Fix resource consumption of worker does not update when 
update interval is greater than heartbeat interval
    
    ### What changes were proposed in this pull request?
    
     Resource consumption of worker does not update when the update interval of 
resource consumption is greater than the heartbeat interval.
    
    <img width="1741" alt="截屏2024-01-24 14 49 50" 
src="https://github.com/apache/incubator-celeborn/assets/46485123/21cfd412-c69e-4955-8bc8-155ee470697d">
    
    This pull request introduces below changes:
    
    1. Avoid the master repeatedly adding a gauge for the same user
    2. For the worker, user resource consumption can be obtained directly from 
the worker's snapshot, so no update interval is needed
    
    ### Why are the changes needed?
    
    Ensures worker resource consumption metrics stay up to date on every 
heartbeat instead of silently going stale when the update interval exceeds 
the heartbeat interval.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2260 from AngersZhuuuu/CELEBORN-1252.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  9 ---
 .../apache/celeborn/common/meta/WorkerInfo.scala   |  9 ++-
 docs/configuration/worker.md                       |  1 -
 docs/migration.md                                  |  2 +
 .../celeborn/service/deploy/worker/Worker.scala    | 69 +++++++---------------
 5 files changed, 29 insertions(+), 61 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 51266165d..711355bf4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -543,7 +543,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def estimatedPartitionSizeForEstimationUpdateInterval: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
   def masterResourceConsumptionInterval: Long = 
get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
-  def workerResourceConsumptionInterval: Long = 
get(WORKER_RESOURCE_CONSUMPTION_INTERVAL)
 
   // //////////////////////////////////////////////////////
   //               Address && HA && RATIS                //
@@ -2177,14 +2176,6 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
 
-  val WORKER_RESOURCE_CONSUMPTION_INTERVAL: ConfigEntry[Long] =
-    buildConf("celeborn.worker.userResourceConsumption.update.interval")
-      .categories("worker")
-      .doc("Time length for a window about compute user resource consumption.")
-      .version("0.3.2")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("30s")
-
   val SHUFFLE_CHUNK_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.shuffle.chunk.size")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 20561eebf..683a19557 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -188,11 +188,14 @@ class WorkerInfo(
     JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
   }
 
-  def updateThenGetUserResourceConsumption(consumption: util.Map[
+  def updateThenGetUserResourceConsumption(resourceConsumptions: util.Map[
     UserIdentifier,
     ResourceConsumption]): util.Map[UserIdentifier, ResourceConsumption] = {
-    userResourceConsumption.clear()
-    userResourceConsumption.putAll(consumption)
+    
userResourceConsumption.keys().asScala.filterNot(resourceConsumptions.containsKey).foreach
 {
+      identifier =>
+        userResourceConsumption.put(identifier, ResourceConsumption(0, 0, 0, 
0))
+    }
+    userResourceConsumption.putAll(resourceConsumptions)
     userResourceConsumption
   }
 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 24340be0b..1f7f48a24 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -122,7 +122,6 @@ license: |
 | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved 
space for each disk. | 0.3.0 | celeborn.worker.disk.reserve.size | 
 | celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire 
dirs to be deleted on disk. | 0.3.2 |  | 
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's 
working dir path name. | 0.3.0 | celeborn.worker.workingDir | 
-| celeborn.worker.userResourceConsumption.update.interval | 30s | Time length 
for a window about compute user resource consumption. | 0.3.2 |  | 
 | celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to 
close | 0.2.0 |  | 
 | celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file 
writer to create if its creation was failed. | 0.2.0 |  | 
 <!--end-include-->
diff --git a/docs/migration.md b/docs/migration.md
index 538b7ada0..ae8715a76 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -44,6 +44,8 @@ license: |
 
 - Since 0.4.0, Celeborn deprecate `celeborn.storage.activeTypes`. Please use 
`celeborn.storage.availableTypes` instead.
 
+- Since 0.4.0, Celeborn worker removes configuration 
`celeborn.worker.userResourceConsumption.update.interval`.
+
 - Since 0.4.0, Celeborn master metrics `PartitionWritten` is renamed as 
`ActiveShuffleSize`.
 
 - Since 0.4.0, Celeborn master metrics `PartitionFileCount` is renamed as 
`ActiveShuffleFileCount`.
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index c438ff8fe..b237f21ee 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -213,11 +213,6 @@ private[celeborn] class Worker(
     diskInfos.put(diskInfo.mountPoint, diskInfo)
   }
 
-  // need to ensure storageManager has recovered fileinfos data if enable 
graceful shutdown before retrieve consumption
-  val userResourceConsumption: ConcurrentHashMap[UserIdentifier, 
ResourceConsumption] =
-    JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](
-      storageManager.userResourceConsumptionSnapshot().asJava)
-
   val workerInfo =
     new WorkerInfo(
       host,
@@ -226,7 +221,7 @@ private[celeborn] class Worker(
       fetchPort,
       replicatePort,
       diskInfos,
-      userResourceConsumption)
+      JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption])
 
   // whether this Worker registered to Master successfully
   val registered = new AtomicBoolean(false)
@@ -273,10 +268,6 @@ private[celeborn] class Worker(
   var cleaner: ExecutorService =
     ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")
 
-  private val workerResourceConsumptionInterval = 
conf.workerResourceConsumptionInterval
-  private val userResourceConsumptions =
-    JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, 
Long)]()
-
   private var jvmQuake: JVMQuake = _
   if (conf.workerJvmQuakeEnabled) {
     jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
@@ -534,47 +525,29 @@ private[celeborn] class Worker(
 
   private def handleResourceConsumption(): util.Map[UserIdentifier, 
ResourceConsumption] = {
     val resourceConsumptionSnapshot = 
storageManager.userResourceConsumptionSnapshot()
-    resourceConsumptionSnapshot.foreach { resourceConsumption =>
-      {
-        resourceConsumptionSource.addGauge(
-          ResourceConsumptionSource.DISK_FILE_COUNT,
-          resourceConsumption._1.toMap) { () =>
-          computeUserResourceConsumption(resourceConsumption).diskFileCount
-        }
-        resourceConsumptionSource.addGauge(
-          ResourceConsumptionSource.DISK_BYTES_WRITTEN,
-          resourceConsumption._1.toMap) { () =>
-          computeUserResourceConsumption(resourceConsumption).diskBytesWritten
-        }
-        resourceConsumptionSource.addGauge(
-          ResourceConsumptionSource.HDFS_FILE_COUNT,
-          resourceConsumption._1.toMap) { () =>
-          computeUserResourceConsumption(resourceConsumption).hdfsFileCount
-        }
-        resourceConsumptionSource.addGauge(
-          ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
-          resourceConsumption._1.toMap) { () =>
-          computeUserResourceConsumption(resourceConsumption).hdfsBytesWritten
-        }
+    resourceConsumptionSnapshot.foreach { case (userIdentifier, _) =>
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.DISK_FILE_COUNT,
+        userIdentifier.toMap) { () =>
+        workerInfo.userResourceConsumption.get(userIdentifier).diskFileCount
       }
-    }
-    
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
-  }
-
-  private def computeUserResourceConsumption(userResourceConsumption: (
-      UserIdentifier,
-      ResourceConsumption)): ResourceConsumption = {
-    val userIdentifier = userResourceConsumption._1
-    val resourceConsumption = userResourceConsumption._2
-    val current = System.currentTimeMillis()
-    if (userResourceConsumptions.containsKey(userIdentifier)) {
-      val resourceConsumptionAndUpdateTime = 
userResourceConsumptions.get(userIdentifier)
-      if (current - resourceConsumptionAndUpdateTime._2 <= 
workerResourceConsumptionInterval) {
-        return resourceConsumptionAndUpdateTime._1
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.DISK_BYTES_WRITTEN,
+        userIdentifier.toMap) { () =>
+        workerInfo.userResourceConsumption.get(userIdentifier).diskBytesWritten
+      }
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_FILE_COUNT,
+        userIdentifier.toMap) { () =>
+        workerInfo.userResourceConsumption.get(userIdentifier).hdfsFileCount
+      }
+      resourceConsumptionSource.addGauge(
+        ResourceConsumptionSource.HDFS_BYTES_WRITTEN,
+        userIdentifier.toMap) { () =>
+        workerInfo.userResourceConsumption.get(userIdentifier).hdfsBytesWritten
       }
     }
-    userResourceConsumptions.put(userIdentifier, (resourceConsumption, 
current))
-    resourceConsumption
+    
workerInfo.updateThenGetUserResourceConsumption(resourceConsumptionSnapshot.asJava)
   }
 
   @VisibleForTesting

Reply via email to