This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c6e4697f [CELEBORN-1110][FOLLOWUP] Support celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio for each disk
3c6e4697f is described below

commit 3c6e4697f3270298ea3dc2549f2979be0854fbc4
Author: SteNicholas <[email protected]>
AuthorDate: Fri Nov 10 18:11:39 2023 +0800

    [CELEBORN-1110][FOLLOWUP] Support celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio for each disk
    
    ### What changes were proposed in this pull request?
    
    Follow-up for `celeborn.worker.storage.disk.reserve.ratio`: cache `minimumUsableSize` in a variable instead of calculating it for every push data request.
    
    ### Why are the changes needed?
    
    Cache `minimumUsableSize` in a variable instead of calculating it for every push data request, because `DiskUtils.getMinimumUsableSize` is costly.
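    
    For illustration, here is a minimal self-contained Scala sketch of the caching pattern. The `DiskInfo` shape, the reserve formula, and the sample values are assumptions made for this sketch, not Celeborn's actual `DiskUtils` implementation:
    
    ```scala
    import scala.collection.JavaConverters._
    
    object DiskReserveCacheSketch {
      // Hypothetical stand-in for Celeborn's DiskInfo.
      final case class DiskInfo(totalSpace: Long)
    
      // Assumed semantics: reserve the larger of a fixed size and a ratio of total space.
      def getMinimumUsableSize(disk: DiskInfo, reserveSize: Long, reserveRatio: Option[Double]): Long =
        reserveRatio match {
          case Some(ratio) => math.max(reserveSize, (disk.totalSpace * ratio).toLong)
          case None        => reserveSize
        }
    
      def main(args: Array[String]): Unit = {
        val diskReserveSize = 5L * 1024 * 1024 * 1024   // e.g. 5 GiB fixed reserve
        val diskReserveRatio = Some(0.05)               // e.g. reserve 5% of each disk
        val diskInfos = new java.util.HashMap[String, DiskInfo]()
        diskInfos.put("/mnt/disk1", DiskInfo(totalSpace = 1L << 40)) // 1 TiB disk
    
        // Computed once at handler initialization; the per-push disk-full check
        // then only does a map lookup instead of recomputing the threshold.
        val diskUsableSizes: Map[String, Long] = diskInfos.asScala.map { case (mountPoint, diskInfo) =>
          (mountPoint, getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio))
        }.toMap
    
        println(diskUsableSizes) // Map(/mnt/disk1 -> 54975581388)
      }
    }
    ```
    
    The per-push check then compares `diskInfo.actualUsableSpace` against the cached `diskUsableSizes(mountPoint)`, as the diff below does.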
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `SlotsAllocatorSuiteJ`
    
    Closes #2083 from SteNicholas/CELEBORN-1110.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/service/deploy/worker/PushDataHandler.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index d8f039ac1..d7b111eb3 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
+import scala.collection.JavaConverters._
+
 import com.google.common.base.Throwables
 import com.google.protobuf.GeneratedMessageV3
 import io.netty.buffer.ByteBuf
@@ -55,6 +57,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
   private var workerInfo: WorkerInfo = _
   private var diskReserveSize: Long = _
   private var diskReserveRatio: Option[Double] = _
+  private var diskUsableSizes: Map[String, Long] = _
   private var partitionSplitMinimumSize: Long = _
   private var partitionSplitMaximumSize: Long = _
   private var shutdown: AtomicBoolean = _
@@ -77,6 +80,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.workerDiskReserveSize
     diskReserveRatio = worker.conf.workerDiskReserveRatio
+    diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint, diskInfo) =>
+      (mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio))
+    }.toMap
     partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
     partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
     storageManager = worker.storageManager
@@ -1195,14 +1201,10 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
     if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
       return false
     }
-    val diskInfo = workerInfo.diskInfos
-      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-
-    val minimumUsableSize =
-      DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, diskReserveRatio)
+    val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
+    val diskInfo = workerInfo.diskInfos.get(mountPoint)
     val diskFull = diskInfo.status.equals(
-      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < minimumUsableSize
-
+      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < diskUsableSizes(mountPoint)
     diskFull
   }
 
