This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 630975bf0 [CELEBORN-1110] Support 
celeborn.worker.storage.disk.reserve.ratio to configure worker reserved ratio 
for each disk
630975bf0 is described below

commit 630975bf043a0216c632739c9bbbcf3421c292f4
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 8 12:39:25 2023 +0800

    [CELEBORN-1110] Support celeborn.worker.storage.disk.reserve.ratio to 
configure worker reserved ratio for each disk
    
    ### What changes were proposed in this pull request?
    
    Support `celeborn.worker.storage.disk.reserve.ratio` to configure worker 
reserved ratio for each disk.
    
    ### Why are the changes needed?
    
    `CelebornConf` supports configuring the Celeborn worker reserved space for 
each disk, but only as an absolute size. `CelebornConf` should also support 
`celeborn.worker.storage.disk.reserve.ratio` to configure the worker reserved 
ratio for each disk. The minimum usable size for each disk should be the larger 
of the reserved space and the space calculated from the reserved ratio.
    
    ### Does this PR introduce _any_ user-facing change?
    
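    Yes. It adds the optional worker configuration 
`celeborn.worker.storage.disk.reserve.ratio`. When the option is not set, only 
the reserved size is used.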
    
    ### How was this patch tested?
    
    `SlotsAllocatorSuiteJ`
    
    Closes #2071 from SteNicholas/CELEBORN-1110.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit d2582919ad9ba6c0d4f2b9ba8fe44b40dfff9287)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/common/util/CollectionUtils.java      |  1 -
 .../org/apache/celeborn/common/CelebornConf.scala  | 11 +++++
 .../apache/celeborn/common/util/DiskUtils.scala    | 54 ++++++++++++++++++++++
 docs/configuration/worker.md                       |  1 +
 .../service/deploy/master/SlotsAllocator.java      | 14 +++++-
 .../celeborn/service/deploy/master/Master.scala    |  2 +
 .../deploy/master/SlotsAllocatorSuiteJ.java        |  2 +
 .../service/deploy/worker/PushDataHandler.scala    | 11 +++--
 .../deploy/worker/storage/DeviceMonitor.scala      |  8 +++-
 9 files changed, 96 insertions(+), 8 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java 
b/common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
similarity index 99%
rename from 
common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java
rename to 
common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
index 53316d56b..399d565d6 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CollectionUtils.java
+++ b/common/src/main/java/org/apache/celeborn/common/util/CollectionUtils.java
@@ -37,5 +37,4 @@ public class CollectionUtils {
   public static boolean isNotEmpty(Map map) {
     return !isEmpty(map);
   }
-
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 54058bb9e..08b8ed242 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -982,6 +982,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerDiskTimeSlidingWindowMinFetchCount: Int =
     get(WORKER_DISKTIME_SLIDINGWINDOW_MINFETCHCOUNT)
   def workerDiskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
+  def workerDiskReserveRatio: Option[Double] = get(WORKER_DISK_RESERVE_RATIO)
   def workerDiskCleanThreads: Int = get(WORKER_DISK_CLEAN_THREADS)
   def workerDiskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
   def workerDiskMonitorCheckList: Seq[String] = 
get(WORKER_DISK_MONITOR_CHECKLIST)
@@ -2127,6 +2128,16 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("5G")
 
+  val WORKER_DISK_RESERVE_RATIO: OptionalConfigEntry[Double] =
+    buildConf("celeborn.worker.storage.disk.reserve.ratio")
+      .categories("worker")
+      .doc("Celeborn worker reserved ratio for each disk. The minimum usable 
size for each disk is the max space " +
+        "between the reserved space and the space calculate via reserved 
ratio.")
+      .version("0.3.2")
+      .doubleConf
+      .checkValue(v => v > 0.0 && v < 1.0, "Should be in (0.0, 1.0).")
+      .createOptional
+
   val WORKER_DISK_CLEAN_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.disk.clean.threads")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
new file mode 100644
index 000000000..2e361a985
--- /dev/null
+++ b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.util
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.celeborn.common.meta.DiskInfo
+
+/**
+ * Disk utilities provide detail of disk info including disk statistics etc.
+ */
+object DiskUtils {
+
+  /**
+   * Gets the minimum usable size for each disk, which size is the max space 
between the reserved space
+   * and the space calculate via reserved ratio.
+   *
+   * @param diskInfo The reserved disk info.
+   * @param diskReserveSize The reserved space for each disk.
+   * @param diskReserveRatio The reserved ratio for each disk.
+   * @return the minimum usable space.
+   */
+  def getMinimumUsableSize(
+      diskInfo: DiskInfo,
+      diskReserveSize: Long,
+      diskReserveRatio: Option[Double]): Long = {
+    var minimumUsableSize = diskReserveSize
+    if (diskReserveRatio.isDefined) {
+      try {
+        val totalSpace = 
Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace
+        minimumUsableSize =
+          BigDecimal(totalSpace * 
diskReserveRatio.get).longValue.max(minimumUsableSize)
+      } catch {
+        case _: Exception => // Do nothing
+      }
+    }
+    minimumUsableSize
+  }
+}
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index fd0b8bbbc..1de428529 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -104,6 +104,7 @@ license: |
 | celeborn.worker.storage.checkDirsEmpty.maxRetries | 3 | The number of 
retries for a worker to check if the working directory is cleaned up before 
registering with the master. | 0.3.0 | 
 | celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per 
retry for a worker to check if the working directory is cleaned up before 
registering with the master. | 0.3.0 | 
 | celeborn.worker.storage.dirs | &lt;undefined&gt; | Directory list to store 
shuffle data. It's recommended to configure one directory on each disk. Storage 
size limit can be set for each directory. For the sake of performance, there 
should be no more than 2 flush threads on the same disk partition if you are 
using HDD, and should be 8 or more flush threads on the same disk partition if 
you are using SSD. For example: 
`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktyp [...]
+| celeborn.worker.storage.disk.reserve.ratio | &lt;undefined&gt; | Celeborn 
worker reserved ratio for each disk. The minimum usable size for each disk is 
the max space between the reserved space and the space calculate via reserved 
ratio. | 0.3.2 | 
 | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved 
space for each disk. | 0.3.0 | 
 | celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire 
dirs to be deleted on disk. | 0.3.2 | 
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's 
working dir path name. | 0.3.0 | 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 3c83abad6..62d4ea17b 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -19,6 +19,8 @@ package org.apache.celeborn.service.deploy.master;
 
 import java.util.*;
 
+import scala.Double;
+import scala.Option;
 import scala.Tuple2;
 
 import org.apache.commons.lang3.StringUtils;
@@ -31,6 +33,7 @@ import org.apache.celeborn.common.meta.DiskStatus;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.DiskUtils;
 
 public class SlotsAllocator {
   static class UsableDiskInfo {
@@ -93,7 +96,8 @@ public class SlotsAllocator {
           List<Integer> partitionIds,
           boolean shouldReplicate,
           boolean shouldRackAware,
-          long minimumUsableSize,
+          long diskReserveSize,
+          Option<Double> diskReserveRatio,
           int diskGroupCount,
           double diskGroupGradient,
           double flushTimeWeight,
@@ -115,7 +119,13 @@ public class SlotsAllocator {
                 .forEach(
                     (key, diskInfo) -> {
                       diskToWorkerMap.put(diskInfo, i);
-                      if (diskInfo.actualUsableSpace() > minimumUsableSize
+                      if (diskInfo.actualUsableSpace()
+                              > DiskUtils.getMinimumUsableSize(
+                                  diskInfo,
+                                  diskReserveSize,
+                                  diskReserveRatio.isEmpty()
+                                      ? Option.empty()
+                                      : Option.apply(diskReserveRatio.get()))
                           && diskInfo.status().equals(DiskStatus.HEALTHY)) {
                         usableDisks.add(diskInfo);
                       }
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1285c8ff3..547dfa07c 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -119,6 +119,7 @@ private[celeborn] class Master(
     statusSystem.workers.synchronized(new 
util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))
 
   private def diskReserveSize = conf.workerDiskReserveSize
+  private def diskReserveRatio = conf.workerDiskReserveRatio
 
   private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
   private val slotsAssignLoadAwareDiskGroupNum = 
conf.masterSlotAssignLoadAwareDiskGroupNum
@@ -666,6 +667,7 @@ private[celeborn] class Master(
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
               diskReserveSize,
+              diskReserveRatio,
               slotsAssignLoadAwareDiskGroupNum,
               slotsAssignLoadAwareDiskGroupGradient,
               loadAwareFlushTimeWeight,
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index e82742bab..3300f6de2 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import scala.Option;
 import scala.Tuple2;
 
 import org.junit.Assert;
@@ -237,6 +238,7 @@ public class SlotsAllocatorSuiteJ {
             shouldReplicate,
             false,
             10 * 1024 * 1024 * 1024L,
+            Option.empty(),
             conf.masterSlotAssignLoadAwareDiskGroupNum(),
             conf.masterSlotAssignLoadAwareDiskGroupGradient(),
             conf.masterSlotAssignLoadAwareFlushTimeWeight(),
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index e965fe7f8..a3981b201 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -38,7 +38,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
-import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.common.util.{DiskUtils, Utils}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
 
@@ -54,6 +54,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
   private var registered: AtomicBoolean = _
   private var workerInfo: WorkerInfo = _
   private var diskReserveSize: Long = _
+  private var diskReserveRatio: Option[Double] = _
   private var partitionSplitMinimumSize: Long = _
   private var partitionSplitMaximumSize: Long = _
   private var shutdown: AtomicBoolean = _
@@ -75,6 +76,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
     registered = worker.registered
     workerInfo = worker.workerInfo
     diskReserveSize = worker.conf.workerDiskReserveSize
+    diskReserveRatio = worker.conf.workerDiskReserveRatio
     partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
     partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
     storageManager = worker.storageManager
@@ -85,7 +87,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
     testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
     testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
 
-    logInfo(s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}")
+    logInfo(
+      s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, 
diskReserveRatio ${diskReserveRatio.orNull}")
   }
 
   override def receive(client: TransportClient, msg: RequestMessage): Unit =
@@ -1195,8 +1198,10 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     val diskInfo = workerInfo.diskInfos
       .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
 
+    val minimumUsableSize =
+      DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, 
diskReserveRatio)
     val diskFull = diskInfo.status.equals(
-      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < 
diskReserveSize
+      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < 
minimumUsableSize
 
     diskFull
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 4681140b9..d7f5879f1 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -31,7 +31,7 @@ import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
 import org.apache.celeborn.common.metrics.source.AbstractSource
-import org.apache.celeborn.common.util.{ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{DiskUtils, ThreadUtils, Utils}
 import org.apache.celeborn.common.util.Utils._
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 
@@ -246,8 +246,12 @@ object DeviceMonitor extends Logging {
     tryWithTimeoutAndCallback({
       val usage = getDiskUsageInfos(diskInfo)
       // assume no single device capacity exceeds 1EB in this era
+      val minimumUsableSize = DiskUtils.getMinimumUsableSize(
+        diskInfo,
+        conf.workerDiskReserveSize,
+        conf.workerDiskReserveRatio)
       val highDiskUsage =
-        usage.freeSpace < conf.workerDiskReserveSize || 
diskInfo.actualUsableSpace <= 0
+        usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0
       if (highDiskUsage) {
         logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
           s" Disk usage(Report by 
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +

Reply via email to