This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new 39670822a [CELEBORN-1570] Fix flaky test - monitor non-critical error
metrics in DeviceMonitorSuite
39670822a is described below
commit 39670822a9fff4a3b6b70dc10449df48a8387c5b
Author: sychen <[email protected]>
AuthorDate: Thu Aug 22 13:50:57 2024 +0800
[CELEBORN-1570] Fix flaky test - monitor non-critical error metrics in
DeviceMonitorSuite
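### What changes were proposed in this pull request?
Store each device's non-critical error timestamps (per `DiskStatus`) in a `CopyOnWriteArrayList` instead of a concurrent `Set`. This way, errors recorded in the same millisecond are all counted by the `Device_<name>_<status>_Count` gauge.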
### Why are the changes needed?
https://github.com/apache/celeborn/actions/runs/10441850633/job/28913478820?pr=2692
```
- monitor non-critical error metrics *** FAILED ***
java.lang.AssertionError: expected:<2> but was:<1>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:120)
at org.junit.Assert.assertEquals(Assert.java:146)
at
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitorSuite.$anonfun$new$22(DeviceMonitorSuite.scala:405)
```
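The test is flaky because two consecutive calls to `System.currentTimeMillis()` can return the same value. The gauge reports the size of a set of error timestamps. Identical timestamps are deduplicated in that set, so the gauge reports 1 instead of the expected 2: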
https://github.com/apache/celeborn/blob/0ee3c3a4bdef3b97fca6be04f03ac36c2e12e1a6/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala#L81-L91
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GitHub Actions (GA) CI.
Closes #2694 from cxzl25/CELEBORN-1570.
Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 1f497cb92b9a4a78dba8d7b57d77c29ed5862db3)
Signed-off-by: SteNicholas <[email protected]>
---
.../service/deploy/worker/storage/DeviceMonitor.scala | 6 +++---
.../service/deploy/worker/storage/ObservedDevice.scala | 14 +++++++-------
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index b480b5175..3082f3928 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -112,10 +112,10 @@ class LocalDeviceMonitor(
val mountPoints = device.diskInfos.keySet.asScala.toList
// tolerate time accuracy for better performance
val now = System.currentTimeMillis()
- for (concurrentSet <- device.nonCriticalErrors.values().asScala)
{
- for (time <- concurrentSet.asScala) {
+ for (concurrentList <-
device.nonCriticalErrors.values().asScala) {
+ for (time <- concurrentList.asScala) {
if (now - time > device.notifyErrorExpireTimeout) {
- concurrentSet.remove(time)
+ concurrentList.remove(time)
}
}
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
index 9196ac49a..ef45fd885 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
@@ -19,8 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.io.File
import java.util
-import java.util.{Set => JSet}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.{List => JList, Set => JSet}
+import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}
import scala.collection.JavaConverters._
import scala.io.Source
@@ -45,8 +45,8 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf, workerSourc
val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
- val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JSet[Long]] =
- JavaUtils.newConcurrentHashMap[DiskStatus, JSet[Long]]()
+ val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JList[Long]] =
+ JavaUtils.newConcurrentHashMap[DiskStatus, JList[Long]]()
val notifyErrorThreshold: Int = conf.workerDiskMonitorNotifyErrorThreshold
val notifyErrorExpireTimeout: Long =
conf.workerDiskMonitorNotifyErrorExpireTimeout
@@ -83,11 +83,11 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: CelebornConf, workerSourc
nonCriticalErrors.computeIfAbsent(
diskStatus,
(_: DiskStatus) => {
- val set = ConcurrentHashMap.newKeySet[Long]()
+ val list = new CopyOnWriteArrayList[Long]()
workerSource.addGauge(s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count")
{ () =>
- set.size()
+ list.size()
}
- set
+ list
}).add(System.currentTimeMillis())
mountPoints.foreach { mountPoint =>
diskInfos.get(mountPoint).setStatus(diskStatus)