This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 630975bf0 [CELEBORN-1110] Support
celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio
for each disk
630975bf0 is described below
commit 630975bf043a0216c632739c9bbbcf3421c292f4
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 8 12:39:25 2023 +0800
[CELEBORN-1110] Support celeborn.worker.storage.disk.reserve.ratio to
configure worker reserved ratio for each disk
### What changes were proposed in this pull request?
Support `celeborn.worker.storage.disk.reserve.ratio` to configure the ratio of
each disk that a worker keeps in reserve.
### Why are the changes needed?
`CelebornConf` already supports configuring the space a Celeborn worker
reserves on each disk, but only as an absolute size. `CelebornConf` should also
support `celeborn.worker.storage.disk.reserve.ratio` to configure the reserved
ratio for each disk. The minimum usable size for each disk is then the larger
of the reserved space and the space calculated by applying the reserved ratio
to the disk's total space.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`SlotsAllocatorSuiteJ`
Closes #2071 from SteNicholas/CELEBORN-1110.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit d2582919ad9ba6c0d4f2b9ba8fe44b40dfff9287)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/common/util/CollectionUtils.java | 1 -
.../org/apache/celeborn/common/CelebornConf.scala | 11 +++++
.../apache/celeborn/common/util/DiskUtils.scala | 54 ++++++++++++++++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/master/SlotsAllocator.java | 14 +++++-
.../celeborn/service/deploy/master/Master.scala | 2 +
.../deploy/master/SlotsAllocatorSuiteJ.java | 2 +
.../service/deploy/worker/PushDataHandler.scala | 11 +++--
.../deploy/worker/storage/DeviceMonitor.scala | 8 +++-
9 files changed, 96 insertions(+), 8 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java
b/common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
similarity index 99%
rename from
common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java
rename to
common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
index 53316d56b..399d565d6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
@@ -37,5 +37,4 @@ public class CollectionUtils {
public static boolean isNotEmpty(Map map) {
return !isEmpty(map);
}
-
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 54058bb9e..08b8ed242 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -982,6 +982,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerDiskTimeSlidingWindowMinFetchCount: Int =
get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
+ def workerDiskReserveRatio: Option[Double] = get(WORKER_DISK_RESERVE_RATIO)
def workerDiskCleanThreads: Int = get(WORKER_DISK_CLEAN_THREADS)
def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
def workerDiskMonitorCheckList: Seq[String] =
get(WORKER_DISK_MONITOR_CHECKLIST)
@@ -2127,6 +2128,16 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("5G")
+ // Optional reserve expressed as a fraction of each disk's total space. When set, the
+ // minimum usable size per disk is max(WORKER_DISK_RESERVE_SIZE, totalSpace * ratio), as
+ // computed by DiskUtils.getMinimumUsableSize, which falls back to the absolute reserve size
+ // if the disk's total space cannot be read. checkValue restricts the ratio to (0.0, 1.0).
+ val WORKER_DISK_RESERVE_RATIO: OptionalConfigEntry[Double] =
+ buildConf("celeborn.worker.storage.disk.reserve.ratio")
+ .categories("worker")
+ .doc("Celeborn worker reserved ratio for each disk. The minimum usable
size for each disk is the max space " +
+ "between the reserved space and the space calculate via reserved
ratio.")
+ .version("0.3.2")
+ .doubleConf
+ .checkValue(v => v > 0.0 && v < 1.0, "Should be in (0.0, 1.0).")
+ .createOptional
+
val WORKER_DISK_CLEAN_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.disk.clean.threads")
.categories("worker")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
new file mode 100644
index 000000000..2e361a985
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.common.meta.DiskInfo
+
+/**
+ * Disk utilities provide detail of disk info including disk statistics etc.
+ */
+object DiskUtils {
+
+  /**
+   * Gets the minimum usable size for a disk: the larger of the absolute reserved space and the
+   * space obtained by applying the reserved ratio to the disk's total capacity.
+   *
+   * The total capacity is read from the local file store that contains `diskInfo.mountPoint`.
+   * If no ratio is configured, or the capacity cannot be read (non-fatal error), the absolute
+   * reserved space is returned unchanged, so the result is never below `diskReserveSize`.
+   *
+   * NOTE(review): this is also called from the master's SlotsAllocator with a worker's DiskInfo.
+   * If the master runs on a different host than the worker, the mount point is resolved against
+   * the master's own filesystem (or fails and falls back). Confirm this is intended.
+   *
+   * @param diskInfo The disk whose mount point is inspected.
+   * @param diskReserveSize The absolute reserved space for each disk, in bytes.
+   * @param diskReserveRatio The optional reserved ratio for each disk.
+   * @return the minimum usable space in bytes.
+   */
+  def getMinimumUsableSize(
+      diskInfo: DiskInfo,
+      diskReserveSize: Long,
+      diskReserveRatio: Option[Double]): Long = {
+    import scala.util.control.NonFatal
+
+    diskReserveRatio.fold(diskReserveSize) { ratio =>
+      try {
+        val totalSpace = Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace
+        // totalSpace * ratio is already a Double; a truncating conversion is all that is needed.
+        math.max((totalSpace * ratio).toLong, diskReserveSize)
+      } catch {
+        // Best effort: fall back to the absolute reserve, but let fatal throwables propagate.
+        case NonFatal(_) => diskReserveSize
+      }
+    }
+  }
+}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index fd0b8bbbc..1de428529 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -104,6 +104,7 @@ license: |
| celeborn.worker.storage.checkDirsEmpty.maxRetries | 3 | The number of
retries for a worker to check if the working directory is cleaned up before
registering with the master. | 0.3.0 |
| celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per
retry for a worker to check if the working directory is cleaned up before
registering with the master. | 0.3.0 |
| celeborn.worker.storage.dirs | <undefined> | Directory list to store
shuffle data. It's recommended to configure one directory on each disk. Storage
size limit can be set for each directory. For the sake of performance, there
should be no more than 2 flush threads on the same disk partition if you are
using HDD, and should be 8 or more flush threads on the same disk partition if
you are using SSD. For example:
`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktyp [...]
+| celeborn.worker.storage.disk.reserve.ratio | <undefined> | Celeborn
worker reserved ratio for each disk. The minimum usable size for each disk is
the max space between the reserved space and the space calculate via reserved
ratio. | 0.3.2 |
| celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved
space for each disk. | 0.3.0 |
| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire
dirs to be deleted on disk. | 0.3.2 |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's
working dir path name. | 0.3.0 |
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 3c83abad6..62d4ea17b 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -19,6 +19,8 @@ package org.apache.celeborn.service.deploy.master;
import java.util.*;
+import scala.Double;
+import scala.Option;
import scala.Tuple2;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +33,7 @@ import org.apache.celeborn.common.meta.DiskStatus;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.DiskUtils;
public class SlotsAllocator {
static class UsableDiskInfo {
@@ -93,7 +96,8 @@ public class SlotsAllocator {
List<Integer> partitionIds,
boolean shouldReplicate,
boolean shouldRackAware,
- long minimumUsableSize,
+ long diskReserveSize,
+ Option<Double> diskReserveRatio,
int diskGroupCount,
double diskGroupGradient,
double flushTimeWeight,
@@ -115,7 +119,13 @@ public class SlotsAllocator {
.forEach(
(key, diskInfo) -> {
diskToWorkerMap.put(diskInfo, i);
- if (diskInfo.actualUsableSpace() > minimumUsableSize
+ if (diskInfo.actualUsableSpace()
+ > DiskUtils.getMinimumUsableSize(
+ diskInfo,
+ diskReserveSize,
+ diskReserveRatio.isEmpty()
+ ? Option.empty()
+ : Option.apply(diskReserveRatio.get()))
&& diskInfo.status().equals(DiskStatus.HEALTHY)) {
usableDisks.add(diskInfo);
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1285c8ff3..547dfa07c 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -119,6 +119,7 @@ private[celeborn] class Master(
statusSystem.workers.synchronized(new
util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))
private def diskReserveSize = conf.workerDiskReserveSize
+ private def diskReserveRatio = conf.workerDiskReserveRatio
private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
private val slotsAssignLoadAwareDiskGroupNum =
conf.masterSlotAssignLoadAwareDiskGroupNum
@@ -666,6 +667,7 @@ private[celeborn] class Master(
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
diskReserveSize,
+ diskReserveRatio,
slotsAssignLoadAwareDiskGroupNum,
slotsAssignLoadAwareDiskGroupGradient,
loadAwareFlushTimeWeight,
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index e82742bab..3300f6de2 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
+import scala.Option;
import scala.Tuple2;
import org.junit.Assert;
@@ -237,6 +238,7 @@ public class SlotsAllocatorSuiteJ {
shouldReplicate,
false,
10 * 1024 * 1024 * 1024L,
+ Option.empty(),
conf.masterSlotAssignLoadAwareDiskGroupNum(),
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index e965fe7f8..a3981b201 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -38,7 +38,7 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{DiskUtils, Utils}
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{FileWriter,
HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
@@ -54,6 +54,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
private var registered: AtomicBoolean = _
private var workerInfo: WorkerInfo = _
private var diskReserveSize: Long = _
+ private var diskReserveRatio: Option[Double] = _
private var partitionSplitMinimumSize: Long = _
private var partitionSplitMaximumSize: Long = _
private var shutdown: AtomicBoolean = _
@@ -75,6 +76,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
registered = worker.registered
workerInfo = worker.workerInfo
diskReserveSize = worker.conf.workerDiskReserveSize
+ diskReserveRatio = worker.conf.workerDiskReserveRatio
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
storageManager = worker.storageManager
@@ -85,7 +87,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
- logInfo(s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}")
+ logInfo(
+ s"diskReserveSize ${Utils.bytesToString(diskReserveSize)},
diskReserveRatio ${diskReserveRatio.orNull}")
}
override def receive(client: TransportClient, msg: RequestMessage): Unit =
@@ -1195,8 +1198,10 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val diskInfo = workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+ val minimumUsableSize =
+ DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio)
val diskFull = diskInfo.status.equals(
- DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
diskReserveSize
+ DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
minimumUsableSize
diskFull
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 4681140b9..d7f5879f1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{DiskUtils, ThreadUtils, Utils}
import org.apache.celeborn.common.util.Utils._
import org.apache.celeborn.service.deploy.worker.WorkerSource
@@ -246,8 +246,12 @@ object DeviceMonitor extends Logging {
tryWithTimeoutAndCallback({
val usage = getDiskUsageInfos(diskInfo)
// assume no single device capacity exceeds 1EB in this era
+ val minimumUsableSize = DiskUtils.getMinimumUsableSize(
+ diskInfo,
+ conf.workerDiskReserveSize,
+ conf.workerDiskReserveRatio)
val highDiskUsage =
- usage.freeSpace < conf.workerDiskReserveSize ||
diskInfo.actualUsableSpace <= 0
+ usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0
if (highDiskUsage) {
logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
s" Disk usage(Report by
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +