This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 351173bac [CELEBORN-1727] Correct the calculation of worker diskInfo
actualUsableSpace
351173bac is described below
commit 351173bacde5086e9c3f88d30f992ef53c2ecaa5
Author: onebox-li <[email protected]>
AuthorDate: Thu Nov 21 16:17:46 2024 +0800
[CELEBORN-1727] Correct the calculation of worker diskInfo actualUsableSpace
### What changes were proposed in this pull request?
Correct the calculation of worker diskInfo actualUsableSpace.
Rename the function that computes the reserve size so that its intent is clearer
(`getMinimumUsableSize` -> `getActualReserveSize`).
Start the deviceMonitor check only after the first
`storageManager.updateDiskInfos()` call, to prevent disks from being
misidentified as HIGH_DISK_USAGE.
Fix the disk-full check in PushDataHandler#checkDiskFull.
### Why are the changes needed?
Make sure the worker disk reserve works correctly.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test and UT.
Closes #2931 from onebox-li/fix-disk-usablespace.
Authored-by: onebox-li <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/common/util/DiskUtils.scala | 22 +++---
.../tests/spark/CelebornHashCheckDiskSuite.scala | 5 +-
.../service/deploy/worker/PushDataHandler.scala | 17 +----
.../celeborn/service/deploy/worker/Worker.scala | 1 +
.../deploy/worker/storage/DeviceMonitor.scala | 12 +--
.../deploy/worker/storage/StorageManager.scala | 23 +++---
.../worker/storage/StorageManagerSuite.scala | 87 ++++++++++++++--------
7 files changed, 91 insertions(+), 76 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
index 2e361a985..b4972b0f0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/DiskUtils.scala
@@ -27,28 +27,28 @@ import org.apache.celeborn.common.meta.DiskInfo
object DiskUtils {
/**
- * Gets the minimum usable size for each disk, which size is the max space
between the reserved space
- * and the space calculate via reserved ratio.
+ * Get the actual size that the disk needs to reserve. It will take the
larger value between
+ * the configured fixed reserved size and the size calculated by the
reserved ratio.
*
- * @param diskInfo The reserved disk info.
- * @param diskReserveSize The reserved space for each disk.
- * @param diskReserveRatio The reserved ratio for each disk.
- * @return the minimum usable space.
+ * @param diskInfo The disk info being calculated.
+ * @param diskReserveSize The configured fixed reserved size space for the
disk.
+ * @param diskReserveRatio The configured reserved ratio for the disk.
+ * @return the actual size (in bytes) that the disk needs to reserve.
*/
- def getMinimumUsableSize(
+ def getActualReserveSize(
diskInfo: DiskInfo,
diskReserveSize: Long,
diskReserveRatio: Option[Double]): Long = {
- var minimumUsableSize = diskReserveSize
+ var actualReserveSize = diskReserveSize
if (diskReserveRatio.isDefined) {
try {
val totalSpace =
Files.getFileStore(Paths.get(diskInfo.mountPoint)).getTotalSpace
- minimumUsableSize =
- BigDecimal(totalSpace *
diskReserveRatio.get).longValue.max(minimumUsableSize)
+ actualReserveSize =
+ BigDecimal(totalSpace *
diskReserveRatio.get).longValue.max(actualReserveSize)
} catch {
case _: Exception => // Do nothing
}
}
- minimumUsableSize
+ actualReserveSize
}
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
index 5b461013d..eafabbbe6 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala
@@ -60,7 +60,6 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
val combineResult = combine(sparkSession)
val groupByResult = groupBy(sparkSession)
val repartitionResult = repartition(sparkSession)
- val sqlResult = runsql(sparkSession)
sparkSession.stop()
val sparkSessionEnableCeleborn = SparkSession.builder()
@@ -69,16 +68,14 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
val celebornCombineResult = combine(sparkSessionEnableCeleborn)
val celebornGroupByResult = groupBy(sparkSessionEnableCeleborn)
val celebornRepartitionResult = repartition(sparkSessionEnableCeleborn)
- val celebornSqlResult = runsql(sparkSessionEnableCeleborn)
assert(combineResult.equals(celebornCombineResult))
assert(groupByResult.equals(celebornGroupByResult))
assert(repartitionResult.equals(celebornRepartitionResult))
- assert(combineResult.equals(celebornCombineResult))
- assert(sqlResult.equals(celebornSqlResult))
// shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
workers.foreach { worker =>
+ worker.storageManager.updateDiskInfos()
worker.storageManager.disksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace <= 0)
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index b6ddff261..af6d4cc66 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
-import scala.collection.JavaConverters._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
@@ -42,7 +41,7 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.PbPartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
-import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
+import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher,
LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher,
StorageManager}
import
org.apache.celeborn.service.deploy.worker.storage.segment.SegmentMapPartitionFileWriter
@@ -58,9 +57,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
private var replicateClientFactory: TransportClientFactory = _
private var registered: Option[AtomicBoolean] = None
private var workerInfo: WorkerInfo = _
- private var diskReserveSize: Long = _
- private var diskReserveRatio: Option[Double] = _
- private var diskUsableSizes: Map[String, Long] = _
private var partitionSplitMinimumSize: Long = _
private var partitionSplitMaximumSize: Long = _
private var shutdown: AtomicBoolean = _
@@ -80,11 +76,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
unavailablePeers = worker.unavailablePeers
replicateClientFactory = worker.replicateClientFactory
workerInfo = worker.workerInfo
- diskReserveSize = worker.conf.workerDiskReserveSize
- diskReserveRatio = worker.conf.workerDiskReserveRatio
- diskUsableSizes = workerInfo.diskInfos.asScala.map { case (mountPoint,
diskInfo) =>
- (mountPoint, DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio))
- }.toMap
partitionSplitMinimumSize = worker.conf.partitionSplitMinimumSize
partitionSplitMaximumSize = worker.conf.partitionSplitMaximumSize
storageManager = worker.storageManager
@@ -95,8 +86,6 @@ class PushDataHandler(val workerSource: WorkerSource) extends
BaseMessageHandler
testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
registered = Some(worker.registered)
- logInfo(
- s"diskReserveSize ${Utils.bytesToString(diskReserveSize)},
diskReserveRatio ${diskReserveRatio.orNull}")
}
override def receive(
@@ -1230,8 +1219,8 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
val diskInfo = workerInfo.diskInfos.get(mountPoint)
- val diskFull = diskInfo.status.equals(
- DiskStatus.HIGH_DISK_USAGE) || diskInfo.actualUsableSpace <
diskUsableSizes(mountPoint)
+ val diskFull =
+ diskInfo.status.equals(DiskStatus.HIGH_DISK_USAGE) ||
diskInfo.actualUsableSpace <= 0
diskFull
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 0be341912..15f89555a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -274,6 +274,7 @@ private[celeborn] class Worker(
assert(replicatePort > 0, "worker replica bind port should be positive")
storageManager.updateDiskInfos()
+ storageManager.startDeviceMonitor()
// WorkerInfo's diskInfos is a reference to storageManager.diskInfos
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 8243f6522..77cd6d44f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -254,18 +254,18 @@ object DeviceMonitor extends Logging {
tryWithTimeoutAndCallback({
val usage = getDiskUsageInfos(diskInfo)
// assume no single device capacity exceeds 1EB in this era
- val minimumUsableSize = DiskUtils.getMinimumUsableSize(
+ val actualReserveSize = DiskUtils.getActualReserveSize(
diskInfo,
conf.workerDiskReserveSize,
conf.workerDiskReserveRatio)
val highDiskUsage =
- usage.freeSpace < minimumUsableSize || diskInfo.actualUsableSpace <= 0
+ usage.freeSpace < actualReserveSize || diskInfo.actualUsableSpace <= 0
if (highDiskUsage) {
logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
- s" Disk usage(Report by
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +
- s" free:${Utils.bytesToString(usage.freeSpace)},
used_percent:${usage.usedPercent}} " +
- s"usage(Report by Celeborn):{" +
- s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" +
+ s" Disk usage(Report by OS):
{total:${Utils.bytesToString(usage.totalSpace)}," +
+ s" free:${Utils.bytesToString(usage.freeSpace)},
used_percent:${usage.usedPercent}}," +
+ s" usage(Report by Celeborn): {" +
+ s" total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}," +
s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }")
}
highDiskUsage
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 517bcb971..87d6de179 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -160,8 +160,6 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
(flushers, totalThread)
}
- deviceMonitor.startCheck()
-
val hdfsDir = conf.hdfsDir
val s3Dir = conf.s3Dir
val hdfsPermission = new FsPermission("755")
@@ -890,15 +888,17 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
try {
val (fileSystemReportedUsableSpace, fileSystemReportedTotalSpace) =
getFileSystemReportedSpace(diskInfo.mountPoint)
+ val actualReserveSize =
+ DiskUtils.getActualReserveSize(diskInfo, diskReserveSize,
diskReserveRatio)
val workingDirUsableSpace =
- Math.min(diskInfo.configuredUsableSpace - totalUsage,
fileSystemReportedUsableSpace)
- val minimumReserveSize =
- DiskUtils.getMinimumUsableSize(diskInfo, diskReserveSize,
diskReserveRatio)
- val usableSpace = Math.max(workingDirUsableSpace -
minimumReserveSize, 0)
+ Math.min(
+ diskInfo.configuredUsableSpace - totalUsage,
+ fileSystemReportedUsableSpace - actualReserveSize)
+ val usableSpace = Math.max(workingDirUsableSpace, 0)
logDebug(
- s"Update diskInfo:${diskInfo.mountPoint}
workingDirUsableSpace:$workingDirUsableSpace
fileMeta:$fileSystemReportedUsableSpace" +
- s"conf:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace" +
- s"minimumReserveSize:$minimumReserveSize
usableSpace:$usableSpace")
+ s"Update diskInfo:${diskInfo.mountPoint}
workingDirUsableSpace:$workingDirUsableSpace
fileMeta:$fileSystemReportedUsableSpace " +
+ s"configuredUsableSpace:${diskInfo.configuredUsableSpace}
totalUsage:$totalUsage totalSpace:$fileSystemReportedTotalSpace " +
+ s"actualReserveSize:$actualReserveSize
usableSpace:$usableSpace")
diskInfo.setUsableSpace(usableSpace)
diskInfo.setTotalSpace(fileSystemReportedTotalSpace)
} catch {
@@ -1170,6 +1170,11 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
throw exception
}
+
+ def startDeviceMonitor(): Unit = {
+ deviceMonitor.startCheck()
+ }
+
}
object StorageManager {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
index b85697552..d393df083 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala
@@ -23,9 +23,9 @@ import org.mockito.stubbing.Stubber
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
-import
org.apache.celeborn.common.CelebornConf.{WORKER_GRACEFUL_SHUTDOWN_ENABLED,
WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
+import org.apache.celeborn.common.CelebornConf.{WORKER_DISK_RESERVE_SIZE,
WORKER_GRACEFUL_SHUTDOWN_ENABLED, WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH}
import org.apache.celeborn.common.meta.DiskInfo
-import org.apache.celeborn.common.util.DiskUtils
+import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.service.deploy.worker.WorkerSource
trait MockitoHelper extends MockitoSugar {
@@ -36,8 +36,6 @@ trait MockitoHelper extends MockitoSugar {
class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
- val conf = new CelebornConf()
-
test("[CELEBORN-926] saveAllCommittedFileInfosToDB cause
IllegalMonitorStateException") {
val conf = new CelebornConf().set(WORKER_GRACEFUL_SHUTDOWN_ENABLED, true)
.set(WORKER_GRACEFUL_SHUTDOWN_RECOVER_PATH, "/tmp/recover")
@@ -47,42 +45,67 @@ class StorageManagerSuite extends CelebornFunSuite with
MockitoHelper {
}
test("updateDiskInfosWithDiskReserveSize") {
+ // reserve size set to 5g
+ val conf = new CelebornConf().set(WORKER_DISK_RESERVE_SIZE,
Utils.byteStringAsBytes("5g"))
val storageManager = new StorageManager(conf, new WorkerSource(conf))
val spyStorageManager = spy(storageManager)
- val disks = prepareDisks()
- val diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
- doReturn(disks).when(spyStorageManager).disksSnapshot()
-
doReturn(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+ val diskInfo = new DiskInfo("/mnt/disk1", List.empty, null, conf)
+ diskInfo.setUsableSpace(-1L)
+
+ var diskSetSpace = (0L, 0L)
+ doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()
+
doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
+
+ // disk usable 80g, total 80g, worker config 8EB
+ diskSetSpace = (80 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ diskInfo.configuredUsableSpace = Long.MaxValue
spyStorageManager.updateDiskInfos()
- for (disk <- disks) {
- val minimumReserveSize =
- DiskUtils.getMinimumUsableSize(
- disk,
- conf.workerDiskReserveSize,
- conf.workerDiskReserveRatio)
- assert(disk.actualUsableSpace == diskSetSpace._1 - minimumReserveSize)
- }
- }
+ assert(diskInfo.actualUsableSpace == 75 * 1024 * 1024 * 1024L)
- def prepareDisks(): List[DiskInfo] = {
- val diskSetSpaces = Array(
- 90L * 1024 * 1024 * 1024,
- 95L * 1024 * 1024 * 1024,
- 100L * 1024 * 1024 * 1024)
+ // disk usable 80g, total 80g, worker config 50g
+ diskInfo.configuredUsableSpace = 50 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 50 * 1024 * 1024 * 1024L)
- val diskInfo1 = new DiskInfo("/mnt/disk1", List.empty, null, conf)
- diskInfo1.configuredUsableSpace = (Long.MaxValue)
- diskInfo1.setUsableSpace(diskSetSpaces(0))
+ // disk usable 10g, total 80g, worker config 20g
+ diskSetSpace = (10 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
- val diskInfo2 = new DiskInfo("/mnt/disk2", List.empty, null, conf)
- diskInfo2.configuredUsableSpace = (Long.MaxValue)
- diskInfo2.setUsableSpace(diskSetSpaces(1))
+ // disk usable 10g, total 80g, worker config 5g
+ diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 5 * 1024 * 1024 * 1024L)
- val diskInfo3 = new DiskInfo("/mnt/disk3", List.empty, null, conf)
- diskInfo3.configuredUsableSpace = (Long.MaxValue)
- diskInfo3.setUsableSpace(diskSetSpaces(2))
+ // disk usable 5g, total 80g, worker config 20g
+ diskSetSpace = (5 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 0L)
+
+ // disk usable 5g, total 80g, worker config 5g
+ diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 0L)
- List(diskInfo1, diskInfo2, diskInfo3)
+ // disk usable 1g, total 80g, worker config 20g
+ diskSetSpace = (1 * 1024 * 1024 * 1024L, 80 * 1024 * 1024 * 1024L)
+ diskInfo.configuredUsableSpace = 20 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 0L)
+
+ // disk usable 1g, total 80g, worker config 5g
+ diskInfo.configuredUsableSpace = 5 * 1024 * 1024 * 1024L
+ diskInfo.setUsableSpace(-1L)
+ spyStorageManager.updateDiskInfos()
+ assert(diskInfo.actualUsableSpace == 0L)
}
}