This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 351173bac [CELEBORN-1727] Correct the calculation of worker diskInfo 
actualUsableSpace
351173bac is described below

commit 351173bacde5086e9c3f88d30f992ef53c2ecaa5
Author: onebox-li <[email protected]>
AuthorDate: Thu Nov 21 16:17:46 2024 +0800

    [CELEBORN-1727] Correct the calculation of worker diskInfo actualUsableSpace
    
    ### What changes were proposed in this pull request?
    Correct the calculation of worker diskInfo actualUsableSpace.
    Rename the function that computes the reserve size so its meaning is clearer 
(`getMinimumUsableSize` -> `getActualReserveSize`).
    Start the deviceMonitor check only after the first 
`storageManager.updateDiskInfos()` to prevent disks from being misidentified as 
HIGH_DISK_USAGE.
    Fix the disk-full check in PushDataHandler#checkDiskFull.
    
    ### Why are the changes needed?
    Make sure the worker disk reservation works correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Cluster test and UT.
    
    Closes #2931 from onebox-li/fix-disk-usablespace.
    
    Authored-by: onebox-li <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/common/util/DiskUtils.scala    | 22 +++---
 .../tests/spark/CelebornHashCheckDiskSuite.scala   |  5 +-
 .../service/deploy/worker/PushDataHandler.scala    | 17 +----
 .../celeborn/service/deploy/worker/Worker.scala    |  1 +
 .../deploy/worker/storage/DeviceMonitor.scala      | 12 +--
 .../deploy/worker/storage/StorageManager.scala     | 23 +++---
 .../worker/storage/StorageManagerSuite.scala       | 87 ++++++++++++++--------
 7 files changed, 91 insertions(+), 76 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
index 2e361a985..b4972b0f0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
@@ -27,28 +27,28 @@ import org.apache.celeborn.common.meta.DiskInfo
 object DiskUtils {
 
   /**
-   * Gets the minimum usable size for each disk, which size is the max space 
between the reserved space
-   * and the space calculate via reserved ratio.
+   * Get the actual size that the disk needs to reserve. It will take the 
larger value between
+   * the configured fixed reserved size and the size calculated by the 
reserved ratio.
    *
-   * @param diskInfo The reserved disk info.
-   * @param diskReserveSize The reserved space for each disk.
-   * @param diskReserveRatio The reserved ratio for each disk.
-   * @return the minimum usable space.
+   * @param diskInfo The disk info being calculated.
+   * @param diskReserveSize The configured fixed reserved size space for the 
disk.
+   * @param diskReserveRatio The configured reserved ratio for the disk.
+   * @return the actual size (in bytes) that the disk needs to reserve.
    */
-  def getMinimumUsableSize(
+  def getActualReserveSize(
       diskInfo: DiskInfo,
       diskReserveSize: Long,
       diskReserveRatio: Option[Double]): Long = {
-    var minimumUsableSize = diskReserveSize
+    var actualReserveSize = diskReserveSize
     if (diskReserveRatio.isDefined) {
       try {
         val totalSpace = 
Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace
-        minimumUsableSize =
-          BigDecimal(totalSpace * 
diskReserveRatio.get).longValue.max(minimumUsableSize)
+        actualReserveSize =
+          BigDecimal(totalSpace * 
diskReserveRatio.get).longValue.max(actualReserveSize)
       } catch {
         case _: Exception => // Do nothing
       }
     }
-    minimumUsableSize
+    actualReserveSize
   }
 }
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 5b461013d..eafabbbe6 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -60,7 +60,6 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
     val combineResult = combine(sparkSession)
     val groupByResult = groupBy(sparkSession)
     val repartitionResult = repartition(sparkSession)
-    val sqlResult = runsql(sparkSession)
     sparkSession.stop()
 
     val sparkSessionEnableCeleborn = SparkSession.builder()
@@ -69,16 +68,14 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
     val celebornCombineResult = combine(sparkSessionEnableCeleborn)
     val celebornGroupByResult = groupBy(sparkSessionEnableCeleborn)
     val celebornRepartitionResult = repartition(sparkSessionEnableCeleborn)
-    val celebornSqlResult = runsql(sparkSessionEnableCeleborn)
 
     assert(combineResult.equals(celebornCombineResult))
     assert(groupByResult.equals(celebornGroupByResult))
     assert(repartitionResult.equals(celebornRepartitionResult))
-    assert(combineResult.equals(celebornCombineResult))
-    assert(sqlResult.equals(celebornSqlResult))
 
     // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
     workers.foreach { worker =>
+      worker.storageManager.updateDiskInfos()
       worker.storageManager.disksSnapshot().foreach { diskInfo =>
         assert(diskInfo.actualUsableSpace <= 0)
       }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index b6ddff261..af6d4cc66 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
-import scala.collection.JavaConverters._
 import scala.concurrent.{Await, Promise}
 import scala.concurrent.duration.Duration
 import scala.util.{Failure, Success, Try}
@@ -42,7 +41,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
-import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
 import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, 
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, 
StorageManager}
 import 
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
@@ -58,9 +57,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
   private var replicateClientFactory: TransportClientFactory = _
   private var registered: Option[AtomicBoolean] = None
   private var workerInfo: WorkerInfo = _
-  private var diskReserveSize: Long = _
-  private var diskReserveRatio: Option[Double] = _
-  private var diskUsableSizes: Map[String, Long] = _
   private var partitionSplitMinimumSize: Long = _
   private var partitionSplitMaximumSize: Long = _
   private var shutdown: AtomicBoolean = _
@@ -80,11 +76,6 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     unavailablePeers = worker.unavailablePeers
     replicateClientFactory = worker.replicateClientFactory
     workerInfo = worker.workerInfo
-    diskReserveSize = worker.conf.workerDiskReserveSize
-    diskReserveRatio = worker.conf.workerDiskReserveRatio
-    diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint, 
diskInfo) =>
-      (mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, 
diskReserveRatio))
-    }.toMap
     partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
     partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
     storageManager = worker.storageManager
