This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new d00b8dbd5 [CELEBORN-959] Use Java API to obtain disk capacity
information instead of `df` command
d00b8dbd5 is described below
commit d00b8dbd56f88035733638fff786d541f4c056e8
Author: sychen <[email protected]>
AuthorDate: Mon Sep 11 17:42:29 2023 +0800
[CELEBORN-959] Use Java API to obtain disk capacity information instead of
`df` command
### What changes were proposed in this pull request?
Use the Java `FileStore` API to obtain disk capacity information.
https://github.com/apache/hadoop/blob/bf605c8accbf1ad714ff8cb4d0cea89bfb981762/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java#L84-L104
https://github.com/openjdk/jdk8u/blob/599bb77c451274bacdb38cb7fd2d8fe658846ab3/jdk/src/solaris/native/java/io/UnixFileSystem_md.c#L439-L467
### Why are the changes needed?
Some operating systems do not support the `df -B1` command; on those systems
the worker throws an `ArrayIndexOutOfBoundsException` when parsing its output.
We can replace the `df` command with the Java API, which is more portable.
```java
23/09/08 22:03:25,522 ERROR [worker-disk-checker] LocalDeviceMonitor:
Device check failed.
java.util.concurrent.ExecutionException:
java.lang.ArrayIndexOutOfBoundsException: -4
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at
org.apache.celeborn.common.util.Utils$.tryWithTimeoutAndCallback(Utils.scala:858)
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.highDiskUsage(DeviceMonitor.scala:258)
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9(DeviceMonitor.scala:136)
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9$adapted(DeviceMonitor.scala:135)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2(DeviceMonitor.scala:135)
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2$adapted(DeviceMonitor.scala:110)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.run(DeviceMonitor.scala:110)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -4
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.$anonfun$highDiskUsage$1(DeviceMonitor.scala:240)
at
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at
org.apache.celeborn.common.util.Utils$$anon$3.call(Utils.scala:851)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
```
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1892 from cxzl25/CELEBORN-959.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit d7e900fa9a4804ace3f539bd072e50be96e9dd6e)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/DeviceMonitor.scala | 27 +++++++++++++---------
.../deploy/worker/storage/DeviceMonitorSuite.scala | 26 +++++++++++----------
2 files changed, 30 insertions(+), 23 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index fbd1dc1e9..714529c58 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader,
IOException}
import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.TimeUnit
@@ -85,12 +86,12 @@ class LocalDeviceMonitor(
.groupBy(_.deviceInfo)
.foreach { case (deviceInfo: DeviceInfo, diskInfos: List[DiskInfo]) =>
val deviceLabel = Map("device" -> deviceInfo.name)
- def usage = DeviceMonitor.getDiskUsageInfos(diskInfos.head)
+ def usage: DeviceMonitor.DiskUsageInfo =
DeviceMonitor.getDiskUsageInfos(diskInfos.head)
workerSource.addGauge(WorkerSource.DEVICE_OS_TOTAL_CAPACITY,
deviceLabel) { () =>
- usage(usage.length - 5).toLong
+ usage.totalSpace
}
workerSource.addGauge(WorkerSource.DEVICE_OS_FREE_CAPACITY,
deviceLabel) { () =>
- usage(usage.length - 3).toLong
+ usage.freeSpace
}
workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY,
deviceLabel) { () =>
diskInfos.map(_.configuredUsableSpace).sum
@@ -222,9 +223,16 @@ object DeviceMonitor extends Logging {
}
}
+ case class DiskUsageInfo(totalSpace: Long, freeSpace: Long, usedSpace: Long,
usedPercent: Int)
+
// unit is byte
- def getDiskUsageInfos(diskInfo: DiskInfo): Array[String] = {
- runCommand(s"df -B1 ${diskInfo.mountPoint}").trim.split("[ \t]+")
+ def getDiskUsageInfos(diskInfo: DiskInfo): DiskUsageInfo = {
+ val dirFile = Files.getFileStore(Paths.get(diskInfo.mountPoint))
+ val totalSpace = dirFile.getTotalSpace
+ val freeSpace = dirFile.getUsableSpace
+ val usedSpace = totalSpace - freeSpace
+ val usedPercent = (usedSpace * 100.0 / totalSpace).toInt
+ DiskUsageInfo(totalSpace, freeSpace, usedSpace, usedPercent)
}
/**
@@ -237,16 +245,13 @@ object DeviceMonitor extends Logging {
def highDiskUsage(conf: CelebornConf, diskInfo: DiskInfo): Boolean = {
tryWithTimeoutAndCallback({
val usage = getDiskUsageInfos(diskInfo)
- val totalSpace = usage(usage.length - 5)
- val freeSpace = usage(usage.length - 3)
- val used_percent = usage(usage.length - 2)
// assume no single device capacity exceeds 1EB in this era
val highDiskUsage =
- freeSpace.toLong < conf.workerDiskReserveSize ||
diskInfo.actualUsableSpace <= 0
+ usage.freeSpace < conf.workerDiskReserveSize ||
diskInfo.actualUsableSpace <= 0
if (highDiskUsage) {
logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
- s" Disk usage(Report by
OS):{total:${Utils.bytesToString(totalSpace.toLong)}," +
- s" free:${Utils.bytesToString(freeSpace.toLong)},
used_percent:$used_percent} " +
+ s" Disk usage(Report by
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +
+ s" free:${Utils.bytesToString(usage.freeSpace)},
used_percent:${usage.usedPercent}} " +
s"usage(Report by Celeborn):{" +
s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" +
s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }")
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index ab031f87d..574894814 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -377,17 +377,18 @@ class DeviceMonitorSuite extends AnyFunSuite {
}
test("monitor device usage metrics") {
- withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
- when(Utils.runCommand(dfBCmd1)).thenReturn(dfBOut1)
- when(Utils.runCommand(dfBCmd2)).thenReturn(dfBOut2)
- when(Utils.runCommand(dfBCmd3)).thenReturn(dfBOut3)
- when(Utils.runCommand(dfBCmd4)).thenReturn(dfBOut4)
- when(Utils.runCommand(dfBCmd5)).thenReturn(dfBOut5)
- val dfBOut6 =
- """
- |Filesystem 1B-blocks Used Available Use%
Mounted on
- |/dev/vda 1395864371200 130996502528 1264867868672 9%
/mnt/disk1
- |""".stripMargin
+
+
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
{
+ val dfBOut1 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1293858897920L, 102005473280L, 7)
+ val dfBOut2 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
+ val dfBOut3 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1293858897920L, 102005473280L, 7)
+ val dfBOut4 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
+ val dfBOut5 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut1)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(dfBOut2)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(dfBOut3)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(dfBOut4)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(dfBOut5)
deviceMonitor2.init()
@@ -422,7 +423,8 @@ class DeviceMonitorSuite extends AnyFunSuite {
assertEquals("vdb", metrics4.last.labels("device"))
assertEquals(1024L * 3, metrics4.last.gauge.getValue)
- when(Utils.runCommand(dfBCmd1)).thenReturn(dfBOut6)
+ val dfBOut6 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1264867868672L, 130996502528L, 9)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut6)
assertEquals(1264867868672L, metrics2.head.gauge.getValue)
}
}