This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new ef9459e18 [CELEBORN-1037] Incorrect output for metrics of Prometheus
ef9459e18 is described below
commit ef9459e18b4b5a2f76157e60e997d0887d54ca36
Author: onebox-li <[email protected]>
AuthorDate: Fri Oct 13 11:18:03 2023 +0800
[CELEBORN-1037] Incorrect output for metrics of Prometheus
### What changes were proposed in this pull request?
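The newly added `deadlocks` metric in `ThreadStatesGaugeSet` has a value of type
`Set<String>`, which is not a valid numeric metric value. This change adds a filter at the `addGauge` entry point so that gauges whose value is not a `Number` are skipped (with a warning) instead of being registered.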
### Why are the changes needed?
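Registering a gauge with a non-numeric value (such as the `Set<String>` above) produces incorrect Prometheus metrics output.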
### Does this PR introduce _any_ user-facing change?
Yes: the unused, non-numeric `deadlocks` metric is no longer exported. Note that the template uses
`metrics_jvm_thread_deadlock_count_Value` rather than the removed metric.
### How was this patch tested?
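Tested manually. `DeviceMonitorSuite` was also updated to mock `DeviceMonitor.getDiskUsageInfos` in the `init`, `register/unregister/notify/report`, and `monitor non-critical error metrics` tests.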
Closes #1981 from onebox-li/fix-1037.
Authored-by: onebox-li <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 8b1bd07905022c198d677d13882a09ffb7eeafff)
Signed-off-by: mingji <[email protected]>
---
.../common/metrics/source/AbstractSource.scala | 8 +-
.../deploy/worker/storage/DeviceMonitorSuite.scala | 420 +++++++++++----------
2 files changed, 234 insertions(+), 194 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
index 21fd68d33..c4409c050 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala
@@ -73,7 +73,13 @@ abstract class AbstractSource(conf: CelebornConf, role:
String)
name: String,
labels: Map[String, String],
gauge: Gauge[T]): Unit = {
- namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+ // filter out non-number type gauges
+ if (gauge.getValue.isInstanceOf[Number]) {
+ namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels))
+ } else {
+ logWarning(
+ s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is
not a number")
+ }
}
def addGauge[T](
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
index 574894814..e4eb34281 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala
@@ -102,6 +102,17 @@ class DeviceMonitorSuite extends AnyFunSuite {
|/dev/vdb 1932735283200 97710505984 1835024777216 6%
/mnt/disk5
|""".stripMargin
+ val dfBOut1DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L,
7)
+ val dfBOut2DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L,
6)
+ val dfBOut3DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L,
7)
+ val dfBOut4DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L,
6)
+ val dfBOut5DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L,
6)
+
val dirs = new jArrayList[File]()
val workingDir1 = ListBuffer[File](new File("/mnt/disk1/data1"))
val workingDir2 = ListBuffer[File](new File("/mnt/disk1/data2"))
@@ -163,169 +174,183 @@ class DeviceMonitorSuite extends AnyFunSuite {
test("init") {
withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
- when(Utils.runCommand(dfCmd)) thenReturn dfOut
- when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
- deviceMonitor.init()
-
- assertEquals(2, deviceMonitor.observedDevices.size())
-
- assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo))
- assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo))
-
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1)
-
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1"))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2"))
-
- assertEquals(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0),
- new File("/mnt/disk1/data1"))
- assertEquals(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1),
- new File("/mnt/disk1/data2"))
- assertEquals(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0),
- new File("/mnt/disk2/data3"))
- assertEquals(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1),
- new File("/mnt/disk2/data4"))
-
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
1)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
1)
+
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
{
+ when(Utils.runCommand(dfCmd)) thenReturn dfOut
+ when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+ dfBOut1DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+ dfBOut2DiskUsageInfo)
+
+ deviceMonitor.init()
+
+ assertEquals(2, deviceMonitor.observedDevices.size())
+
+ assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo))
+ assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo))
+
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1)
+
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1"))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2"))
+
+ assertEquals(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0),
+ new File("/mnt/disk1/data1"))
+ assertEquals(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1),
+ new File("/mnt/disk1/data2"))
+ assertEquals(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0),
+ new File("/mnt/disk2/data3"))
+ assertEquals(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1),
+ new File("/mnt/disk2/data4"))
+
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
1)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
1)
+ }
}
}
test("register/unregister/notify/report") {
withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
- when(Utils.runCommand(dfCmd)) thenReturn dfOut
- when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
- deviceMonitor.init()
-
- val fw1 = mock[FileWriter]
- val fw2 = mock[FileWriter]
- val fw3 = mock[FileWriter]
- val fw4 = mock[FileWriter]
-
- val f1 = new File("/mnt/disk1/data1/f1")
- val f2 = new File("/mnt/disk1/data2/f2")
- val f3 = new File("/mnt/disk2/data3/f3")
- val f4 = new File("/mnt/disk2/data4/f4")
- when(fw1.getFile).thenReturn(f1)
- when(fw2.getFile).thenReturn(f2)
- when(fw3.getFile).thenReturn(f3)
- when(fw4.getFile).thenReturn(f4)
-
- deviceMonitor.registerFileWriter(fw1)
- deviceMonitor.registerFileWriter(fw2)
- deviceMonitor.registerFileWriter(fw3)
- deviceMonitor.registerFileWriter(fw4)
-
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
-
- deviceMonitor.unregisterFileWriter(fw1)
- deviceMonitor.unregisterFileWriter(fw3)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
2)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
2)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
-
- val df1 = mock[LocalFlusher]
- val df2 = mock[LocalFlusher]
- val df3 = mock[LocalFlusher]
- val df4 = mock[LocalFlusher]
-
- when(df1.stopFlag).thenReturn(new AtomicBoolean(false))
- when(df2.stopFlag).thenReturn(new AtomicBoolean(false))
- when(df3.stopFlag).thenReturn(new AtomicBoolean(false))
- when(df4.stopFlag).thenReturn(new AtomicBoolean(false))
-
- when(df1.mountPoint).thenReturn("/mnt/disk1")
- when(df2.mountPoint).thenReturn("/mnt/disk1")
- when(df3.mountPoint).thenReturn("/mnt/disk2")
- when(df4.mountPoint).thenReturn("/mnt/disk2")
-
- deviceMonitor.registerFlusher(df1)
- deviceMonitor.registerFlusher(df2)
- deviceMonitor.registerFlusher(df3)
- deviceMonitor.registerFlusher(df4)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
4)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
4)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
- deviceMonitor.unregisterFlusher(df1)
- deviceMonitor.unregisterFlusher(df3)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
- when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
- .thenAnswer((a: String, b: List[File]) => {
- deviceMonitor.unregisterFileWriter(fw2)
- })
- when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
- .thenAnswer((a: String, b: List[File]) => {
- deviceMonitor.unregisterFileWriter(fw4)
- })
- when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
- .thenAnswer((a: String, b: List[File]) => {
- df2.stopFlag.set(true)
- })
- when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
- .thenAnswer((a: String, b: List[File]) => {
- df4.stopFlag.set(true)
- })
-
- deviceMonitor.observedDevices
- .get(vdaDeviceInfo)
- .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG)
- deviceMonitor.observedDevices
- .get(vdbDeviceInfo)
- .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
- assert(
-
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
- assert(
-
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
-
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
-
- deviceMonitor.registerFileWriter(fw1)
- deviceMonitor.registerFileWriter(fw2)
- deviceMonitor.registerFileWriter(fw3)
- deviceMonitor.registerFileWriter(fw4)
-
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
4)
-
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
4)
+
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
{
+ when(Utils.runCommand(dfCmd)) thenReturn dfOut
+ when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+ dfBOut1DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+ dfBOut2DiskUsageInfo)
+
+ deviceMonitor.init()
+
+ val fw1 = mock[FileWriter]
+ val fw2 = mock[FileWriter]
+ val fw3 = mock[FileWriter]
+ val fw4 = mock[FileWriter]
+
+ val f1 = new File("/mnt/disk1/data1/f1")
+ val f2 = new File("/mnt/disk1/data2/f2")
+ val f3 = new File("/mnt/disk2/data3/f3")
+ val f4 = new File("/mnt/disk2/data4/f4")
+ when(fw1.getFile).thenReturn(f1)
+ when(fw2.getFile).thenReturn(f2)
+ when(fw3.getFile).thenReturn(f3)
+ when(fw4.getFile).thenReturn(f4)
+
+ deviceMonitor.registerFileWriter(fw1)
+ deviceMonitor.registerFileWriter(fw2)
+ deviceMonitor.registerFileWriter(fw3)
+ deviceMonitor.registerFileWriter(fw4)
+
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
+
+ deviceMonitor.unregisterFileWriter(fw1)
+ deviceMonitor.unregisterFileWriter(fw3)
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
2)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
2)
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4))
+
+ val df1 = mock[LocalFlusher]
+ val df2 = mock[LocalFlusher]
+ val df3 = mock[LocalFlusher]
+ val df4 = mock[LocalFlusher]
+
+ when(df1.stopFlag).thenReturn(new AtomicBoolean(false))
+ when(df2.stopFlag).thenReturn(new AtomicBoolean(false))
+ when(df3.stopFlag).thenReturn(new AtomicBoolean(false))
+ when(df4.stopFlag).thenReturn(new AtomicBoolean(false))
+
+ when(df1.mountPoint).thenReturn("/mnt/disk1")
+ when(df2.mountPoint).thenReturn("/mnt/disk1")
+ when(df3.mountPoint).thenReturn("/mnt/disk2")
+ when(df4.mountPoint).thenReturn("/mnt/disk2")
+
+ deviceMonitor.registerFlusher(df1)
+ deviceMonitor.registerFlusher(df2)
+ deviceMonitor.registerFlusher(df3)
+ deviceMonitor.registerFlusher(df4)
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
4)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
4)
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+ deviceMonitor.unregisterFlusher(df1)
+ deviceMonitor.unregisterFlusher(df3)
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+ when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
+ .thenAnswer((a: String, b: List[File]) => {
+ deviceMonitor.unregisterFileWriter(fw2)
+ })
+ when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
+ .thenAnswer((a: String, b: List[File]) => {
+ deviceMonitor.unregisterFileWriter(fw4)
+ })
+ when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR))
+ .thenAnswer((a: String, b: List[File]) => {
+ df2.stopFlag.set(true)
+ })
+ when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR))
+ .thenAnswer((a: String, b: List[File]) => {
+ df4.stopFlag.set(true)
+ })
+
+ deviceMonitor.observedDevices
+ .get(vdaDeviceInfo)
+ .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG)
+ deviceMonitor.observedDevices
+ .get(vdbDeviceInfo)
+ .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG)
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
3)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
3)
+ assert(
+
deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2))
+ assert(
+
deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager))
+
assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4))
+
+ deviceMonitor.registerFileWriter(fw1)
+ deviceMonitor.registerFileWriter(fw2)
+ deviceMonitor.registerFileWriter(fw3)
+ deviceMonitor.registerFileWriter(fw4)
+
assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(),
4)
+
assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(),
4)
+ }
}
}
@@ -350,45 +375,52 @@ class DeviceMonitorSuite extends AnyFunSuite {
test("monitor non-critical error metrics") {
withObjectMocked[org.apache.celeborn.common.util.Utils.type] {
- when(Utils.runCommand(dfCmd)) thenReturn dfOut
- when(Utils.runCommand(lsCmd)) thenReturn lsOut
-
- deviceMonitor.init()
-
- val device1 = deviceMonitor.observedDevices.values().asScala.head
- val mountPoints1 = device1.diskInfos.keySet().asScala.toList
-
- device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.READ_OR_WRITE_FAILURE)
- device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.IO_HANG)
- val deviceMonitorMetrics =
- workerSource.gauges().filter(_.name.startsWith("Device_" +
device1.deviceInfo.name))
- .sortBy(_.name)
-
- assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
- assertEquals("Device_vda_ReadOrWriteFailure_Count",
deviceMonitorMetrics.last.name)
- assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
- assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
-
- device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.READ_OR_WRITE_FAILURE)
- device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.IO_HANG)
- assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
- assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
{
+ when(Utils.runCommand(dfCmd)) thenReturn dfOut
+ when(Utils.runCommand(lsCmd)) thenReturn lsOut
+
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn(
+ dfBOut1DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn(
+ dfBOut2DiskUsageInfo)
+
+ deviceMonitor.init()
+
+ val device1 = deviceMonitor.observedDevices.values().asScala.head
+ val mountPoints1 = device1.diskInfos.keySet().asScala.toList
+
+ device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.READ_OR_WRITE_FAILURE)
+ device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.IO_HANG)
+ val deviceMonitorMetrics =
+ workerSource.gauges().filter(_.name.startsWith("Device_" +
device1.deviceInfo.name))
+ .sortBy(_.name)
+
+ assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name)
+ assertEquals("Device_vda_ReadOrWriteFailure_Count",
deviceMonitorMetrics.last.name)
+ assertEquals(1, deviceMonitorMetrics.head.gauge.getValue)
+ assertEquals(1, deviceMonitorMetrics.last.gauge.getValue)
+
+ device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.READ_OR_WRITE_FAILURE)
+ device1.notifyObserversOnNonCriticalError(mountPoints1,
DiskStatus.IO_HANG)
+ assertEquals(2, deviceMonitorMetrics.head.gauge.getValue)
+ assertEquals(2, deviceMonitorMetrics.last.gauge.getValue)
+ }
}
}
test("monitor device usage metrics") {
withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type]
{
- val dfBOut1 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1293858897920L, 102005473280L, 7)
- val dfBOut2 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
- val dfBOut3 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1293858897920L, 102005473280L, 7)
- val dfBOut4 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
- val dfBOut5 = DeviceMonitor.DiskUsageInfo(1932735283200L,
1835024777216L, 97710505984L, 6)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut1)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(dfBOut2)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(dfBOut3)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(dfBOut4)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(dfBOut5)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(
+ dfBOut1DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(
+ dfBOut2DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(
+ dfBOut3DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(
+ dfBOut4DiskUsageInfo)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(
+ dfBOut5DiskUsageInfo)
deviceMonitor2.init()
@@ -423,8 +455,10 @@ class DeviceMonitorSuite extends AnyFunSuite {
assertEquals("vdb", metrics4.last.labels("device"))
assertEquals(1024L * 3, metrics4.last.gauge.getValue)
- val dfBOut6 = DeviceMonitor.DiskUsageInfo(1395864371200L,
1264867868672L, 130996502528L, 9)
-
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut6)
+ val dfBOut6DiskUsageInfo =
+ DeviceMonitor.DiskUsageInfo(1395864371200L, 1264867868672L,
130996502528L, 9)
+
when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(
+ dfBOut6DiskUsageInfo)
assertEquals(1264867868672L, metrics2.head.gauge.getValue)
}
}