This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 9f0af3456 [CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware
condition on diskInfo
9f0af3456 is described below
commit 9f0af3456a321d2b49909232eef87ca776fab1f6
Author: szt <[email protected]>
AuthorDate: Mon Aug 26 14:17:55 2024 +0800
[CELEBORN-1564] Fix actualUsableSpace of offerSlotsLoadAware condition on
diskInfo
### What changes were proposed in this pull request?
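Fix the actualUsableSpace condition that offerSlotsLoadAware applies to each diskInfo.
The worker now subtracts the reserved space (diskReserveSize / diskReserveRatio) when StorageManager.updateDiskInfos runs,
so the master no longer needs to calculate the minimum usable space itself when offering slots.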
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #2688 from zaynt4606/main.
Authored-by: szt <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/master/SlotsAllocator.java | 13 +----
.../celeborn/service/deploy/master/Master.scala | 2 -
.../deploy/master/SlotsAllocatorSuiteJ.java | 5 --
.../tests/spark/CelebornHashCheckDiskSuite.scala | 7 +--
.../deploy/worker/storage/StorageManager.scala | 29 ++++++++---
.../worker/storage/StorageManagerSuite.scala | 56 +++++++++++++++++++++-
6 files changed, 82 insertions(+), 30 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index eda5a3a2a..caf11f712 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -21,8 +21,6 @@ import java.util.*;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
-import scala.Double;
-import scala.Option;
import scala.Tuple2;
import org.apache.commons.lang3.StringUtils;
@@ -35,7 +33,6 @@ import org.apache.celeborn.common.meta.DiskStatus;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.StorageInfo;
-import org.apache.celeborn.common.util.DiskUtils;
public class SlotsAllocator {
static class UsableDiskInfo {
@@ -112,8 +109,6 @@ public class SlotsAllocator {
List<Integer> partitionIds,
boolean shouldReplicate,
boolean shouldRackAware,
- long diskReserveSize,
- Option<Double> diskReserveRatio,
int diskGroupCount,
double diskGroupGradient,
double flushTimeWeight,
@@ -143,13 +138,7 @@ public class SlotsAllocator {
.forEach(
(key, diskInfo) -> {
diskToWorkerMap.put(diskInfo, i);
- if (diskInfo.actualUsableSpace()
- > DiskUtils.getMinimumUsableSize(
- diskInfo,
- diskReserveSize,
- diskReserveRatio.isEmpty()
- ? Option.empty()
- : Option.apply(diskReserveRatio.get()))
+ if (diskInfo.actualUsableSpace() > 0
&& diskInfo.status().equals(DiskStatus.HEALTHY)
&& diskInfo.storageType() != StorageInfo.Type.HDFS
&& diskInfo.storageType() != StorageInfo.Type.S3) {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 4bfdc2996..fa386e70e 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -867,8 +867,6 @@ private[celeborn] class Master(
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
- diskReserveSize,
- diskReserveRatio,
slotsAssignLoadAwareDiskGroupNum,
slotsAssignLoadAwareDiskGroupGradient,
loadAwareFlushTimeWeight,
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 1a0bc909b..0fb4c9d42 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
import java.util.*;
-import scala.Option;
import scala.Tuple2;
import org.junit.Test;
@@ -232,8 +231,6 @@ public class SlotsAllocatorSuiteJ {
partitionIds,
shouldReplicate,
false,
- 10 * 1024 * 1024 * 1024L,
- Option.empty(),
conf.masterSlotAssignLoadAwareDiskGroupNum(),
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
@@ -313,8 +310,6 @@ public class SlotsAllocatorSuiteJ {
partitionIds,
shouldReplicate,
false,
- 1000_000_000,
- Option.empty(),
3,
0.1,
0,
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 9fe49bfd4..5b461013d 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -38,7 +38,8 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key -> "10s")
val workerConf = Map(
CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
- CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
+ CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s",
+ CelebornConf.WORKER_DISK_RESERVE_SIZE.key -> "0G")
workers = setupMiniClusterWithRandomPorts(masterConf, workerConf)._2
}
@@ -76,10 +77,10 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
assert(combineResult.equals(celebornCombineResult))
assert(sqlResult.equals(celebornSqlResult))
- // shuffle key not expired, diskInfo.actualUsableSpace < 0, no space
+ // shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
workers.foreach { worker =>
worker.storageManager.disksSnapshot().foreach { diskInfo =>
- assert(diskInfo.actualUsableSpace < 0)
+ assert(diskInfo.actualUsableSpace <= 0)
}
}
sparkSessionEnableCeleborn.stop()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b7a5eb75b..c7c8f8adf 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
import
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -73,6 +73,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)
+ val diskReserveSize = conf.workerDiskReserveSize
+ val diskReserveRatio = conf.workerDiskReserveRatio
+
val topDiskUsageCount = conf.metricsAppTopDiskUsageCount
// (deviceName -> deviceInfo) and (mount point -> diskInfo)
@@ -851,20 +854,32 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
0
}
}.sum
- val fileStore = Files.getFileStore(Paths.get(diskInfo.mountPoint))
- val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+
+ val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
+ getFileSystemReportedSpace(diskInfo.mountPoint)
val workingDirUsableSpace =
Math.min(diskInfo.configuredUsableSpace - totalUsage,
fileSystemReportedUsableSpace)
- val totalSpace = fileStore.getTotalSpace
- logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace conf:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage totalSpace: $totalSpace")
- diskInfo.setUsableSpace(workingDirUsableSpace)
- diskInfo.setTotalSpace(totalSpace)
+ val minimumReserveSize =
+ DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio)
+ val usableSpace = Math.max(workingDirUsableSpace - minimumReserveSize, 0)
+ logDebug(s"updateDiskInfos workingDirUsableSpace:$workingDirUsableSpace
filemeta:$fileSystemReportedUsableSpace" +
+ s"conf:${diskInfo.configuredUsableSpace} totalUsage:$totalUsage
totalSpace:$fileSystemReportedTotalSpace" +
+ s"minimumReserveSize:$minimumReserveSize usableSpace:$usableSpace")
+ diskInfo.setUsableSpace(usableSpace)
+ diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}
logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
}
+ def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
+ val fileStore = Files.getFileStore(Paths.get(mountPoint))
+ val fileSystemReportedUsableSpace = fileStore.getUsableSpace
+ val fileSystemReportedTotalSpace = fileStore.getTotalSpace
+ (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace)
+ }
+
def userResourceConsumptionSnapshot(): Map[UserIdentifier,
ResourceConsumption] = {
diskFileInfos.synchronized {
// shuffleId -> (fileName -> fileInfo)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index 5b747e9b1..b85697552 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -17,12 +17,26 @@
package org.apache.celeborn.service.deploy.worker.storage
+import org.mockito.{Mockito, MockitoSugar}
+import org.mockito.ArgumentMatchersSugar.any
+import org.mockito.stubbing.Stubber
+
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED,
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.meta.DiskInfo
+import org.apache.celeborn.common.util.DiskUtils
import org.apache.celeborn.service.deploy.worker.WorkerSource
-class StorageManagerSuite extends CelebornFunSuite {
+trait MockitoHelper extends MockitoSugar {
+ def doReturn(toBeReturned: Any): Stubber = {
+ Mockito.doReturn(toBeReturned, Nil: _*)
+ }
+}
+
+class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
+
+ val conf = new CelebornConf()
test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause
IllegalMonitorStateException") {
val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
@@ -31,4 +45,44 @@ class StorageManagerSuite extends CelebornFunSuite {
// should not throw IllegalMonitorStateException exception
storageManager.saveAllCommittedFileInfosToDB()
}
+
+ test("updateDiskInfosWithDiskReserveSize") {
+ val storageManager = new StorageManager(conf, new WorkerSource(conf))
+ val spyStorageManager = spy(storageManager)
+
+ val disks = prepareDisks()
+ val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ doReturn(disks).when(spyStorageManager).disksSnapshot()
+
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+ spyStorageManager.updateDiskInfos()
+ for (disk <- disks) {
+ val minimumReserveSize =
+ DiskUtils.getMinimumUsableSize(
+ disk,
+ conf.workerDiskReserveSize,
+ conf.workerDiskReserveRatio)
+ assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
+ }
+ }
+
+ def prepareDisks(): List[DiskInfo] = {
+ val diskSetSpaces = Array(
+ 90L * 1024 * 1024 * 1024,
+ 95L * 1024 * 1024 * 1024,
+ 100L * 1024 * 1024 * 1024)
+
+ val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+ diskInfo1.configuredUsableSpace = (Long.MaxValue)
+ diskInfo1.setUsableSpace(diskSetSpaces(0))
+
+ val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
+ diskInfo2.configuredUsableSpace = (Long.MaxValue)
+ diskInfo2.setUsableSpace(diskSetSpaces(1))
+
+ val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
+ diskInfo3.configuredUsableSpace = (Long.MaxValue)
+ diskInfo3.setUsableSpace(diskSetSpaces(2))
+
+ List(diskInfo1, diskInfo2, diskInfo3)
+ }
}