This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e8ae23bc7 [CELEBORN-1960] Fix PauseSpentTime only append the interval 
check time
e8ae23bc7 is described below

commit e8ae23bc7a7a44e468fafa1ccde1a3d4dd4938a1
Author: zhengtao <[email protected]>
AuthorDate: Thu May 15 23:15:28 2025 +0800

    [CELEBORN-1960] Fix PauseSpentTime only append the interval check time
    
    ### What changes were proposed in this pull request?
    Fix the incorrectly counted pause-time metrics (pausePushDataTime and pausePushDataAndReplicateTime).
    
    ### Why are the changes needed?
    Assume that 0 is the NONE_PAUSED status, 1 is PAUSE PUSH, and 2 is PAUSE PUSH AND REPLICATE.
    The status is recorded at every check interval.
    Here is an example sequence of status changes:
    0 -> 0 -> 1 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0
    
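    The previous code only counted a single check interval, because every check interval
    updated the pause start time.
    Notation: ps = pause start time (pauseStartTime), pe = pause end time (pauseEndTime),
    rs / re = start / end of the PAUSE PUSH AND REPLICATE state.
    0 -> 0 -> 1(ps) -> 1(ps) -> 1(ps) -> 1(ps) -> 2(rs) -> 2(rs) -> 2(re-rs)
    -> 1(ps) -> 1(ps) -> [1(ps) -> 0(pe)] -> 0 -> 0
    Only the final interval, shown in brackets, was added to the pause time.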
    
    It should be:
    0 -> 0 -> 1(ps) -> 1 -> 1 -> 1 -> 2(rs) -> 2 -> 2 -> 1(re) -> 1 -> 1 -> 0(pe) -> 0 -> 0

    Other transition patterns should be counted the same way:
    0 -> 0 -> 1(ps) -> 1 -> 1 -> 0(pe) -> 0 -> 0

    0 -> 0 -> 2(ps, rs) -> 2 -> 2 -> 0(pe, re) -> 0

    0 -> 0 -> 1(ps) -> 1 -> 2(rs) -> 2 -> 0(pe, re)

    0 -> 0 -> 2(ps, rs) -> 2 -> 1(re) -> 1 -> 0(pe)
    
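    The pause-push time (pausePushDataTime) should include the pause-push-and-replicate time (pausePushDataAndReplicateTime), because push is also paused in the PAUSE PUSH AND REPLICATE state.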
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit tests (updated MemoryManagerSuite).
    
    Closes #3207 from zaynt4606/clb1960.
    
    Authored-by: zhengtao <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../deploy/worker/memory/MemoryManager.java        | 24 +++++++----
 .../service/deploy/memory/MemoryManagerSuite.scala | 49 +++++++++++++++++++---
 2 files changed, 59 insertions(+), 14 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index b3891d9b5..ff08c077c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -337,10 +337,13 @@ public class MemoryManager {
         if (!tryResumeByPinnedMemory(servingState, lastState)) {
           pausePushDataCounter.increment();
           if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+            appendPauseSpentTime(lastState);
             resumeReplicate();
           } else {
-            logger.info("Trigger action: PAUSE PUSH");
-            pausePushDataStartTime = System.currentTimeMillis();
+            if (servingState != lastState) {
+              pausePushDataStartTime = System.currentTimeMillis();
+              logger.info("Trigger action: PAUSE PUSH");
+            }
             resumingByPinnedMemory = false;
             memoryPressureListeners.forEach(
                 memoryPressureListener ->
@@ -362,13 +365,17 @@ public class MemoryManager {
       case PUSH_AND_REPLICATE_PAUSED:
         if (!tryResumeByPinnedMemory(servingState, lastState)) {
           pausePushDataAndReplicateCounter.increment();
-          logger.info("Trigger action: PAUSE PUSH");
-          pausePushDataAndReplicateStartTime = System.currentTimeMillis();
+          if (servingState != lastState) {
+            pausePushDataAndReplicateStartTime = System.currentTimeMillis();
+            logger.info("Trigger action: PAUSE PUSH and REPLICATE");
+            if (lastState == ServingState.NONE_PAUSED) {
+              pausePushDataStartTime = pausePushDataAndReplicateStartTime;
+            }
+          }
           resumingByPinnedMemory = false;
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
-          logger.info("Trigger action: PAUSE REPLICATE");
           memoryPressureListeners.forEach(
               memoryPressureListener ->
                   
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
@@ -517,10 +524,9 @@ public class MemoryManager {
 
   private void appendPauseSpentTime(ServingState servingState) {
     long nextPauseStartTime = System.currentTimeMillis();
-    if (servingState == ServingState.PUSH_PAUSED) {
-      pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
-      pausePushDataStartTime = nextPauseStartTime;
-    } else {
+    pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
+    pausePushDataStartTime = nextPauseStartTime;
+    if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
       pausePushDataAndReplicateTime += nextPauseStartTime - 
pausePushDataAndReplicateStartTime;
       pausePushDataAndReplicateStartTime = nextPauseStartTime;
     }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index d74f1889e..2a9f8fa09 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -113,6 +113,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(pushListener.isPause)
       assert(!replicateListener.isPause)
     }
+    Thread.sleep(20)
 
     // PAUSE PUSH -> PAUSE PUSH AND REPLICATE
     memoryCounter.set(replicateThreshold + 1)
@@ -120,6 +121,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(pushListener.isPause)
       assert(replicateListener.isPause)
     }
+    Thread.sleep(20)
 
     // PAUSE PUSH AND REPLICATE -> PAUSE PUSH
     memoryCounter.set(pushThreshold + 1)
@@ -127,6 +129,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(pushListener.isPause)
       assert(!replicateListener.isPause)
     }
+    Thread.sleep(20)
 
     // PAUSE PUSH -> NONE PAUSED
     memoryCounter.set(0)
@@ -134,10 +137,14 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
     }
+    Thread.sleep(20)
     // [CELEBORN-882] Test record pause push time
-    assert(memoryManager.getPausePushDataTime.longValue() > 0)
-    assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0)
-    val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
+    val lastPauseTime1 = memoryManager.getPausePushDataTime.longValue()
+    val lastPauseReplicaTime1 = 
memoryManager.getPausePushDataAndReplicateTime.longValue()
+    // PauseTime should count the actual waiting time
+    assert(lastPauseTime1 >= 60)
+    assert(lastPauseReplicaTime1 >= 20)
+    logInfo(s"lastPauseTime1: $lastPauseTime1, lastPauseReplicaTime1: 
$lastPauseReplicaTime1")
 
     // NONE PAUSED -> PAUSE PUSH AND REPLICATE
     memoryCounter.set(replicateThreshold + 1)
@@ -146,14 +153,46 @@ class MemoryManagerSuite extends CelebornFunSuite {
       assert(replicateListener.isPause)
     }
 
+    Thread.sleep(20)
+
     // PAUSE PUSH AND REPLICATE -> NONE PAUSED
     memoryCounter.set(0)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
     }
-    assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime)
-    assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0)
+
+    // Wait for the check thread to update the metrics
+    memoryManager.switchServingState()
+    val lastPauseTime2 = memoryManager.getPausePushDataTime.longValue()
+    val lastPauseReplicaTime2 = 
memoryManager.getPausePushDataAndReplicateTime.longValue()
+    assert(lastPauseTime2 > lastPauseTime1)
+    assert(lastPauseReplicaTime2 > lastPauseReplicaTime1)
+    logInfo(s"lastPauseTime2: $lastPauseTime2, lastPauseReplicaTime2: 
$lastPauseReplicaTime2")
+
+    // NONE PAUSED -> PAUSE PUSH
+    memoryCounter.set(pushThreshold + 1)
+    eventually(timeout(30.second), interval(10.milliseconds)) {
+      assert(pushListener.isPause)
+      assert(!replicateListener.isPause)
+    }
+
+    Thread.sleep(20)
+
+    // PAUSE PUSH -> NONE PAUSED
+    memoryCounter.set(0)
+    eventually(timeout(30.second), interval(10.milliseconds)) {
+      assert(!pushListener.isPause)
+      assert(!replicateListener.isPause)
+    }
+
+    // Wait for the check thread to update the metrics
+    memoryManager.switchServingState()
+    val lastPauseTime3 = memoryManager.getPausePushDataTime.longValue()
+    val lastPauseReplicaTime3 = 
memoryManager.getPausePushDataAndReplicateTime.longValue()
+    assert(lastPauseTime3 > lastPauseTime2)
+    assert(lastPauseReplicaTime3 == lastPauseReplicaTime2)
+    logInfo(s"lastPauseTime3: $lastPauseTime3, lastPauseReplicaTime3: 
$lastPauseReplicaTime3")
   }
 
   test("[CELEBORN-1792] Test MemoryManager resume by pinned memory") {

Reply via email to