This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 91814602f [CELEBORN-1646][FOLLOWUP] DeviceMonitor should
notifyObserversOnError with CRITICAL_ERROR disk status for input/output error
91814602f is described below
commit 91814602fcb66aeea5151f0355629a2c391d3a66
Author: SteNicholas <[email protected]>
AuthorDate: Thu Apr 10 09:55:40 2025 +0800
[CELEBORN-1646][FOLLOWUP] DeviceMonitor should notifyObserversOnError with
CRITICAL_ERROR disk status for input/output error
### What changes were proposed in this pull request?
`DeviceMonitor` should `notifyObserversOnError` with `CRITICAL_ERROR` disk
status for input/output error.
Follow up #2809.
### Why are the changes needed?
`DeviceMonitor` throws an `ExecutionException` caused by a `FileSystemException`
when `Files#getFileStore` hits an input/output error, which prevents
observers from updating the disk status to `CRITICAL_ERROR`.
```
2024-10-14 12:00:42,701 [ERROR] [worker-disk-checker] -
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor
-Logging.scala(80) -Device check failed.
java.util.concurrent.ExecutionException: java.nio.file.FileSystemException:
/mnt/storage01: Input/output error
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
~[?:1.8.0_162]
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
~[?:1.8.0_162]
at
org.apache.celeborn.common.util.Utils$.tryWithTimeoutAndCallback(Utils.scala:950)
~[celeborn-common_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.highDiskUsage(DeviceMonitor.scala:268)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9(DeviceMonitor.scala:137)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$9$adapted(DeviceMonitor.scala:136)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at scala.collection.Iterator.foreach(Iterator.scala:941)
~[scala-library-2.12.10.jar:?]
at scala.collection.Iterator.foreach$(Iterator.scala:941)
~[scala-library-2.12.10.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
~[scala-library-2.12.10.jar:?]
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
~[scala-library-2.12.10.jar:?]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
~[scala-library-2.12.10.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
~[scala-library-2.12.10.jar:?]
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2(DeviceMonitor.scala:136)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.$anonfun$run$2$adapted(DeviceMonitor.scala:111)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at scala.collection.Iterator.foreach(Iterator.scala:941)
~[scala-library-2.12.10.jar:?]
at scala.collection.Iterator.foreach$(Iterator.scala:941)
~[scala-library-2.12.10.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
~[scala-library-2.12.10.jar:?]
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
~[scala-library-2.12.10.jar:?]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
~[scala-library-2.12.10.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
~[scala-library-2.12.10.jar:?]
at
org.apache.celeborn.service.deploy.worker.storage.LocalDeviceMonitor$$anon$1.run(DeviceMonitor.scala:111)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_162]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
~[?:1.8.0_162]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_162]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
~[?:1.8.0_162]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_162]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_162]
Caused by: java.nio.file.FileSystemException: /mnt/storage01: Input/output
error
at
sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
~[?:1.8.0_162]
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
~[?:1.8.0_162]
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
~[?:1.8.0_162]
at sun.nio.fs.UnixFileStore.devFor(UnixFileStore.java:57)
~[?:1.8.0_162]
at sun.nio.fs.UnixFileStore.<init>(UnixFileStore.java:64)
~[?:1.8.0_162]
at sun.nio.fs.LinuxFileStore.<init>(LinuxFileStore.java:44)
~[?:1.8.0_162]
at
sun.nio.fs.LinuxFileSystemProvider.getFileStore(LinuxFileSystemProvider.java:51)
~[?:1.8.0_162]
at
sun.nio.fs.LinuxFileSystemProvider.getFileStore(LinuxFileSystemProvider.java:39)
~[?:1.8.0_162]
at
sun.nio.fs.UnixFileSystemProvider.getFileStore(UnixFileSystemProvider.java:368)
~[?:1.8.0_162]
at java.nio.file.Files.getFileStore(Files.java:1461) ~[?:1.8.0_162]
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.getDiskUsageInfos(DeviceMonitor.scala:231)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor$.$anonfun$highDiskUsage$1(DeviceMonitor.scala:248)
~[celeborn-worker_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
~[scala-library-2.12.10.jar:?]
at
org.apache.celeborn.common.util.Utils$$anon$3.call(Utils.scala:943)
~[celeborn-common_2.12-0.5.0-SNAPSHOT.jar:0.5.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_162]
... 3 more
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #3206 from SteNicholas/CELEBORN-1646.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/storage/DeviceMonitor.scala | 47 ++++++++++++++--------
1 file changed, 31 insertions(+), 16 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 3b7b5735a..01af7468a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -19,9 +19,9 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader,
IOException}
import java.nio.charset.Charset
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, FileSystemException, Paths}
import java.util
-import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ExecutionException, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
-import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo,
DiskStatus, FileInfo}
+import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.util.{DiskUtils, ThreadUtils, Utils}
import org.apache.celeborn.common.util.Utils._
@@ -141,19 +141,34 @@ class LocalDeviceMonitor(
device.notifyObserversOnNonCriticalError(mountPoints,
DiskStatus.IO_HANG)
} else {
device.diskInfos.values().asScala.foreach { diskInfo =>
- if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf,
diskInfo)) {
- logError(s"${diskInfo.mountPoint} high_disk_usage error,
notify observers")
-
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
- } else if (checkReadWrite &&
- DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
- logError(s"${diskInfo.mountPoint} read-write error,
notify observers")
- // We think that if one dir in device has read-write
problem, if possible all
- // dirs in this device have the problem
- device.notifyObserversOnNonCriticalError(
- List(diskInfo.mountPoint),
- DiskStatus.READ_OR_WRITE_FAILURE)
- } else if (nonCriticalErrorSum <=
device.notifyErrorThreshold * 0.5) {
- device.notifyObserversOnHealthy(diskInfo.mountPoint)
+ try {
+ if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf,
diskInfo)) {
+ logError(s"${diskInfo.mountPoint} high_disk_usage
error, notify observers")
+
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
+ } else if (checkReadWrite &&
+ DeviceMonitor.readWriteError(conf,
diskInfo.dirs.head)) {
+ logError(s"${diskInfo.mountPoint} read-write error,
notify observers")
+ // We think that if one dir in device has read-write
problem, if possible all
+ // dirs in this device have the problem
+ device.notifyObserversOnNonCriticalError(
+ List(diskInfo.mountPoint),
+ DiskStatus.READ_OR_WRITE_FAILURE)
+ } else if (nonCriticalErrorSum <=
device.notifyErrorThreshold * 0.5) {
+ device.notifyObserversOnHealthy(diskInfo.mountPoint)
+ }
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case fse: FileSystemException =>
+ logError(
+ s"${diskInfo.mountPoint} critical error, notify
observers",
+ fse)
+ device.notifyObserversOnError(
+ List(diskInfo.mountPoint),
+ DiskStatus.CRITICAL_ERROR)
+ case throwable: Throwable =>
+ throw throwable
+ }
}
}
}