This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new 39670822a [CELEBORN-1570] Fix flaky test - monitor non-critical error metrics in DeviceMonitorSuite
39670822a is described below

commit 39670822a9fff4a3b6b70dc10449df48a8387c5b
Author: sychen <[email protected]>
AuthorDate: Thu Aug 22 13:50:57 2024 +0800

    [CELEBORN-1570] Fix flaky test - monitor non-critical error metrics in 
DeviceMonitorSuite
    
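    ### What changes were proposed in this pull request?
    
    Store the timestamps of non-critical disk errors in a `CopyOnWriteArrayList` instead of a concurrent set, so that every recorded error is counted by the per-device `_Count` gauge, even when several errors share the same millisecond timestamp.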
    
    ### Why are the changes needed?
    
https://github.com/apache/celeborn/actions/runs/10441850633/job/28913478820?pr=2692
    
    ```
    - monitor non-critical error metrics *** FAILED ***
      java.lang.AssertionError: expected:<2> but was:<1>
      at org.junit.Assert.fail(Assert.java:89)
      at org.junit.Assert.failNotEquals(Assert.java:835)
      at org.junit.Assert.assertEquals(Assert.java:120)
      at org.junit.Assert.assertEquals(Assert.java:146)
      at 
org.apache.celeborn.service.deploy.worker.storage.DeviceMonitorSuite.$anonfun$new$22(DeviceMonitorSuite.scala:405)
    ```
    
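    The test is flaky because two consecutive calls to `System.currentTimeMillis()` can return the same value. The gauge reports the size of the set of error timestamps, so identical timestamps are deduplicated and the reported count is lower than the number of errors (here, 1 instead of 2).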
    
    
https://github.com/apache/celeborn/blob/0ee3c3a4bdef3b97fca6be04f03ac36c2e12e1a6/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala#L81-L91
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA (GitHub Actions)
    
    Closes #2694 from cxzl25/CELEBORN-1570.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 1f497cb92b9a4a78dba8d7b57d77c29ed5862db3)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../service/deploy/worker/storage/DeviceMonitor.scala      |  6 +++---
 .../service/deploy/worker/storage/ObservedDevice.scala     | 14 +++++++-------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index b480b5175..3082f3928 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -112,10 +112,10 @@ class LocalDeviceMonitor(
               val mountPoints = device.diskInfos.keySet.asScala.toList
               // tolerate time accuracy for better performance
               val now = System.currentTimeMillis()
-              for (concurrentSet <- device.nonCriticalErrors.values().asScala) 
{
-                for (time <- concurrentSet.asScala) {
+              for (concurrentList <- 
device.nonCriticalErrors.values().asScala) {
+                for (time <- concurrentList.asScala) {
                   if (now - time > device.notifyErrorExpireTimeout) {
-                    concurrentSet.remove(time)
+                    concurrentList.remove(time)
                   }
                 }
               }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
index 9196ac49a..ef45fd885 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/ObservedDevice.scala
@@ -19,8 +19,8 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.File
 import java.util
-import java.util.{Set => JSet}
-import java.util.concurrent.ConcurrentHashMap
+import java.util.{List => JList, Set => JSet}
+import java.util.concurrent.{ConcurrentHashMap, CopyOnWriteArrayList}
 
 import scala.collection.JavaConverters._
 import scala.io.Source
@@ -45,8 +45,8 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
   val statFile = new File(s"$sysBlockDir/${deviceInfo.name}/stat")
   val inFlightFile = new File(s"$sysBlockDir/${deviceInfo.name}/inflight")
 
-  val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JSet[Long]] =
-    JavaUtils.newConcurrentHashMap[DiskStatus, JSet[Long]]()
+  val nonCriticalErrors: ConcurrentHashMap[DiskStatus, JList[Long]] =
+    JavaUtils.newConcurrentHashMap[DiskStatus, JList[Long]]()
   val notifyErrorThreshold: Int = conf.workerDiskMonitorNotifyErrorThreshold
   val notifyErrorExpireTimeout: Long = 
conf.workerDiskMonitorNotifyErrorExpireTimeout
 
@@ -83,11 +83,11 @@ class ObservedDevice(val deviceInfo: DeviceInfo, conf: 
CelebornConf, workerSourc
       nonCriticalErrors.computeIfAbsent(
         diskStatus,
         (_: DiskStatus) => {
-          val set = ConcurrentHashMap.newKeySet[Long]()
+          val list = new CopyOnWriteArrayList[Long]()
           
workerSource.addGauge(s"Device_${deviceInfo.name}_${diskStatus.toMetric}_Count")
 { () =>
-            set.size()
+            list.size()
           }
-          set
+          list
         }).add(System.currentTimeMillis())
       mountPoints.foreach { mountPoint =>
         diskInfos.get(mountPoint).setStatus(diskStatus)

Reply via email to