This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new aad1222ce [CELEBORN-1782] Worker in congestion control should be in
blacklist to avoid impact new shuffle
aad1222ce is described below
commit aad1222ceb6a005fef240c63d5ba2956685a910b
Author: Xianming Lei <[email protected]>
AuthorDate: Thu Dec 19 10:12:09 2024 +0800
[CELEBORN-1782] Worker in congestion control should be in blacklist to
avoid impact new shuffle
Set highWorkload=true when the worker is in congestion control, i.e. when its CongestionController reports that it is over the high watermark.
A worker in congestion control should be placed in the blacklist to avoid
impacting new shuffles.
No
Existing UTs.
Closes #3003 from leixm/CELEBORN-1782.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit cec88b2def8bf5a53dbaa0004f4818548e541855)
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/congestcontrol/CongestionController.java | 4 ++++
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 12 ++++++++----
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
index d05ddd582..12424c396 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java
@@ -233,6 +233,10 @@ public class CongestionController {
}
}
+ public Boolean isOverHighWatermark() {
+ return overHighWatermark.get();
+ }
+
public void close() {
logger.info("Closing {}", this.getClass().getSimpleName());
this.removeUserExecutorService.shutdownNow();
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index f52626e8a..251422610 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -438,10 +438,14 @@ private[celeborn] class Worker(
}
private def highWorkload: Boolean = {
- (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
- case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
- case (ServingState.PUSH_PAUSED, _) => true
- case (_, Some(activeConnectionMax)) =>
+ (
+ memoryManager.currentServingState(),
+ Option(CongestionController.instance()),
+ conf.workerActiveConnectionMax) match {
+ case (_, Some(instance), _) if instance.isOverHighWatermark => true
+ case (ServingState.PUSH_AND_REPLICATE_PAUSED, _, _) => true
+ case (ServingState.PUSH_PAUSED, _, _) => true
+ case (_, _, Some(activeConnectionMax)) =>
workerSource.getCounterCount(ACTIVE_CONNECTION_COUNT) >=
activeConnectionMax
case _ => false
}