This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 44468ff32 [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware
condition on diskInfo
44468ff32 is described below
commit 44468ff32fa7d1e5e997f4bc346951c733701258
Author: szt <[email protected]>
AuthorDate: Mon Aug 26 14:17:55 2024 +0800
[CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on
diskInfo
Fix the actualUsableSpace condition on diskInfo in offerSlotsLoadAware.
StorageManager now subtracts the disk reserve (diskReserveSize / diskReserveRatio)
in updateDiskInfos, so the master no longer needs to calculate the usable space
when offering slots.
Does this PR introduce any user-facing change? No.
How was this patch tested? Unit tests (UT).
Closes #2688 from zaynt4606/main.
Authored-by: szt <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 9f0af3456a321d2b49909232eef87ca776fab1f6)
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/master/SlotsAllocator.java | 13 +----
.../celeborn/service/deploy/master/Master.scala | 2 -
.../deploy/master/SlotsAllocatorSuiteJ.java | 5 --
.../tests/spark/CelebornHashCheckDiskSuite.scala | 7 +--
.../deploy/worker/storage/StorageManager.scala | 29 ++++++++---
.../worker/storage/StorageManagerSuite.scala | 56 +++++++++++++++++++++-
6 files changed, 82 insertions(+), 30 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index ced728b2f..ef42c9992 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -21,8 +21,6 @@ import java.util.*;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
-import scala.Double;
-import scala.Option;
import scala.Tuple2;
import org.apache.commons.lang3.StringUtils;
@@ -35,7 +33,6 @@ import org.apache.celeborn.common.meta.DiskStatus;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.util.DiskUtils;
public class SlotsAllocator {
static class UsableDiskInfo {
@@ -106,8 +103,6 @@ public class SlotsAllocator {
List<Integer> partitionIds,
boolean shouldReplicate,
boolean shouldRackAware,
- long diskReserveSize,
- Option<Double> diskReserveRatio,
int diskGroupCount,
double diskGroupGradient,
double flushTimeWeight,
@@ -133,13 +128,7 @@ public class SlotsAllocator {
.forEach(
(key, diskInfo) -> {
diskToWorkerMap.put(diskInfo, i);
- if (diskInfo.actualUsableSpace()
- > DiskUtils.getMinimumUsableSize(
- diskInfo,
- diskReserveSize,
- diskReserveRatio.isEmpty()
- ? Option.empty()
- : Option.apply(diskReserveRatio.get()))
+ if (diskInfo.actualUsableSpace() > 0
&& diskInfo.status().equals(DiskStatus.HEALTHY)
&& diskInfo.storageType() != StorageInfo.Type.HDFS) {
usableDisks.add(diskInfo);
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1623de5c3..0a4a5d5a6 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -838,8 +838,6 @@ private[celeborn] class Master(
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
- diskReserveSize,
- diskReserveRatio,
slotsAssignLoadAwareDiskGroupNum,
slotsAssignLoadAwareDiskGroupGradient,
loadAwareFlushTimeWeight,
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 4bd04eeb1..d06ec867b 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.*;
-import scala.Option;
import scala.Tuple2;
import org.junit.Test;
@@ -232,8 +231,6 @@ public class SlotsAllocatorSuiteJ {
partitionIds,
shouldReplicate,
false,
- 10 * 1024 * 1024 * 1024L,
- Option.empty(),
conf.masterSlotAssignLoadAwareDiskGroupNum(),
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
@@ -307,8 +304,6 @@ public class SlotsAllocatorSuiteJ {
partitionIds,
shouldReplicate,
false,
- 1000_000_000,
- Option.empty(),
3,
0.1,
0,
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 9fe49bfd4..5b461013d 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -38,7 +38,8 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
val workerConf = Map(
CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
- CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
+ CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s",
+ CelebornConf.WORKER_DISK_RESERVE_SIZE.key -> "0G")
workers = setupMiniClusterWithRandomPorts(masterConf, workerConf)._2
}
@@ -76,10 +77,10 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
assert(combineResult.equals(celebornCombineResult))
assert(sqlResult.equals(celebornSqlResult))
- // shuffle key not expired, diskInfo.actualUsableSpace < 0, no space
+ // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
workers.foreach { worker =>
worker.storageManager.disksSnapshot().foreach { diskInfo =>
- assert(diskInfo.actualUsableSpace < 0)
+ assert(diskInfo.actualUsableSpace <= 0)
}
}
sparkSessionEnableCeleborn.stop()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 27423d01a..13bda7b43 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -69,6 +69,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+ val diskReserveSize = conf.workerDiskReserveSize
+ val diskReserveRatio = conf.workerDiskReserveRatio
+
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
val (deviceInfos, diskInfos) = {
val workingDirInfos =
@@ -791,20 +794,32 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
0
}
}.sum
- val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
- val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+
+ val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
+ getFileSystemReportedSpace(diskInfo.mountPoint)
val workingDirUsableSpace =
Math.min(diskInfo.configuredUsableSpace - totalUsage,
fileSystemReportedUsableSpace)
- val totalSpace = fileStore.getTotalSpace
- logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage totalSpace: $totalSpace")
- diskInfo.setUsableSpace(workingDirUsableSpace)
- diskInfo.setTotalSpace(totalSpace)
+ val minimumReserveSize =
+ DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio)
+ val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
+ logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace" +
+ s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage
totalSpace:$fileSystemReportedTotalSpace" +
+ s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
+ diskInfo.setUsableSpace(usableSpace)
+ diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}
logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
}
+ def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
+ val fileStore = Files.getFileStore(Paths.get(mountPoint))
+ val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+ val fileSystemReportedTotalSpace = fileStore.getTotalSpace
+ (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace)
+ }
+
def userResourceConsumptionSnapshot(): Map[UserIdentifier,
ResourceConsumption] = {
diskFileInfos.synchronized {
// shuffleId -> (fileName -> fileInfo)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index 5b747e9b1..b85697552 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -17,12 +17,26 @@
package org.apache.celeborn.service.deploy.worker.storage
+import org.mockito.{Mockito, MockitoSugar}
+import org.mockito.ArgumentMatchersSugar.any
+import org.mockito.stubbing.Stubber
+
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED,
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.meta.DiskInfo
+import org.apache.celeborn.common.util.DiskUtils
import org.apache.celeborn.service.deploy.worker.WorkerSource
-class StorageManagerSuite extends CelebornFunSuite {
+trait MockitoHelper extends MockitoSugar {
+ def doReturn(toBeReturned: Any): Stubber = {
+ Mockito.doReturn(toBeReturned, Nil: _*)
+ }
+}
+
+class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
+
+ val conf = new CelebornConf()
test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause
IllegalMonitorStateException") {
val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
@@ -31,4 +45,44 @@ class StorageManagerSuite extends CelebornFunSuite {
// should not throw IllegalMonitorStateException exception
storageManager.saveAllCommittedFileInfosToDB()
}
+
+ test("updateDiskInfosWithDiskReserveSize") {
+ val storageManager = new StorageManager(conf, new WorkerSource(conf))
+ val spyStorageManager = spy(storageManager)
+
+ val disks = prepareDisks()
+ val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ doReturn(disks).when(spyStorageManager).disksSnapshot()
+
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+ spyStorageManager.updateDiskInfos()
+ for (disk <- disks) {
+ val minimumReserveSize =
+ DiskUtils.getMinimumUsableSize(
+ disk,
+ conf.workerDiskReserveSize,
+ conf.workerDiskReserveRatio)
+ assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
+ }
+ }
+
+ def prepareDisks(): List[DiskInfo] = {
+ val diskSetSpaces = Array(
+ 90L * 1024 * 1024 * 1024,
+ 95L * 1024 * 1024 * 1024,
+ 100L * 1024 * 1024 * 1024)
+
+ val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+ diskInfo1.configuredUsableSpace = (Long.MaxValue)
+ diskInfo1.setUsableSpace(diskSetSpaces(0))
+
+ val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
+ diskInfo2.configuredUsableSpace = (Long.MaxValue)
+ diskInfo2.setUsableSpace(diskSetSpaces(1))
+
+ val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
+ diskInfo3.configuredUsableSpace = (Long.MaxValue)
+ diskInfo3.setUsableSpace(diskSetSpaces(2))
+
+ List(diskInfo1, diskInfo2, diskInfo3)
+ }
}