This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new f871c9861 [CELEBORN-1637] Enhance config to bypass memory check for
partition file sorter
f871c9861 is described below
commit f871c9861c3896e7490e1a84f1c0b3f550d4370d
Author: mingji <[email protected]>
AuthorDate: Fri Oct 11 19:41:40 2024 +0800
[CELEBORN-1637] Enhance config to bypass memory check for partition file
sorter
### What changes were proposed in this pull request?
Add a config to bypass memory check when sorting shuffle files.
### Why are the changes needed?
If a Celeborn worker has quite a large memory and supports both Spark
and Flink engines, this config should be enabled.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster test.
Closes #2798 from FMX/b1637.
Authored-by: mingji <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 3 ++-
docs/configuration/worker.md | 2 +-
.../celeborn/service/deploy/worker/memory/MemoryManager.java | 8 ++++----
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 2 +-
4 files changed, 8 insertions(+), 7 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 290ce60f9..49e669c56 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -3682,7 +3682,8 @@ object CelebornConf extends Logging {
buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
.categories("worker")
.doc("Max ratio of partition sorter's memory for sorting, when reserved
memory is higher than max partition " +
- "sorter memory, partition sorter will stop sorting.")
+ "sorter memory, partition sorter will stop sorting." +
+ " If this value is set to 0, partition files sorter will skip memory
check and ServingState check.")
.version("0.2.0")
.doubleConf
.checkValue(v => v >= 0.0 && v <= 1.0, "Should be in [0.0, 1.0].")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 0bec91d5a..637c58231 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -140,7 +140,7 @@ license: |
| celeborn.worker.monitor.memory.trimFlushWaitInterval | 1s | false | Wait
time after worker trigger StorageManger to flush data. | 0.3.0 | |
| celeborn.worker.partition.initial.readBuffersMax | 1024 | false | Max number
of initial read buffers | 0.3.0 | |
| celeborn.worker.partition.initial.readBuffersMin | 1 | false | Min number of
initial read buffers | 0.3.0 | |
-| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | false |
Max ratio of partition sorter's memory for sorting, when reserved memory is
higher than max partition sorter memory, partition sorter will stop sorting. |
0.2.0 | |
+| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | false |
Max ratio of partition sorter's memory for sorting, when reserved memory is
higher than max partition sorter memory, partition sorter will stop sorting. If
this value is set to 0, partition files sorter will skip memory check and
ServingState check. | 0.2.0 | |
| celeborn.worker.push.heartbeat.enabled | false | false | enable the
heartbeat from worker to client when pushing data | 0.3.0 | |
| celeborn.worker.push.io.threads | <undefined> | false | Netty IO
thread number of worker to handle client push data. The default threads number
is the number of flush thread. | 0.2.0 | |
| celeborn.worker.push.port | 0 | false | Server port for Worker to receive
push data request from ShuffleClient. | 0.2.0 | |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index d48888766..96425ecf8 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -69,7 +69,7 @@ public class MemoryManager {
private final AtomicLong diskBufferCounter = new AtomicLong(0);
private final LongAdder pausePushDataCounter = new LongAdder();
private final LongAdder pausePushDataAndReplicateCounter = new LongAdder();
- private ServingState servingState = ServingState.NONE_PAUSED;
+ public ServingState servingState = ServingState.NONE_PAUSED;
private long pausePushDataStartTime = -1L;
private long pausePushDataTime = 0L;
private long pausePushDataAndReplicateStartTime = -1L;
@@ -284,7 +284,7 @@ public class MemoryManager {
}
public boolean shouldEvict(boolean aggressiveMemoryFileEvictEnabled, double
evictRatio) {
- return currentServingState() != ServingState.NONE_PAUSED
+ return servingState != ServingState.NONE_PAUSED
&& (aggressiveMemoryFileEvictEnabled
|| (memoryFileStorageCounter.sum() >= evictRatio *
memoryFileStorageThreshold));
}
@@ -402,8 +402,8 @@ public class MemoryManager {
}
public boolean sortMemoryReady() {
- return currentServingState() == ServingState.NONE_PAUSED
- && sortMemoryCounter.get() < maxSortMemory;
+ return maxSortMemory == 0
+ || (servingState == ServingState.NONE_PAUSED &&
sortMemoryCounter.get() < maxSortMemory);
}
public void releaseSortMemory(long size) {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index d0371e2bd..3777adc48 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -449,7 +449,7 @@ private[celeborn] class Worker(
}
private def highWorkload: Boolean = {
- (memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
+ (memoryManager.servingState, conf.workerActiveConnectionMax) match {
case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
case (ServingState.PUSH_PAUSED, _) => true
case (_, Some(activeConnectionMax)) =>