This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new d00b8dbd5 [CELEBORN-959] Use Java API to obtain disk capacity 
information instead of `df` command
d00b8dbd5 is described below

commit d00b8dbd56f88035733638fff786d541f4c056e8
Author: sychen <[email protected]>
AuthorDate: Mon Sep 11 17:42:29 2023 +0800

    [CELEBORN-959] Use Java API to obtain disk capacity information instead of 
`df` command
    
    ### What changes were proposed in this pull request?
    Use the Java NIO `FileStore` API (via `Files.getFileStore`) to obtain disk capacity information instead of parsing the output of the `df` command.
    
    
https://github.com/apache/hadoop/blob/bf605c8accbf1ad714ff8cb4d0cea89bfb981762/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java#L84-L104
    
    
https://github.com/openjdk/jdk8u/blob/599bb77c451274bacdb38cb7fd2d8fe658846ab3/jdk/src/solaris/native/java/io/UnixFileSystem_md.c#L439-L467
    
    ### Why are the changes needed?
    
    Some operating systems do not support the `df -B1` command. On those systems, the worker throws an `ArrayIndexOutOfBoundsException` while indexing into the split `df` output.
    
    Replacing the `df` command with the Java API is more portable. It does not depend on the platform's `df` implementation or on its output format.
    
    ```text
    23/09/08 22:03:25,522 ERROR [worker-disk-checker] LocalDeviceMonitor: 
Device check failed.
    java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException: -4
            at java.util.concurrent.FutureTask.report(FutureTask.java:122)
            at java.util.concurrent.FutureTask.get(FutureTask.java:206)
            at 
org.apache.celeborn.common.util.Utils$.tryWithTimeoutAndCallback(Utils.scala:858)
            at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.highDiskUsage(DeviceMonitor.scala:258)
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9(DeviceMonitor.scala:136)
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9$adapted(DeviceMonitor.scala:135)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2(DeviceMonitor.scala:135)
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2$adapted(DeviceMonitor.scala:110)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.run(DeviceMonitor.scala:110)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: -4
            at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.$anonfun$highDiskUsage$1(DeviceMonitor.scala:240)
            at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
            at 
org.apache.celeborn.common.util.Utils$$anon$3.call(Utils.scala:851)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            ... 3 more
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1892 from cxzl25/CELEBORN-959.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit d7e900fa9a4804ace3f539bd072e50be96e9dd6e)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/DeviceMonitor.scala      | 27 +++++++++++++---------
 .../deploy/worker/storage/DeviceMonitorSuite.scala | 26 +++++++++++----------
 2 files changed, 30 insertions(+), 23 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index fbd1dc1e9..714529c58 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, 
IOException}
 import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.TimeUnit
 
@@ -85,12 +86,12 @@ class LocalDeviceMonitor(
       .groupBy(_.deviceInfo)
       .foreach { case (deviceInfo: DeviceInfo, diskInfos: List[DiskInfo]) =>
         val deviceLabel = Map("device" -> deviceInfo.name)
-        def usage = DeviceMonitor.getDiskUsageInfos(diskInfos.head)
+        def usage: DeviceMonitor.DiskUsageInfo = 
DeviceMonitor.getDiskUsageInfos(diskInfos.head)
         workerSource.addGauge(WorkerSource.DEVICE_OS_TOTAL_CAPACITY, 
deviceLabel) { () =>
-          usage(usage.length - 5).toLong
+          usage.totalSpace
         }
         workerSource.addGauge(WorkerSource.DEVICE_OS_FREE_CAPACITY, 
deviceLabel) { () =>
-          usage(usage.length - 3).toLong
+          usage.freeSpace
         }
         workerSource.addGauge(WorkerSource.DEVICE_CELEBORN_TOTAL_CAPACITY, 
deviceLabel) { () =>
           diskInfos.map(_.configuredUsableSpace).sum
@@ -222,9 +223,16 @@ object DeviceMonitor extends Logging {
     }
   }
 
+  case class DiskUsageInfo(totalSpace: Long, freeSpace: Long, usedSpace: Long, 
usedPercent: Int)
+
   // unit is byte
-  def getDiskUsageInfos(diskInfo: DiskInfo): Array[String] = {
-    runCommand(s"df -B1 ${diskInfo.mountPoint}").trim.split("[ \t]+")
+  def getDiskUsageInfos(diskInfo: DiskInfo): DiskUsageInfo = {
+    val dirFile = Files.getFileStore(Paths.get(diskInfo.mountPoint))
+    val totalSpace = dirFile.getTotalSpace
+    val freeSpace = dirFile.getUsableSpace
+    val usedSpace = totalSpace - freeSpace
+    val usedPercent = (usedSpace * 100.0 / totalSpace).toInt
+    DiskUsageInfo(totalSpace, freeSpace, usedSpace, usedPercent)
   }
 
   /**
@@ -237,16 +245,13 @@ object DeviceMonitor extends Logging {
   def highDiskUsage(conf: CelebornConf, diskInfo: DiskInfo): Boolean = {
     tryWithTimeoutAndCallback({
       val usage = getDiskUsageInfos(diskInfo)
-      val totalSpace = usage(usage.length - 5)
-      val freeSpace = usage(usage.length - 3)
-      val used_percent = usage(usage.length - 2)
       // assume no single device capacity exceeds 1EB in this era
       val highDiskUsage =
-        freeSpace.toLong < conf.workerDiskReserveSize || 
diskInfo.actualUsableSpace <= 0
+        usage.freeSpace < conf.workerDiskReserveSize || 
diskInfo.actualUsableSpace <= 0
       if (highDiskUsage) {
         logWarning(s"${diskInfo.mountPoint} usage is above threshold." +
-          s" Disk usage(Report by 
OS):{total:${Utils.bytesToString(totalSpace.toLong)}," +
-          s" free:${Utils.bytesToString(freeSpace.toLong)}, 
used_percent:$used_percent} " +
+          s" Disk usage(Report by 
OS):{total:${Utils.bytesToString(usage.totalSpace)}," +
+          s" free:${Utils.bytesToString(usage.freeSpace)}, 
used_percent:${usage.usedPercent}} " +
           s"usage(Report by Celeborn):{" +
           s"total:${Utils.bytesToString(diskInfo.configuredUsableSpace)}" +
           s" free:${Utils.bytesToString(diskInfo.actualUsableSpace)} }")
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index ab031f87d..574894814 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -377,17 +377,18 @@ class DeviceMonitorSuite extends AnyFunSuite {
   }
 
   test("monitor device usage metrics") {
-    withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
-      when(Utils.runCommand(dfBCmd1)).thenReturn(dfBOut1)
-      when(Utils.runCommand(dfBCmd2)).thenReturn(dfBOut2)
-      when(Utils.runCommand(dfBCmd3)).thenReturn(dfBOut3)
-      when(Utils.runCommand(dfBCmd4)).thenReturn(dfBOut4)
-      when(Utils.runCommand(dfBCmd5)).thenReturn(dfBOut5)
-      val dfBOut6 =
-        """
-          |Filesystem     1B-blocks            Used          Available Use% 
Mounted on
-          |/dev/vda   1395864371200    130996502528      1264867868672   9% 
/mnt/disk1
-          |""".stripMargin
+
+    
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
 {
+      val dfBOut1 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1293858897920L, 102005473280L, 7)
+      val dfBOut2 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
+      val dfBOut3 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1293858897920L, 102005473280L, 7)
+      val dfBOut4 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
+      val dfBOut5 = DeviceMonitor.DiskUsageInfo(1932735283200L, 
1835024777216L, 97710505984L, 6)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut1)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(dfBOut2)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(dfBOut3)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(dfBOut4)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(dfBOut5)
 
       deviceMonitor2.init()
 
@@ -422,7 +423,8 @@ class DeviceMonitorSuite extends AnyFunSuite {
       assertEquals("vdb", metrics4.last.labels("device"))
       assertEquals(1024L * 3, metrics4.last.gauge.getValue)
 
-      when(Utils.runCommand(dfBCmd1)).thenReturn(dfBOut6)
+      val dfBOut6 = DeviceMonitor.DiskUsageInfo(1395864371200L, 
1264867868672L, 130996502528L, 9)
+      
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut6)
       assertEquals(1264867868672L, metrics2.head.gauge.getValue)
     }
   }

Reply via email to