@@ -95,8 +86,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler
     testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
     testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
     registered = Some(worker.registered)
-    logInfo(
-      s"diskReserveSize ${Utils.bytesToString(diskReserveSize)}, 
diskReserveRatio ${diskReserveRatio.orNull}")
   }
 
   override def receive(
@@ -1230,8 +1219,8 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
     val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
     val diskInfo = workerInfo.diskInfos.get(mountPoint)
-    val diskFull = diskInfo.status.equals(
-      DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace < 
diskUsableSizes(mountPoint)
+    val diskFull =
+      diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) || 
diskInfo.actualUsableSpace <= 0
     diskFull
   }
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 0be341912..15f89555a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -274,6 +274,7 @@ private[celeborn] class Worker(
   assert(replicatePort > 0, "worker replica bind port should be positive")
 
   storageManager.updateDiskInfos()
+  storageManager.startDeviceMonitor()
 
   // WorkerInfo's diskInfos is a reference to storageManager.diskInfos
   val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 8243f6522..77cd6d44f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -254,18 +254,18 @@ object DeviceMonitor extends Logging {
     tryWithTimeoutAndCallback({
       val usage = getDiskUsageInfos(diskInfo)
       // assume no single device capacity exceeds 1EB in this era
-      val minimumUsableSize = DiskUtils.getMinimumUsableSize(
+      val actualReserveSize = DiskUtils.getActualReserveSize(
         diskInfo,
         conf.workerDiskReserveSize,
         conf.workerDiskReserveRatio)
       val highDiskUsage =
-        usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0
+        usage.freeSpace < actualReserveSize || diskInfo.actualUsableSpace <= 0
       if (highDiskUsage) {
         logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
-          s" Disk usage(Report by 
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +
-          s" free:${Utils.bytesToString(usage.freeSpace)}, 
used_percent:${usage.usedPercent}} " +
-          s"usage(Report by Celeborn):{" +
-          s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" +
+          s" Disk usage(Report by OS): 
{total:${Utils.bytesToString(usage.totalSpace)}," +
+          s" free:${Utils.bytesToString(usage.freeSpace)}, 
used_percent:${usage.usedPercent}}," +
+          s" usage(Report by Celeborn): {" +
+          s" total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}," +
           s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }")
       }
       highDiskUsage
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 517bcb971..87d6de179 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -160,8 +160,6 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     (flushers, totalThread)
   }
 
-  deviceMonitor.startCheck()
-
   val hdfsDir = conf.hdfsDir
   val s3Dir = conf.s3Dir
   val hdfsPermission = new FsPermission("755")
@@ -890,15 +888,17 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           try {
             val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
               getFileSystemReportedSpace(diskInfo.mountPoint)
+            val actualReserveSize =
+              DiskUtils.getActualReserveSize(diskInfo, diskReserveSize, 
diskReserveRatio)
             val workingDirUsableSpace =
-              Math.min(diskInfo.configuredUsableSpace - totalUsage, 
fileSystemReportedUsableSpace)
-            val minimumReserveSize =
-              DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, 
diskReserveRatio)
-            val usableSpace = Math.max(workingDirUsableSpace - 
minimumReserveSize, 0)
+              Math.min(
+                diskInfo.configuredUsableSpace - totalUsage,
+                fileSystemReportedUsableSpace - actualReserveSize)
+            val usableSpace = Math.max(workingDirUsableSpace, 0)
             logDebug(
-              s"Update diskInfo:${diskInfo.mountPoint} 
workingDirUsableSpace:$workingDirUsableSpace 
fileMeta:$fileSystemReportedUsableSpace" +
-                s"conf:${diskInfo.configuredUsableSpace} 
totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace" +
-                s"minimumReserveSize:$minimumReserveSize 
usableSpace:$usableSpace")
+              s"Update diskInfo:${diskInfo.mountPoint} 
workingDirUsableSpace:$workingDirUsableSpace 
fileMeta:$fileSystemReportedUsableSpace " +
+                s"configuredUsableSpace:${diskInfo.configuredUsableSpace} 
totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace " +
+                s"actualReserveSize:$actualReserveSize 
usableSpace:$usableSpace")
             diskInfo.setUsableSpace(usableSpace)
             diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
           } catch {
@@ -1170,6 +1170,11 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     }
     throw exception
   }
+
+  def startDeviceMonitor(): Unit = {
+    deviceMonitor.startCheck()
+  }
+
 }
 
 object StorageManager {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index b85697552..d393df083 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -23,9 +23,9 @@ import org.mockito.stubbing.Stubber
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
-import 
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, 
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE, 
WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
 import org.apache.celeborn.common.meta.DiskInfo
-import org.apache.celeborn.common.util.DiskUtils
+import org.apache.celeborn.common.util.Utils
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 
 trait MockitoHelper extends MockitoSugar {
@@ -36,8 +36,6 @@ trait MockitoHelper extends MockitoSugar {
 
 class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
 
-  val conf = new CelebornConf()
-
   test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause 
IllegalMonitorStateException") {
     val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
       .set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover")
@@ -47,42 +45,67 @@ class StorageManagerSuite extends CelebornFunSuite with 
MockitoHelper {
   }
 
   test("updateDiskInfosWithDiskReserveSize") {
+    // reserve size set to 5g
+    val conf = new CelebornConf().set(WORKER_DISK_RESERVE_SIZE, 
Utils.byteStringAsBytes("5g"))
     val storageManager = new StorageManager(conf, new WorkerSource(conf))
     val spyStorageManager = spy(storageManager)
 
-    val disks = prepareDisks()
-    val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
-    doReturn(disks).when(spyStorageManager).disksSnapshot()
-    
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+    val diskInfo = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+    diskInfo.setUsableSpace(-1L)
+
+    var diskSetSpace = (0L, 0L)
+    doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()
+    
doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+
+    // disk usable 80g, total 80g, worker config 8EB
+    diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    diskInfo.configuredUsableSpace = Long.MaxValue
     spyStorageManager.updateDiskInfos()
-    for (disk <- disks) {
-      val minimumReserveSize =
-        DiskUtils.getMinimumUsableSize(
-          disk,
-          conf.workerDiskReserveSize,
-          conf.workerDiskReserveRatio)
-      assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
-    }
-  }
+    assert(diskInfo.actualUsableSpace == 75 * 1024 * 1024 * 1024L)
 
-  def prepareDisks(): List[DiskInfo] = {
-    val diskSetSpaces = Array(
-      90L * 1024 * 1024 * 1024,
-      95L * 1024 * 1024 * 1024,
-      100L * 1024 * 1024 * 1024)
+    // disk usable 80g, total 80g, worker config 50g
+    diskInfo.configuredUsableSpace = 50 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 50 * 1024 * 1024 * 1024L)
 
-    val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
-    diskInfo1.configuredUsableSpace = (Long.MaxValue)
-    diskInfo1.setUsableSpace(diskSetSpaces(0))
+    // disk usable 10g, total 80g, worker config 20g
+    diskSetSpace = (10 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
 
-    val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
-    diskInfo2.configuredUsableSpace = (Long.MaxValue)
-    diskInfo2.setUsableSpace(diskSetSpaces(1))
+    // disk usable 10g, total 80g, worker config 5g
+    diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
 
-    val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
-    diskInfo3.configuredUsableSpace = (Long.MaxValue)
-    diskInfo3.setUsableSpace(diskSetSpaces(2))
+    // disk usable 5g, total 80g, worker config 20g
+    diskSetSpace = (5 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 0L)
+
+    // disk usable 5g, total 80g, worker config 5g
+    diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 0L)
 
-    List(diskInfo1, diskInfo2, diskInfo3)
+    // disk usable 1g, total 80g, worker config 20g
+    diskSetSpace = (1 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 0L)
+
+    // disk usable 1g, total 80g, worker config 5g
+    diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+    diskInfo.setUsableSpace(-1L)
+    spyStorageManager.updateDiskInfos()
+    assert(diskInfo.actualUsableSpace == 0L)
   }
 }

Reply via email to