This is an automated email from the ASF dual-hosted git repository.
binjieyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 63df84593 [CELEBORN-883][WORKER] Optimized configuration checks during
MemoryManager initialization
63df84593 is described below
commit 63df84593ec6e2de5e1f7733e95708c1e2966e79
Author: zwangsheng <[email protected]>
AuthorDate: Fri Aug 11 10:46:00 2023 +0800
[CELEBORN-883][WORKER] Optimized configuration checks during MemoryManager
initialization
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title starts with a JIRA ticket, e.g. '[CELEBORN-XXXX]
Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a
faster review.
-->
### What changes were proposed in this pull request?
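1. Document in the user configuration doc the config check performed while
initializing `MemoryManager`: `celeborn.worker.directMemoryRatioToPauseReplicate`
must be greater than `celeborn.worker.directMemoryRatioToPauseReceive`.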
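2. Add an error message to the `Preconditions` check that compares the
pause-replicate and pause-receive ratios, naming both config keys and their values.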
3. Add unit test to make sure that part of the logic isn't altered by
mistake
### Why are the changes needed?
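To be more user-friendly: the constraint between the two ratios is now
documented, and an invalid configuration fails with a message that names the
offending config keys and values instead of an `IllegalArgumentException`
without a message.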
### Does this PR introduce _any_ user-facing change?
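Yes. The description of `celeborn.worker.directMemoryRatioToPauseReplicate` in
the configuration doc is extended, and the error raised for an invalid
pause-replicate/pause-receive ratio configuration now carries a message.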
### How was this patch tested?
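Added a unit test, `MemoryManagerSuite`, which initializes `MemoryManager` with
an invalid configuration and asserts the resulting error message.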
Closes #1801 from zwangsheng/CELEBORN-883.
Authored-by: zwangsheng <[email protected]>
Signed-off-by: zwangsheng <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 3 +-
docs/configuration/worker.md | 2 +-
.../deploy/worker/memory/MemoryManager.java | 9 ++++-
.../service/deploy/memory/MemoryManagerSuite.scala | 45 ++++++++++++++++++++++
4 files changed, 56 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index afa8276b0..fdc4988b1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2450,7 +2450,8 @@ object CelebornConf extends Logging {
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
.categories("worker")
- .doc("If direct memory usage reaches this limit, the worker will stop to
receive replication data from other workers.")
+ .doc("If direct memory usage reaches this limit, the worker will stop to
receive replication data from other workers. " +
+ s"This value should be higher than
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.")
.version("0.2.0")
.doubleConf
.createWithDefault(0.95)
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index d57591485..e724fa2aa 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -38,7 +38,7 @@ license: |
| celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio
of direct memory to store shuffle data | 0.2.0 |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct
memory for read buffer | 0.2.0 |
| celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory
usage reaches this limit, the worker will stop to receive data from Celeborn
shuffle clients. | 0.2.0 |
-| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory
usage reaches this limit, the worker will stop to receive replication data from
other workers. | 0.2.0 |
+| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory
usage reaches this limit, the worker will stop to receive replication data from
other workers. This value should be higher than
celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 |
| celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is
less than this limit, worker will resume. | 0.2.0 |
| celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from
worker to client when fetching data | 0.3.0 |
| celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread
number of worker to handle client fetch data. The default threads number is the
number of flush thread. | 0.2.0 |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index c0524fa3a..758ddbb8c 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -120,7 +120,14 @@ public class MemoryManager {
.<Long>invoke();
Preconditions.checkArgument(maxDirectorMemory > 0);
- Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio);
+ Preconditions.checkArgument(
+ pauseReplicateRatio > pausePushDataRatio,
+ String.format(
+ "Invalid config, %s(%s) should be greater than %s(%s)",
+ CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(),
+ pauseReplicateRatio,
+ CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
+ pausePushDataRatio));
Preconditions.checkArgument(pausePushDataRatio > resumeRatio);
Preconditions.checkArgument(resumeRatio > (readBufferRatio +
shuffleStorageRatio));
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
new file mode 100644
index 000000000..0e0d627bf
--- /dev/null
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.memory
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import
org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE,
WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE}
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach {
+
+ // reset the memory manager before each test
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ MemoryManager.reset()
+ }
+
+ test("Init MemoryManager with invalid configuration") {
+ val conf = new
CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95)
+ .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
+ val caught =
+ intercept[IllegalArgumentException] {
+ MemoryManager.initialize(conf);
+ }
+ assert(
+ caught.getMessage == s"Invalid config,
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " +
+ s"should be greater than
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(0.95)")
+ }
+}