This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e8ae23bc7 [CELEBORN-1960] Fix PauseSpentTime only append the interval
check time
e8ae23bc7 is described below
commit e8ae23bc7a7a44e468fafa1ccde1a3d4dd4938a1
Author: zhengtao <[email protected]>
AuthorDate: Thu May 15 23:15:28 2025 +0800
[CELEBORN-1960] Fix PauseSpentTime only append the interval check time
### What changes were proposed in this pull request?
Fix the miscounting of the pause-time metrics (pausePushDataTime and pausePushDataAndReplicateTime).
### Why are the changes needed?
Assume that 0 is the NONE_PAUSED status, 1 is PUSH_PAUSED (pause push), and 2 is
PUSH_AND_REPLICATE_PAUSED (pause push and replicate).
The status is recorded at every check interval.
Here is an example sequence of status changes:
0 -> 0 -> 1 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0
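The previous code only counted the time of a single check interval, because every check
interval reset the pause start time.
ps (pause-push start time), pe (pause-push end time), rs (pause-replicate start time),
re (pause-replicate end time):
0 -> 0 -> 1(ps) -> 1(ps) -> 1(ps) -> 1(ps) -> 2(rs) -> 2(rs) -> 2(re-rs)
-> 1(ps) -> 1(ps) -> 1(ps) -> 0(pe) -> 0 -> 0
As a result, only the last interval before resuming (1(ps) -> 0(pe)) was added to the pause time.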
It should be:
0 -> 0 -> 1(ps) -> 1 -> 1 -> 1 -> 2(rs) -> 2 -> 2 -> 1(re) -> 1 -> 1 -> 0(pe)
-> 0 -> 0
The same rule applies to these other sequences:
0 -> 0 -> 1(ps) -> 1 -> 1 -> 0(pe) -> 0 -> 0
0 -> 0 -> 2(ps, rs) -> 2 -> 2 -> 0(pe, re) -> 0
0 -> 0 -> 1(ps) -> 1 -> 2(rs) -> 2 -> 0(pe, re)
0 -> 0 -> 2(ps, rs) -> 2 -> 1(re) -> 1 -> 0(pe)
The pause-push time (pausePushDataTime) should also include the time spent in
PUSH_AND_REPLICATE_PAUSED, because pushes are paused in that state too.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (MemoryManagerSuite).
Closes #3207 from zaynt4606/clb1960.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/memory/MemoryManager.java | 24 +++++++----
.../service/deploy/memory/MemoryManagerSuite.scala | 49 +++++++++++++++++++---
2 files changed, 59 insertions(+), 14 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index b3891d9b5..ff08c077c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -337,10 +337,13 @@ public class MemoryManager {
if (!tryResumeByPinnedMemory(servingState, lastState)) {
pausePushDataCounter.increment();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
+ appendPauseSpentTime(lastState);
resumeReplicate();
} else {
- logger.info("Trigger action: PAUSE PUSH");
- pausePushDataStartTime = System.currentTimeMillis();
+ if (servingState != lastState) {
+ pausePushDataStartTime = System.currentTimeMillis();
+ logger.info("Trigger action: PAUSE PUSH");
+ }
resumingByPinnedMemory = false;
memoryPressureListeners.forEach(
memoryPressureListener ->
@@ -362,13 +365,17 @@ public class MemoryManager {
case PUSH_AND_REPLICATE_PAUSED:
if (!tryResumeByPinnedMemory(servingState, lastState)) {
pausePushDataAndReplicateCounter.increment();
- logger.info("Trigger action: PAUSE PUSH");
- pausePushDataAndReplicateStartTime = System.currentTimeMillis();
+ if (servingState != lastState) {
+ pausePushDataAndReplicateStartTime = System.currentTimeMillis();
+ logger.info("Trigger action: PAUSE PUSH and REPLICATE");
+ if (lastState == ServingState.NONE_PAUSED) {
+ pausePushDataStartTime = pausePushDataAndReplicateStartTime;
+ }
+ }
resumingByPinnedMemory = false;
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
- logger.info("Trigger action: PAUSE REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
@@ -517,10 +524,9 @@ public class MemoryManager {
private void appendPauseSpentTime(ServingState servingState) {
long nextPauseStartTime = System.currentTimeMillis();
- if (servingState == ServingState.PUSH_PAUSED) {
- pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
- pausePushDataStartTime = nextPauseStartTime;
- } else {
+ pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
+ pausePushDataStartTime = nextPauseStartTime;
+ if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
pausePushDataAndReplicateTime += nextPauseStartTime -
pausePushDataAndReplicateStartTime;
pausePushDataAndReplicateStartTime = nextPauseStartTime;
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index d74f1889e..2a9f8fa09 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -113,6 +113,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
+ Thread.sleep(20)
// PAUSE PUSH -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1)
@@ -120,6 +121,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(replicateListener.isPause)
}
+ Thread.sleep(20)
// PAUSE PUSH AND REPLICATE -> PAUSE PUSH
memoryCounter.set(pushThreshold + 1)
@@ -127,6 +129,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
+ Thread.sleep(20)
// PAUSE PUSH -> NONE PAUSED
memoryCounter.set(0)
@@ -134,10 +137,14 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
+ Thread.sleep(20)
// [CELEBORN-882] Test record pause push time
- assert(memoryManager.getPausePushDataTime.longValue() > 0)
- assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0)
- val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
+ val lastPauseTime1 = memoryManager.getPausePushDataTime.longValue()
+ val lastPauseReplicaTime1 =
memoryManager.getPausePushDataAndReplicateTime.longValue()
+ // PauseTime should count the actual waiting time
+ assert(lastPauseTime1 >= 60)
+ assert(lastPauseReplicaTime1 >= 20)
+ logInfo(s"lastPauseTime1: $lastPauseTime1, lastPauseReplicaTime1:
$lastPauseReplicaTime1")
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1)
@@ -146,14 +153,46 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(replicateListener.isPause)
}
+ Thread.sleep(20)
+
// PAUSE PUSH AND REPLICATE -> NONE PAUSED
memoryCounter.set(0)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
- assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime)
- assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0)
+
+ // Wait for the check thread to update the metrics
+ memoryManager.switchServingState()
+ val lastPauseTime2 = memoryManager.getPausePushDataTime.longValue()
+ val lastPauseReplicaTime2 =
memoryManager.getPausePushDataAndReplicateTime.longValue()
+ assert(lastPauseTime2 > lastPauseTime1)
+ assert(lastPauseReplicaTime2 > lastPauseReplicaTime1)
+ logInfo(s"lastPauseTime2: $lastPauseTime2, lastPauseReplicaTime2:
$lastPauseReplicaTime2")
+
+ // NONE PAUSED -> PAUSE PUSH
+ memoryCounter.set(pushThreshold + 1)
+ eventually(timeout(30.second), interval(10.milliseconds)) {
+ assert(pushListener.isPause)
+ assert(!replicateListener.isPause)
+ }
+
+ Thread.sleep(20)
+
+ // PAUSE PUSH -> NONE PAUSED
+ memoryCounter.set(0)
+ eventually(timeout(30.second), interval(10.milliseconds)) {
+ assert(!pushListener.isPause)
+ assert(!replicateListener.isPause)
+ }
+
+ // Wait for the check thread to update the metrics
+ memoryManager.switchServingState()
+ val lastPauseTime3 = memoryManager.getPausePushDataTime.longValue()
+ val lastPauseReplicaTime3 =
memoryManager.getPausePushDataAndReplicateTime.longValue()
+ assert(lastPauseTime3 > lastPauseTime2)
+ assert(lastPauseReplicaTime3 == lastPauseReplicaTime2)
+ logInfo(s"lastPauseTime3: $lastPauseTime3, lastPauseReplicaTime3:
$lastPauseReplicaTime3")
}
test("[CELEBORN-1792] Test MemoryManager resume by pinned memory") {