This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 91814602f [CELEBORN-1646][FOLLOWUP] DeviceMonitor should 
notifyObserversOnError with CRITICAL_ERROR disk status for input/output error
91814602f is described below

commit 91814602fcb66aeea5151f0355629a2c391d3a66
Author: SteNicholas <[email protected]>
AuthorDate: Thu Apr 10 09:55:40 2025 +0800

    [CELEBORN-1646][FOLLOWUP] DeviceMonitor should notifyObserversOnError with 
CRITICAL_ERROR disk status for input/output error
    
    ### What changes were proposed in this pull request?
    
    `DeviceMonitor` should `notifyObserversOnError` with `CRITICAL_ERROR` disk 
status for input/output error.
    
    Follow-up to #2809.
    
    ### Why are the changes needed?
    
    `DeviceMonitor` throws an `ExecutionException` caused by a 
`FileSystemException` when `Files#getFileStore` hits an input/output error. 
The exception escapes the per-disk check and is only logged as "Device check 
failed", so observers never update the disk status to `CRITICAL_ERROR`.
    
    ```
    2024-10-14 12:00:42,701 [ERROR] [worker-disk-checker] - 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor 
-Logging.scala(80) -Device check failed.
    java.util.concurrent.ExecutionException: java.nio.file.FileSystemException: 
/mnt/storage01: Input/output error
            at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
~[?:1.8.0_162]
            at java.util.concurrent.FutureTask.get(FutureTask.java:206) 
~[?:1.8.0_162]
            at 
org.apache.celeborn.common.util.Utils$.tryWithTimeoutAndCallback(Utils.scala:950)
 ~[celeborn-common_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.highDiskUsage(DeviceMonitor.scala:268)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9(DeviceMonitor.scala:137)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9$adapted(DeviceMonitor.scala:136)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at scala.collection.Iterator.foreach(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.Iterator.foreach$(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.IterableLike.foreach(IterableLike.scala:74) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56) 
~[scala-library-2.12.10.jar:?]
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2(DeviceMonitor.scala:136)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2$adapted(DeviceMonitor.scala:111)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at scala.collection.Iterator.foreach(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.Iterator.foreach$(Iterator.scala:941) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.IterableLike.foreach(IterableLike.scala:74) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73) 
~[scala-library-2.12.10.jar:?]
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56) 
~[scala-library-2.12.10.jar:?]
            at 
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.run(DeviceMonitor.scala:111)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_162]
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
~[?:1.8.0_162]
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 ~[?:1.8.0_162]
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 ~[?:1.8.0_162]
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_162]
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_162]
            at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]
    Caused by: java.nio.file.FileSystemException: /mnt/storage01: Input/output 
error
            at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:91) 
~[?:1.8.0_162]
            at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
~[?:1.8.0_162]
            at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
~[?:1.8.0_162]
            at sun.nio.fs.UnixFileStore.devFor(UnixFileStore.java:57) 
~[?:1.8.0_162]
            at sun.nio.fs.UnixFileStore.<init>(UnixFileStore.java:64) 
~[?:1.8.0_162]
            at sun.nio.fs.LinuxFileStore.<init>(LinuxFileStore.java:44) 
~[?:1.8.0_162]
            at 
sun.nio.fs.LinuxFileSystemProvider.getFileStore(LinuxFileSystemProvider.java:51)
 ~[?:1.8.0_162]
            at 
sun.nio.fs.LinuxFileSystemProvider.getFileStore(LinuxFileSystemProvider.java:39)
 ~[?:1.8.0_162]
            at 
sun.nio.fs.UnixFileSystemProvider.getFileStore(UnixFileSystemProvider.java:368) 
~[?:1.8.0_162]
            at java.nio.file.Files.getFileStore(Files.java:1461) ~[?:1.8.0_162]
            at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.getDiskUsageInfos(DeviceMonitor.scala:231)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.$anonfun$highDiskUsage$1(DeviceMonitor.scala:248)
 ~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) 
~[scala-library-2.12.10.jar:?]
            at 
org.apache.celeborn.common.util.Utils$$anon$3.call(Utils.scala:943) 
~[celeborn-common_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_162]
            ... 3 more
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA.
    
    Closes #3206 from SteNicholas/CELEBORN-1646.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../deploy/worker/storage/DeviceMonitor.scala      | 47 ++++++++++++++--------
 1 file changed, 31 insertions(+), 16 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 3b7b5735a..01af7468a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.{BufferedReader, File, FileInputStream, InputStreamReader, 
IOException}
 import java.nio.charset.Charset
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, FileSystemException, Paths}
 import java.util
-import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ExecutionException, ThreadPoolExecutor, TimeUnit}
 
 import scala.collection.JavaConverters._
 
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, 
DiskStatus, FileInfo}
+import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
 import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.util.{DiskUtils, ThreadUtils, Utils}
 import org.apache.celeborn.common.util.Utils._
@@ -141,19 +141,34 @@ class LocalDeviceMonitor(
                   device.notifyObserversOnNonCriticalError(mountPoints, 
DiskStatus.IO_HANG)
                 } else {
                   device.diskInfos.values().asScala.foreach { diskInfo =>
-                    if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, 
diskInfo)) {
-                      logError(s"${diskInfo.mountPoint} high_disk_usage error, 
notify observers")
-                      
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
-                    } else if (checkReadWrite &&
-                      DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
-                      logError(s"${diskInfo.mountPoint} read-write error, 
notify observers")
-                      // We think that if one dir in device has read-write 
problem, if possible all
-                      // dirs in this device have the problem
-                      device.notifyObserversOnNonCriticalError(
-                        List(diskInfo.mountPoint),
-                        DiskStatus.READ_OR_WRITE_FAILURE)
-                    } else if (nonCriticalErrorSum <= 
device.notifyErrorThreshold * 0.5) {
-                      device.notifyObserversOnHealthy(diskInfo.mountPoint)
+                    try {
+                      if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, 
diskInfo)) {
+                        logError(s"${diskInfo.mountPoint} high_disk_usage 
error, notify observers")
+                        
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
+                      } else if (checkReadWrite &&
+                        DeviceMonitor.readWriteError(conf, 
diskInfo.dirs.head)) {
+                        logError(s"${diskInfo.mountPoint} read-write error, 
notify observers")
+                        // We think that if one dir in device has read-write 
problem, if possible all
+                        // dirs in this device have the problem
+                        device.notifyObserversOnNonCriticalError(
+                          List(diskInfo.mountPoint),
+                          DiskStatus.READ_OR_WRITE_FAILURE)
+                      } else if (nonCriticalErrorSum <= 
device.notifyErrorThreshold * 0.5) {
+                        device.notifyObserversOnHealthy(diskInfo.mountPoint)
+                      }
+                    } catch {
+                      case e: ExecutionException =>
+                        e.getCause match {
+                          case fse: FileSystemException =>
+                            logError(
+                              s"${diskInfo.mountPoint} critical error, notify 
observers",
+                              fse)
+                            device.notifyObserversOnError(
+                              List(diskInfo.mountPoint),
+                              DiskStatus.CRITICAL_ERROR)
+                          case throwable: Throwable =>
+                            throw throwable
+                        }
                     }
                   }
                 }

Reply via email to