This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c931663e [CELEBORN-110][REFACTOR] Notify critical error after
collecting a certain number of non-critical error (#1055)
c931663e is described below
commit c931663e5fed3713c1d193670d1e518e4f13559e
Author: nafiy <[email protected]>
AuthorDate: Fri Dec 16 15:47:36 2022 +0800
[CELEBORN-110][REFACTOR] Notify critical error after collecting a certain
number of non-critical error (#1055)
---
.../org/apache/celeborn/common/CelebornConf.scala | 21 ++++++
docs/configuration/worker.md | 2 +
.../deploy/worker/storage/DeviceMonitor.scala | 86 +++++++++++++---------
.../deploy/worker/storage/StorageManager.scala | 4 +-
.../deploy/worker/storage/DeviceMonitorSuite.scala | 37 +---------
5 files changed, 82 insertions(+), 68 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ec837625..f5d87b86 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -724,6 +724,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
def diskMonitorCheckInterval: Long = get(WORKER_DISK_MONITOR_CHECK_INTERVAL)
def diskMonitorSysBlockDir: String = get(WORKER_DISK_MONITOR_SYS_BLOCK_DIR)
+ def diskMonitorNotifyErrorThreshold: Int =
get(WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD)
+ def diskMonitorNotifyErrorExpireTimeout: Long =
+ get(WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT)
def createWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
@@ -2445,6 +2448,24 @@ object CelebornConf extends Logging {
.stringConf
.createWithDefault("/sys/block")
+ val WORKER_DISK_MONITOR_NOTIFY_ERROR_THRESHOLD: ConfigEntry[Int] =
+ buildConf("celeborn.worker.monitor.disk.notifyError.threshold")
+ .categories("worker")
+ .version("0.3.0")
+      .doc("Device monitor will only notify critical error once the
accumulated valid non-critical error number " +
+        "exceeds this threshold.")
+ .intConf
+ .createWithDefault(64)
+
+ val WORKER_DISK_MONITOR_NOTIFY_ERROR_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.monitor.disk.notifyError.expireTimeout")
+ .categories("worker")
+ .version("0.3.0")
+ .doc("The expire timeout of non-critical device error. Only notify
critical error when the number of non-critical " +
+ "errors for a period of time exceeds threshold.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("10m")
+
val WORKER_WRITER_CREATE_MAX_ATTEMPTS: ConfigEntry[Int] =
buildConf("celeborn.worker.writer.create.maxAttempts")
.withAlternative("rss.create.file.writer.retry.count")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index bd7d7d91..0ae29ef9 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -62,6 +62,8 @@ license: |
| celeborn.worker.monitor.disk.checkInterval | 60s | Intervals between device
monitor to check disk. | 0.2.0 |
| celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type
for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |
| celeborn.worker.monitor.disk.enabled | true | When true, worker will monitor
device and report to master. | 0.2.0 |
+| celeborn.worker.monitor.disk.notifyError.expireTimeout | 10m | The expire
timeout of non-critical device error. Only notify critical error when the
number of non-critical errors for a period of time exceeds threshold. | 0.3.0 |
+| celeborn.worker.monitor.disk.notifyError.threshold | 64 | Device monitor
will only notify critical error once the accumulated valid non-critical error
number exceeds this threshold. | 0.3.0 |
| celeborn.worker.monitor.disk.sys.block.dir | /sys/block | The directory
where linux file block information is stored. | 0.2.0 |
| celeborn.worker.noneEmptyDirExpireDuration | 1d | If a non-empty application
shuffle data dir has not been operated on during the duration time, this
application will be marked as expired. | 0.2.0 |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio
of partition sorter's memory for sorting, when reserved memory is higher than
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 630804ee..ebc8ad6a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -42,7 +42,6 @@ trait DeviceMonitor {
def registerFlusher(flusher: LocalFlusher): Unit = {}
def unregisterFlusher(flusher: LocalFlusher): Unit = {}
def reportNonCriticalError(mountPoint: String, e: IOException, diskStatus:
DiskStatus): Unit = {}
- def reportDeviceError(mountPoint: String, e: IOException, diskStatus:
DiskStatus): Unit = {}
def close() {}
}
@@ -66,6 +65,10 @@ class LocalDeviceMonitor(
val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
+ val nonCriticalErrors = new ConcurrentHashMap[DiskStatus, util.Set[Long]]()
+ val notifyErrorThreshold = conf.diskMonitorNotifyErrorThreshold
+ val notifyErrorExpireTimeout = conf.diskMonitorNotifyErrorExpireTimeout
+
var lastReadComplete: Long = -1
var lastWriteComplete: Long = -1
var lastReadInflight: Long = -1
@@ -96,6 +99,13 @@ class LocalDeviceMonitor(
def notifyObserversOnNonCriticalError(mountPoints: List[String],
diskStatus: DiskStatus): Unit =
this.synchronized {
+ val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus,
util.Set[Long]] {
+ override def apply(t: DiskStatus): util.Set[Long] = {
+ ConcurrentHashMap.newKeySet[Long]()
+ }
+ }
+ nonCriticalErrors.computeIfAbsent(diskStatus,
nonCriticalErrorSetFunc).add(
+ System.currentTimeMillis())
mountPoints.foreach { case mountPoint =>
diskInfos.get(mountPoint).setStatus(diskStatus)
}
@@ -225,26 +235,45 @@ class LocalDeviceMonitor(
try {
observedDevices.values().asScala.foreach(device => {
val mountPoints = device.diskInfos.keySet.asScala.toList
-
- if (checkIoHang && device.ioHang()) {
- logger.error(s"Encounter device io hang error!" +
- s"${device.deviceInfo.name}, notify observers")
- device.notifyObserversOnNonCriticalError(mountPoints,
DiskStatus.IO_HANG)
+ // tolerate time accuracy for better performance
+ val now = System.currentTimeMillis()
+ for (concurrentSet <- device.nonCriticalErrors.values().asScala)
{
+ for (time <- concurrentSet.asScala) {
+ if (now - time > device.notifyErrorExpireTimeout) {
+ concurrentSet.remove(time)
+ }
+ }
+ }
+ val nonCriticalErrorSum =
device.nonCriticalErrors.values().asScala.map(_.size).sum
+ if (nonCriticalErrorSum > device.notifyErrorThreshold) {
+            logger.error(s"Device ${device.deviceInfo.name} has
accumulated $nonCriticalErrorSum non-critical " +
+              s"errors within the past
${Utils.msDurationToString(device.notifyErrorExpireTimeout)}, its sum has " +
+              s"exceeded the threshold (${device.notifyErrorThreshold}),
device monitor will notify error to " +
+              s"observed device.")
+ val mountPoints =
device.diskInfos.values().asScala.map(_.mountPoint).toList
+ device.notifyObserversOnError(mountPoints,
DiskStatus.CRITICAL_ERROR)
} else {
- device.diskInfos.values().asScala.foreach { case diskInfo =>
- if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf,
diskInfo)) {
- logger.error(s"${diskInfo.mountPoint} high_disk_usage
error, notify observers")
- device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
- } else if (checkReadWrite &&
- DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
- logger.error(s"${diskInfo.mountPoint} read-write error,
notify observers")
- // We think that if one dir in device has read-write
problem, if possible all
- // dirs in this device have the problem
- device.notifyObserversOnNonCriticalError(
- List(diskInfo.mountPoint),
- DiskStatus.READ_OR_WRITE_FAILURE)
- } else {
- device.notifyObserversOnHealthy(diskInfo.mountPoint)
+ if (checkIoHang && device.ioHang()) {
+ logger.error(s"Encounter device io hang error!" +
+ s"${device.deviceInfo.name}, notify observers")
+ device.notifyObserversOnNonCriticalError(mountPoints,
DiskStatus.IO_HANG)
+ } else {
+ device.diskInfos.values().asScala.foreach { case diskInfo =>
+ if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf,
diskInfo)) {
+ logger.error(
+ s"${diskInfo.mountPoint} high_disk_usage error, notify
observers")
+
device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
+ } else if (checkReadWrite &&
+ DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
+ logger.error(s"${diskInfo.mountPoint} read-write error,
notify observers")
+ // We think that if one dir in device has read-write
problem, if possible all
+ // dirs in this device have the problem
+ device.notifyObserversOnNonCriticalError(
+ List(diskInfo.mountPoint),
+ DiskStatus.READ_OR_WRITE_FAILURE)
+ } else if (nonCriticalErrorSum <=
device.notifyErrorThreshold * 0.5) {
+ device.notifyObserversOnHealthy(diskInfo.mountPoint)
+ }
}
}
}
@@ -278,17 +307,6 @@ class LocalDeviceMonitor(
observedDevices.get(diskInfos.get(flusher.mountPoint).deviceInfo).removeObserver(flusher)
}
- override def reportDeviceError(
- mountPoint: String,
- e: IOException,
- diskStatus: DiskStatus): Unit = {
- logger.error(s"Receive report exception, disk $mountPoint, $e")
- if (diskInfos.containsKey(mountPoint)) {
- observedDevices.get(diskInfos.get(mountPoint).deviceInfo)
- .notifyObserversOnError(List(mountPoint), diskStatus)
- }
- }
-
override def reportNonCriticalError(
mountPoint: String,
e: IOException,
@@ -332,7 +350,8 @@ object DeviceMonitor {
/**
* check if the disk is high usage
- * @param conf conf
+ *
+ * @param conf conf
* @param diskInfo diskInfo
* @return true if high disk usage
*/
@@ -362,7 +381,8 @@ object DeviceMonitor {
/**
* check if the data dir has read-write problem
- * @param conf conf
+ *
+ * @param conf conf
* @param dataDir one of shuffle data dirs in mount disk
* @return true if disk has read-write problem
*/
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 54d2138a..49307519 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -143,8 +143,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit =
this.synchronized {
- if (diskStatus == DiskStatus.IO_HANG) {
- logInfo("IoHang, remove disk operator.")
+ if (diskStatus == DiskStatus.CRITICAL_ERROR) {
+ logInfo(s"Disk ${mountPoint} faces critical error, will remove its disk
operator.")
val operator = diskOperators.remove(mountPoint)
if (operator != null) {
operator.shutdown()
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index de05c64f..ca76c7bf 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -214,19 +214,19 @@ class DeviceMonitorSuite extends AnyFunSuite {
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
- when(fw2.notifyError("vda", DiskStatus.IO_HANG))
+ when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
.thenAnswer((a: String, b: List[File]) => {
deviceMonitor.unregisterFileWriter(fw2)
})
- when(fw4.notifyError("vdb", DiskStatus.IO_HANG))
+ when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
.thenAnswer((a: String, b: List[File]) => {
deviceMonitor.unregisterFileWriter(fw4)
})
- when(df2.notifyError("vda", DiskStatus.IO_HANG))
+ when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
.thenAnswer((a: String, b: List[File]) => {
df2.stopFlag.set(true)
})
- when(df4.notifyError("vdb", DiskStatus.IO_HANG))
+ when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
.thenAnswer((a: String, b: List[File]) => {
df4.stopFlag.set(true)
})
@@ -252,35 +252,6 @@ class DeviceMonitorSuite extends AnyFunSuite {
deviceMonitor.registerFileWriter(fw4)
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
4)
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
4)
- val dirs = new jArrayList[File]()
- dirs.add(null)
- when(fw1.notifyError(any(), any()))
- .thenAnswer((_: Any) => {
- deviceMonitor.unregisterFileWriter(fw1)
- })
- when(fw2.notifyError(any(), any()))
- .thenAnswer((_: Any) => {
- deviceMonitor.unregisterFileWriter(fw2)
- })
- deviceMonitor.reportDeviceError("/mnt/disk1", null, DiskStatus.IO_HANG)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
2)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
-
- when(fw3.notifyError(any(), any()))
- .thenAnswer((_: Any) => {
- deviceMonitor.unregisterFileWriter(fw3)
- })
- when(fw4.notifyError(any(), any()))
- .thenAnswer((_: Any) => {
- deviceMonitor.unregisterFileWriter(fw4)
- })
- deviceMonitor.reportDeviceError("/mnt/disk2", null, DiskStatus.IO_HANG)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
2)
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
}
}