This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c931663e [CELEBORN-110][REFACTOR] Notify critical error after 
collecting a certain number of non-critical error (#1055)
c931663e is described below

commit c931663e5fed3713c1d193670d1e518e4f13559e
Author: nafiy <[email protected]>
AuthorDate: Fri Dec 16 15:47:36 2022 +0800

    [CELEBORN-110][REFACTOR] Notify critical error after collecting a certain 
number of non-critical error (#1055)
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 21 ++++++
 docs/configuration/worker.md                       |  2 +
 .../deploy/worker/storage/DeviceMonitor.scala      | 86 +++++++++++++---------
 .../deploy/worker/storage/StorageManager.scala     |  4 +-
 .../deploy/worker/storage/DeviceMonitorSuite.scala | 37 +---------
 5 files changed, 82 insertions(+), 68 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ec837625..f5d87b86 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -724,6 +724,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
   def diskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
   def diskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
+  def diskMonitorNotifyErrorThreshold: Int = 
get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
+  def diskMonitorNotifyErrorExpireTimeout: Long =
+    get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
   def createWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
   def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
   def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
@@ -2445,6 +2448,24 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault("/sys/block")
 
+  val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
+    buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
+      .categories("worker")
+      .version("0.3.0")
+      .doc("Device monitor will only notify critical error once the 
accumulated valid non-critical error number " +
+        "exceeding this threshold.")
+      .intConf
+      .createWithDefault(64)
+
+  val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
+      .categories("worker")
+      .version("0.3.0")
+      .doc("The expire timeout of non-critical device error. Only notify 
critical error when the number of non-critical " +
+        "errors for a period of time exceeds threshold.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10m")
+
   val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
     buildConf("celeborn.worker.writer.create.maxAttempts")
       .withAlternative("rss.create.file.writer.retry.count")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index bd7d7d91..0ae29ef9 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -62,6 +62,8 @@ license: |
 | celeborn.worker.monitor.disk.checkInterval | 60s | Intervals between device 
monitor to check disk. | 0.2.0 | 
 | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type 
for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | 
 | celeborn.worker.monitor.disk.enabled | true | When true, worker will monitor 
device and report to master. | 0.2.0 | 
+| celeborn.worker.monitor.disk.notifyError.expireTimeout | 10m | The expiration 
timeout of a non-critical device error. A critical error is only notified when the 
number of non-critical errors recorded within this period exceeds the threshold. | 0.3.0 | 
+| celeborn.worker.monitor.disk.notifyError.threshold | 64 | Device monitor 
will only notify a critical error once the number of accumulated, unexpired 
non-critical errors exceeds this threshold. | 0.3.0 | 
 | celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory 
where linux file block information is stored. | 0.2.0 | 
 | celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application 
shuffle data dir has not been operated on during this duration, this 
application will be marked as expired. | 0.2.0 | 
 | celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio 
of partition sorter's memory for sorting, when reserved memory is higher than 
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 | 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 630804ee..ebc8ad6a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -42,7 +42,6 @@ trait DeviceMonitor {
   def registerFlusher(flusher: LocalFlusher): Unit = {}
   def unregisterFlusher(flusher: LocalFlusher): Unit = {}
   def reportNonCriticalError(mountPoint: String, e: IOException, diskStatus: 
DiskStatus): Unit = {}
-  def reportDeviceError(mountPoint: String, e: IOException, diskStatus: 
DiskStatus): Unit = {}
   def close() {}
 }
 
@@ -66,6 +65,10 @@ class LocalDeviceMonitor(
     val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
     val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
 
+    val nonCriticalErrors = new ConcurrentHashMap[DiskStatus, util.Set[Long]]()
+    val notifyErrorThreshold = conf.diskMonitorNotifyErrorThreshold
+    val notifyErrorExpireTimeout = conf.diskMonitorNotifyErrorExpireTimeout
+
     var lastReadComplete: Long = -1
     var lastWriteComplete: Long = -1
     var lastReadInflight: Long = -1
@@ -96,6 +99,13 @@ class LocalDeviceMonitor(
 
     def notifyObserversOnNonCriticalError(mountPoints: List[String], 
diskStatus: DiskStatus): Unit =
       this.synchronized {
+        val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, 
util.Set[Long]] {
+          override def apply(t: DiskStatus): util.Set[Long] = {
+            ConcurrentHashMap.newKeySet[Long]()
+          }
+        }
+        nonCriticalErrors.computeIfAbsent(diskStatus, 
nonCriticalErrorSetFunc).add(
+          System.currentTimeMillis())
         mountPoints.foreach { case mountPoint =>
           diskInfos.get(mountPoint).setStatus(diskStatus)
         }
@@ -225,26 +235,45 @@ class LocalDeviceMonitor(
           try {
             observedDevices.values().asScala.foreach(device => {
               val mountPoints = device.diskInfos.keySet.asScala.toList
-
-              if (checkIoHang && device.ioHang()) {
-                logger.error(s"Encounter device io hang error!" +
-                  s"${device.deviceInfo.name}, notify observers")
-                device.notifyObserversOnNonCriticalError(mountPoints, 
DiskStatus.IO_HANG)
+              // tolerate time accuracy for better performance
+              val now = System.currentTimeMillis()
+              for (concurrentSet <- device.nonCriticalErrors.values().asScala) 
{
+                for (time <- concurrentSet.asScala) {
+                  if (now - time > device.notifyErrorExpireTimeout) {
+                    concurrentSet.remove(time)
+                  }
+                }
+              }
+              val nonCriticalErrorSum = 
device.nonCriticalErrors.values().asScala.map(_.size).sum
+              if (nonCriticalErrorSum > device.notifyErrorThreshold) {
+                logger.error(s"Device ${device.deviceInfo.name} has 
accumulated $nonCriticalErrorSum non-critical " +
+                  s"error within the past 
${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
+                  s"exceed the threshold (${device.notifyErrorThreshold}), 
device monitor will notify error to " +
+                  s"observed device.")
+                val mountPoints = 
device.diskInfos.values().asScala.map(_.mountPoint).toList
+                device.notifyObserversOnError(mountPoints, 
DiskStatus.CRITICAL_ERROR)
               } else {
-                device.diskInfos.values().asScala.foreach { case diskInfo =>
-                  if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, 
diskInfo)) {
-                    logger.error(s"${diskInfo.mountPoint} high_disk_usage 
error, notify observers")
-                    device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
-                  } else if (checkReadWrite &&
-                    DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
-                    logger.error(s"${diskInfo.mountPoint} read-write error, 
notify observers")
-                    // We think that if one dir in device has read-write 
problem, if possible all
-                    // dirs in this device have the problem
-                    device.notifyObserversOnNonCriticalError(
-                      List(diskInfo.mountPoint),
-                      DiskStatus.READ_OR_WRITE_FAILURE)
-                  } else {
-                    device.notifyObserversOnHealthy(diskInfo.mountPoint)
+                if (checkIoHang && device.ioHang()) {
+                  logger.error(s"Encounter device io hang error!" +
+                    s"${device.deviceInfo.name}, notify observers")
+                  device.notifyObserversOnNonCriticalError(mountPoints, 
DiskStatus.IO_HANG)
+                } else {
+                  device.diskInfos.values().asScala.foreach { case diskInfo =>
+                    if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, 
diskInfo)) {
+                      logger.error(
+                        s"${diskInfo.mountPoint} high_disk_usage error, notify 
observers")
+                      
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
+                    } else if (checkReadWrite &&
+                      DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
+                      logger.error(s"${diskInfo.mountPoint} read-write error, 
notify observers")
+                      // We think that if one dir in device has read-write 
problem, if possible all
+                      // dirs in this device have the problem
+                      device.notifyObserversOnNonCriticalError(
+                        List(diskInfo.mountPoint),
+                        DiskStatus.READ_OR_WRITE_FAILURE)
+                    } else if (nonCriticalErrorSum <= 
device.notifyErrorThreshold * 0.5) {
+                      device.notifyObserversOnHealthy(diskInfo.mountPoint)
+                    }
                   }
                 }
               }
@@ -278,17 +307,6 @@ class LocalDeviceMonitor(
     
observedDevices.get(diskInfos.get(flusher.mountPoint).deviceInfo).removeObserver(flusher)
   }
 
-  override def reportDeviceError(
-      mountPoint: String,
-      e: IOException,
-      diskStatus: DiskStatus): Unit = {
-    logger.error(s"Receive report exception, disk $mountPoint, $e")
-    if (diskInfos.containsKey(mountPoint)) {
-      observedDevices.get(diskInfos.get(mountPoint).deviceInfo)
-        .notifyObserversOnError(List(mountPoint), diskStatus)
-    }
-  }
-
   override def reportNonCriticalError(
       mountPoint: String,
       e: IOException,
@@ -332,7 +350,8 @@ object DeviceMonitor {
 
   /**
    * check if the disk is high usage
-   * @param conf conf
+   *
+   * @param conf     conf
    * @param diskInfo diskInfo
    * @return true if high disk usage
    */
@@ -362,7 +381,8 @@ object DeviceMonitor {
 
   /**
    * check if the data dir has read-write problem
-   * @param conf conf
+   *
+   * @param conf    conf
    * @param dataDir one of shuffle data dirs in mount disk
    * @return true if disk has read-write problem
    */
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 54d2138a..49307519 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -143,8 +143,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     }
 
   override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = 
this.synchronized {
-    if (diskStatus == DiskStatus.IO_HANG) {
-      logInfo("IoHang, remove disk operator.")
+    if (diskStatus == DiskStatus.CRITICAL_ERROR) {
+      logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk 
operator.")
       val operator = diskOperators.remove(mountPoint)
       if (operator != null) {
         operator.shutdown()
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index de05c64f..ca76c7bf 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -214,19 +214,19 @@ class DeviceMonitorSuite extends AnyFunSuite {
         
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
       
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
 
-      when(fw2.notifyError("vda", DiskStatus.IO_HANG))
+      when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
         .thenAnswer((a: String, b: List[File]) => {
           deviceMonitor.unregisterFileWriter(fw2)
         })
-      when(fw4.notifyError("vdb", DiskStatus.IO_HANG))
+      when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
         .thenAnswer((a: String, b: List[File]) => {
           deviceMonitor.unregisterFileWriter(fw4)
         })
-      when(df2.notifyError("vda", DiskStatus.IO_HANG))
+      when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
         .thenAnswer((a: String, b: List[File]) => {
           df2.stopFlag.set(true)
         })
-      when(df4.notifyError("vdb", DiskStatus.IO_HANG))
+      when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
         .thenAnswer((a: String, b: List[File]) => {
           df4.stopFlag.set(true)
         })
@@ -252,35 +252,6 @@ class DeviceMonitorSuite extends AnyFunSuite {
       deviceMonitor.registerFileWriter(fw4)
       
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
4)
       
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
4)
-      val dirs = new jArrayList[File]()
-      dirs.add(null)
-      when(fw1.notifyError(any(), any()))
-        .thenAnswer((_: Any) => {
-          deviceMonitor.unregisterFileWriter(fw1)
-        })
-      when(fw2.notifyError(any(), any()))
-        .thenAnswer((_: Any) => {
-          deviceMonitor.unregisterFileWriter(fw2)
-        })
-      deviceMonitor.reportDeviceError("/mnt/disk1", null, DiskStatus.IO_HANG)
-      
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 
2)
-      assert(
-        
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
-
-      when(fw3.notifyError(any(), any()))
-        .thenAnswer((_: Any) => {
-          deviceMonitor.unregisterFileWriter(fw3)
-        })
-      when(fw4.notifyError(any(), any()))
-        .thenAnswer((_: Any) => {
-          deviceMonitor.unregisterFileWriter(fw4)
-        })
-      deviceMonitor.reportDeviceError("/mnt/disk2", null, DiskStatus.IO_HANG)
-      
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 
2)
-      assert(
-        
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-      
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
     }
   }
 

Reply via email to