This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3c6e4697f [CELEBORN-1110][FOLLOWUP] Support
celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio
for each disk
3c6e4697f is described below
commit 3c6e4697f3270298ea3dc2549f2979be0854fbc4
Author: SteNicholas <[email protected]>
AuthorDate: Fri Nov 10 18:11:39 2023 +0800
[CELEBORN-1110][FOLLOWUP] Support
celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio
for each disk
### What changes were proposed in this pull request?
Follow-up to support `celeborn.worker.storage.disk.reserve.ratio` by caching
`minimumUsableSize` in a variable instead of calculating it for every pushdata.
### Why are the changes needed?
Cache `minimumUsableSize` in a variable instead of calculating it for every
pushdata, because `DiskUtils.getMinimumUsableSize` is costly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`SlotsAllocatorSuiteJ`
Closes #2083 from SteNicholas/CELEBORN-1110.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/worker/PushDataHandler.scala | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index d8f039ac1..d7b111eb3 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
+import scala.collection.JavaConverters._
+
import com.google.common.base.Throwables
import com.google.protobuf.GeneratedMessageV3
import io.netty.buffer.ByteBuf
@@ -55,6 +57,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
private var workerInfo: WorkerInfo = _
private var diskReserveSize: Long = _
private var diskReserveRatio: Option[Double] = _
+ private var diskUsableSizes: Map[String, Long] = _
private var partitionSplitMinimumSize: Long = _
private var partitionSplitMaximumSize: Long = _
private var shutdown: AtomicBoolean = _
@@ -77,6 +80,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
diskReserveRatio = worker.conf.workerDiskReserveRatio
+ diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint,
diskInfo) =>
+ (mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio))
+ }.toMap
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
storageManager = worker.storageManager
@@ -1195,14 +1201,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
return false
}
- val diskInfo = workerInfo.diskInfos
- .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-
- val minimumUsableSize =
- DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio)
+ val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
+ val diskInfo = workerInfo.diskInfos.get(mountPoint)
val diskFull = diskInfo.status.equals(
- DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
minimumUsableSize
-
+ DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
diskUsableSizes(mountPoint)
diskFull
}