This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new f13dfb74 [CELEBORN-113][FEATURE] Add metrics to monitor non-critical error number on local device (#1100)
f13dfb74 is described below

commit f13dfb7421c6fe1ba4a8edd7ab08c70c73f5c441
Author: nafiy <[email protected]>
AuthorDate: Tue Dec 20 22:30:55 2022 +0800

    [CELEBORN-113][FEATURE] Add metrics to monitor non-critical error number on local device (#1100)
---
 .../apache/celeborn/common/meta/DiskStatus.java    | 13 +++++++++
 .../celeborn/common/meta/DiskStatusSuiteJ.java}    | 24 ++++++++---------
 .../deploy/worker/storage/DeviceMonitor.scala      | 20 +++++++++-----
 .../deploy/worker/storage/StorageManager.scala     |  2 +-
 .../deploy/worker/storage/DeviceMonitorSuite.scala | 31 +++++++++++++++++++++-
 5 files changed, 69 insertions(+), 21 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
index c7b6aff9..edf31327 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
@@ -34,4 +34,17 @@ public enum DiskStatus {
   public final byte getValue() {
     return value;
   }
+
+  public final String toMetric() {
+    String[] fragments = this.name().split("_");
+    String metric = "";
+    for (String fragment : fragments) {
+      int len = fragment.length();
+      if (len >= 1) {
+        metric += fragment.substring(0, 1).toUpperCase();
+        metric += fragment.substring(1, len).toLowerCase();
+      }
+    }
+    return metric;
+  }
 }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java 
b/common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
similarity index 60%
copy from common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
copy to 
common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
index c7b6aff9..d080ed3e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskStatus.java
+++ b/common/src/test/java/org/apache/celeborn/common/meta/DiskStatusSuiteJ.java
@@ -17,21 +17,19 @@
 
 package org.apache.celeborn.common.meta;
 
-public enum DiskStatus {
-  HEALTHY(0),
-  READ_OR_WRITE_FAILURE(1),
-  IO_HANG(2),
-  HIGH_DISK_USAGE(3),
-  CRITICAL_ERROR(4);
+import static org.junit.Assert.*;
 
-  private final byte value;
+import org.junit.Test;
 
-  DiskStatus(int value) {
-    assert (value >= 0 && value < 256);
-    this.value = (byte) value;
-  }
+public class DiskStatusSuiteJ {
 
-  public final byte getValue() {
-    return value;
+  @Test
+  public void testDiskStatusToMetric() throws Exception {
+    assertEquals(DiskStatus.values().length, 5);
+    assertEquals(DiskStatus.HEALTHY.toMetric(), "Healthy");
+    assertEquals(DiskStatus.READ_OR_WRITE_FAILURE.toMetric(), 
"ReadOrWriteFailure");
+    assertEquals(DiskStatus.IO_HANG.toMetric(), "IoHang");
+    assertEquals(DiskStatus.HIGH_DISK_USAGE.toMetric(), "HighDiskUsage");
+    assertEquals(DiskStatus.CRITICAL_ERROR.toMetric(), "CriticalError");
   }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index ebc8ad6a..ef5ad5e6 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
+import org.apache.celeborn.common.metrics.source.AbstractSource
 import org.apache.celeborn.common.util.{ThreadUtils, Utils}
 import org.apache.celeborn.common.util.Utils._
 
@@ -51,10 +52,11 @@ class LocalDeviceMonitor(
     conf: CelebornConf,
     observer: DeviceObserver,
     deviceInfos: util.Map[String, DeviceInfo],
-    diskInfos: util.Map[String, DiskInfo]) extends DeviceMonitor {
+    diskInfos: util.Map[String, DiskInfo],
+    workerSource: AbstractSource) extends DeviceMonitor {
   val logger = LoggerFactory.getLogger(classOf[LocalDeviceMonitor])
 
-  class ObservedDevice(val deviceInfo: DeviceInfo) {
+  class ObservedDevice(val deviceInfo: DeviceInfo, workerSource: 
AbstractSource) {
     val diskInfos = new ConcurrentHashMap[String, DiskInfo]()
     deviceInfo.diskInfos.foreach { case diskInfo =>
       diskInfos.put(diskInfo.mountPoint, diskInfo)
@@ -101,7 +103,11 @@ class LocalDeviceMonitor(
       this.synchronized {
         val nonCriticalErrorSetFunc = new util.function.Function[DiskStatus, 
util.Set[Long]] {
           override def apply(t: DiskStatus): util.Set[Long] = {
-            ConcurrentHashMap.newKeySet[Long]()
+            val set = ConcurrentHashMap.newKeySet[Long]()
+            workerSource.addGauge(
+              s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count",
+              _ => set.size())
+            set
           }
         }
         nonCriticalErrors.computeIfAbsent(diskStatus, 
nonCriticalErrorSetFunc).add(
@@ -221,7 +227,7 @@ class LocalDeviceMonitor(
         s"because noDevice device $deviceName exists.")
     }
     deviceInfos.asScala.foreach(entry => {
-      val observedDevice = new ObservedDevice(entry._2)
+      val observedDevice = new ObservedDevice(entry._2, workerSource)
       observedDevice.addObserver(observer)
       observedDevices.put(entry._2, observedDevice)
     })
@@ -331,10 +337,12 @@ object DeviceMonitor {
       conf: CelebornConf,
       deviceObserver: DeviceObserver,
       deviceInfos: util.Map[String, DeviceInfo],
-      diskInfos: util.Map[String, DiskInfo]): DeviceMonitor = {
+      diskInfos: util.Map[String, DiskInfo],
+      workerSource: AbstractSource): DeviceMonitor = {
     try {
       if (conf.diskMonitorEnabled) {
-        val monitor = new LocalDeviceMonitor(conf, deviceObserver, 
deviceInfos, diskInfos)
+        val monitor =
+          new LocalDeviceMonitor(conf, deviceObserver, deviceInfos, diskInfos, 
workerSource)
         monitor.init()
         logger.info("Device monitor init success")
         monitor
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 49307519..5d75e03e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -92,7 +92,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     tmpDiskInfos.put(diskInfo.mountPoint, diskInfo)
   }
   private val deviceMonitor =
-    DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos)
+    DeviceMonitor.createDeviceMonitor(conf, this, deviceInfos, tmpDiskInfos, 
workerSource)
 
   // (mountPoint -> LocalFlusher)
   private val localFlushers: ConcurrentHashMap[String, LocalFlusher] = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index ca76c7bf..ccf6ca77 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -34,6 +34,7 @@ import 
org.apache.celeborn.common.CelebornConf.WORKER_DISK_MONITOR_CHECK_INTERVA
 import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus}
 import org.apache.celeborn.common.protocol.StorageInfo
 import org.apache.celeborn.common.util.Utils
+import org.apache.celeborn.service.deploy.worker.WorkerSource
 
 class DeviceMonitorSuite extends AnyFunSuite {
   val dfCmd = "df -ah"
@@ -66,6 +67,7 @@ class DeviceMonitorSuite extends AnyFunSuite {
 
   val conf = new CelebornConf()
   conf.set(WORKER_DISK_MONITOR_CHECK_INTERVAL.key, "3600s")
+  val workerSource = new WorkerSource(conf)
 
   val storageManager = mock[DeviceObserver]
   var (deviceInfos, diskInfos, workingDirDiskInfos): (
@@ -82,7 +84,7 @@ class DeviceMonitorSuite extends AnyFunSuite {
     diskInfos = tdiskInfos
   }
   val deviceMonitor =
-    new LocalDeviceMonitor(conf, storageManager, deviceInfos, diskInfos)
+    new LocalDeviceMonitor(conf, storageManager, deviceInfos, diskInfos, 
workerSource)
 
   val vdaDeviceInfo = new DeviceInfo("vda")
   val vdbDeviceInfo = new DeviceInfo("vdb")
@@ -273,4 +275,31 @@ class DeviceMonitorSuite extends AnyFunSuite {
     })
     DeviceMonitor.deviceCheckThreadPool.shutdownNow()
   }
+
+  test("monitor non-critical error metrics") {
+    withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
+      when(Utils.runCommand(dfCmd)) thenReturn dfOut
+      when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+      deviceMonitor.init()
+
+      val device1 = deviceMonitor.observedDevices.values().asScala.head
+      val mountPoints1 = device1.diskInfos.keySet().asScala.toList
+
+      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
+      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
+      val deviceMonitorMetrics =
+        
workerSource.gauges().filter(_.name.startsWith("Device")).sortBy(_.name)
+
+      assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
+      assertEquals("Device_vda_ReadOrWriteFailure_Count", 
deviceMonitorMetrics.last.name)
+      assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
+      assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
+
+      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.READ_OR_WRITE_FAILURE)
+      device1.notifyObserversOnNonCriticalError(mountPoints1, 
DiskStatus.IO_HANG)
+      assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
+      assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+    }
+  }
 }

Reply via email to