This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 44468ff32 [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on diskInfo
44468ff32 is described below

commit 44468ff32fa7d1e5e997f4bc346951c733701258
Author: szt <[email protected]>
AuthorDate: Mon Aug 26 14:17:55 2024 +0800

    [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on diskInfo
    
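    Fix the actualUsableSpace condition on diskInfo used by offerSlotsLoadAware.
    StorageManager.updateDiskInfos now subtracts the minimum reserve size
    (derived from diskReserveSize and diskReserveRatio) from each disk's usable
    space, clamping the result at 0. As a result, the master no longer needs to
    compute the usable space itself when offering slots; it only checks that
    actualUsableSpace > 0.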
    
    User-facing change: No.
    
    Testing: unit tests (UT), including StorageManagerSuite and SlotsAllocatorSuiteJ.
    
    Closes #2688 from zaynt4606/main.
    
    Authored-by: szt <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 9f0af3456a321d2b49909232eef87ca776fab1f6)
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/master/SlotsAllocator.java      | 13 +----
 .../celeborn/service/deploy/master/Master.scala    |  2 -
 .../deploy/master/SlotsAllocatorSuiteJ.java        |  5 --
 .../tests/spark/CelebornHashCheckDiskSuite.scala   |  7 +--
 .../deploy/worker/storage/StorageManager.scala     | 29 ++++++++---
 .../worker/storage/StorageManagerSuite.scala       | 56 +++++++++++++++++++++-
 6 files changed, 82 insertions(+), 30 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index ced728b2f..ef42c9992 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -21,8 +21,6 @@ import java.util.*;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.Collectors;
 
-import scala.Double;
-import scala.Option;
 import scala.Tuple2;
 
 import org.apache.commons.lang3.StringUtils;
@@ -35,7 +33,6 @@ import org.apache.celeborn.common.meta.DiskStatus;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.protocol.PartitionLocation;
 import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.util.DiskUtils;
 
 public class SlotsAllocator {
   static class UsableDiskInfo {
@@ -106,8 +103,6 @@ public class SlotsAllocator {
           List<Integer> partitionIds,
           boolean shouldReplicate,
           boolean shouldRackAware,
-          long diskReserveSize,
-          Option<Double> diskReserveRatio,
           int diskGroupCount,
           double diskGroupGradient,
           double flushTimeWeight,
@@ -133,13 +128,7 @@ public class SlotsAllocator {
                 .forEach(
                     (key, diskInfo) -> {
                       diskToWorkerMap.put(diskInfo, i);
-                      if (diskInfo.actualUsableSpace()
-                              > DiskUtils.getMinimumUsableSize(
-                                  diskInfo,
-                                  diskReserveSize,
-                                  diskReserveRatio.isEmpty()
-                                      ? Option.empty()
-                                      : Option.apply(diskReserveRatio.get()))
+                      if (diskInfo.actualUsableSpace() > 0
                           && diskInfo.status().equals(DiskStatus.HEALTHY)
                           && diskInfo.storageType() != StorageInfo.Type.HDFS) {
                         usableDisks.add(diskInfo);
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1623de5c3..0a4a5d5a6 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -838,8 +838,6 @@ private[celeborn] class Master(
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
-              diskReserveSize,
-              diskReserveRatio,
               slotsAssignLoadAwareDiskGroupNum,
               slotsAssignLoadAwareDiskGroupGradient,
               loadAwareFlushTimeWeight,
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 4bd04eeb1..d06ec867b 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.*;
 
-import scala.Option;
 import scala.Tuple2;
 
 import org.junit.Test;
@@ -232,8 +231,6 @@ public class SlotsAllocatorSuiteJ {
             partitionIds,
             shouldReplicate,
             false,
-            10 * 1024 * 1024 * 1024L,
-            Option.empty(),
             conf.masterSlotAssignLoadAwareDiskGroupNum(),
             conf.masterSlotAssignLoadAwareDiskGroupGradient(),
             conf.masterSlotAssignLoadAwareFlushTimeWeight(),
@@ -307,8 +304,6 @@ public class SlotsAllocatorSuiteJ {
               partitionIds,
               shouldReplicate,
               false,
-              1000_000_000,
-              Option.empty(),
               3,
               0.1,
               0,
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 9fe49bfd4..5b461013d 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -38,7 +38,8 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
       CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
     val workerConf = Map(
       CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
-      CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
+      CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s",
+      CelebornConf.WORKER_DISK_RESERVE_SIZE.key -> "0G")
     workers = setupMiniClusterWithRandomPorts(masterConf, workerConf)._2
   }
 
@@ -76,10 +77,10 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
     assert(combineResult.equals(celebornCombineResult))
     assert(sqlResult.equals(celebornSqlResult))
 
-    // shuffle key not expired, diskInfo.actualUsableSpace < 0, no space
+    // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
     workers.foreach { worker =>
       worker.storageManager.disksSnapshot().foreach { diskInfo =>
-        assert(diskInfo.actualUsableSpace < 0)
+        assert(diskInfo.actualUsableSpace <= 0)
       }
     }
     sparkSessionEnableCeleborn.stop()
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 27423d01a..13bda7b43 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import 
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -69,6 +69,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
 
+  val diskReserveSize = conf.workerDiskReserveSize
+  val diskReserveRatio = conf.workerDiskReserveRatio
+
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
   val (deviceInfos, diskInfos) = {
     val workingDirInfos =
@@ -791,20 +794,32 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           0
         }
       }.sum
-      val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
-      val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+
+      val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
+        getFileSystemReportedSpace(diskInfo.mountPoint)
       val workingDirUsableSpace =
         Math.min(diskInfo.configuredUsableSpace - totalUsage, 
fileSystemReportedUsableSpace)
-      val totalSpace = fileStore.getTotalSpace
-      logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace 
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace} 
totalUsage:$totalUsage totalSpace: $totalSpace")
-      diskInfo.setUsableSpace(workingDirUsableSpace)
-      diskInfo.setTotalSpace(totalSpace)
+      val minimumReserveSize =
+        DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize, 
diskReserveRatio)
+      val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
+      logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace 
filemeta:$fileSystemReportedUsableSpace" +
+        s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage 
totalSpace:$fileSystemReportedTotalSpace" +
+        s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
+      diskInfo.setUsableSpace(usableSpace)
+      diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
       diskInfo.updateFlushTime()
       diskInfo.updateFetchTime()
     }
     logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
   }
 
+  def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
+    val fileStore = Files.getFileStore(Paths.get(mountPoint))
+    val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+    val fileSystemReportedTotalSpace = fileStore.getTotalSpace
+    (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace)
+  }
+
   def userResourceConsumptionSnapshot(): Map[UserIdentifier, 
ResourceConsumption] = {
     diskFileInfos.synchronized {
       // shuffleId -> (fileName -> fileInfo)
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index 5b747e9b1..b85697552 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -17,12 +17,26 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
+import org.mockito.{Mockito, MockitoSugar}
+import org.mockito.ArgumentMatchersSugar.any
+import org.mockito.stubbing.Stubber
+
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
 import 
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED, 
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.meta.DiskInfo
+import org.apache.celeborn.common.util.DiskUtils
 import org.apache.celeborn.service.deploy.worker.WorkerSource
 
-class StorageManagerSuite extends CelebornFunSuite {
+trait MockitoHelper extends MockitoSugar {
+  def doReturn(toBeReturned: Any): Stubber = {
+    Mockito.doReturn(toBeReturned, Nil: _*)
+  }
+}
+
+class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
+
+  val conf = new CelebornConf()
 
   test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause 
IllegalMonitorStateException") {
     val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
@@ -31,4 +45,44 @@ class StorageManagerSuite extends CelebornFunSuite {
     // should not throw IllegalMonitorStateException exception
     storageManager.saveAllCommittedFileInfosToDB()
   }
+
+  test("updateDiskInfosWithDiskReserveSize") {
+    val storageManager = new StorageManager(conf, new WorkerSource(conf))
+    val spyStorageManager = spy(storageManager)
+
+    val disks = prepareDisks()
+    val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+    doReturn(disks).when(spyStorageManager).disksSnapshot()
+    
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+    spyStorageManager.updateDiskInfos()
+    for (disk <- disks) {
+      val minimumReserveSize =
+        DiskUtils.getMinimumUsableSize(
+          disk,
+          conf.workerDiskReserveSize,
+          conf.workerDiskReserveRatio)
+      assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
+    }
+  }
+
+  def prepareDisks(): List[DiskInfo] = {
+    val diskSetSpaces = Array(
+      90L * 1024 * 1024 * 1024,
+      95L * 1024 * 1024 * 1024,
+      100L * 1024 * 1024 * 1024)
+
+    val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+    diskInfo1.configuredUsableSpace = (Long.MaxValue)
+    diskInfo1.setUsableSpace(diskSetSpaces(0))
+
+    val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
+    diskInfo2.configuredUsableSpace = (Long.MaxValue)
+    diskInfo2.setUsableSpace(diskSetSpaces(1))
+
+    val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
+    diskInfo3.configuredUsableSpace = (Long.MaxValue)
+    diskInfo3.setUsableSpace(diskSetSpaces(2))
+
+    List(diskInfo1, diskInfo2, diskInfo3)
+  }
 }

Reply via email to