This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 8cb0f87c87d10275084a2b96e7bbd0642d59c7de
Author: zwangsheng <[email protected]>
AuthorDate: Fri Aug 11 10:46:00 2023 +0800

    [CELEBORN-883][WORKER] Optimized configuration checks during MemoryManager 
initialization
    
    <!--
    Thanks for sending a pull request!  Here are some tips for you:
      - Make sure the PR title starts with a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
      - Be sure to keep the PR description updated to reflect all changes.
      - Please write your PR title to summarize what this PR proposes.
      - If possible, provide a concise example to reproduce the issue for a 
faster review.
    -->
    
    ### What changes were proposed in this pull request?
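    1. Document, in the user configuration doc, the constraint checked during
       `MemoryManager` initialization: `celeborn.worker.directMemoryRatioToPauseReplicate`
       must be greater than `celeborn.worker.directMemoryRatioToPauseReceive`.
    2. Add a descriptive error message to the corresponding `Preconditions.checkArgument` call.
    3. Add a unit test to ensure this validation logic is not altered by mistake.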
    
    ### Why are the changes needed?
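    To make misconfiguration easier to diagnose. The constraint is now documented, and a violation reports both config keys and their values. Previously it raised an `IllegalArgumentException` with no message.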
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    Added a unit test (`MemoryManagerSuite`).
    
    Closes #1801 from zwangsheng/CELEBORN-883.
    
    Authored-by: zwangsheng <[email protected]>
    Signed-off-by: zwangsheng <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  3 +-
 docs/configuration/worker.md                       |  2 +-
 .../deploy/worker/memory/MemoryManager.java        |  9 ++++-
 .../service/deploy/memory/MemoryManagerSuite.scala | 45 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 01757935a..c6bc6310b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2462,7 +2462,8 @@ object CelebornConf extends Logging {
   val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
     buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
       .categories("worker")
-      .doc("If direct memory usage reaches this limit, the worker will stop to 
receive replication data from other workers.")
+      .doc("If direct memory usage reaches this limit, the worker will stop to 
receive replication data from other workers. " +
+        s"This value should be higher than 
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.")
       .version("0.2.0")
       .doubleConf
       .createWithDefault(0.95)
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index d57591485..e724fa2aa 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -38,7 +38,7 @@ license: |
 | celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio 
of direct memory to store shuffle data | 0.2.0 | 
 | celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct 
memory for read buffer | 0.2.0 | 
 | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory 
usage reaches this limit, the worker will stop to receive data from Celeborn 
shuffle clients. | 0.2.0 | 
-| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory 
usage reaches this limit, the worker will stop to receive replication data from 
other workers. | 0.2.0 | 
+| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory 
usage reaches this limit, the worker will stop to receive replication data from 
other workers. This value should be higher than 
celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | 
 | celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is 
less than this limit, worker will resume. | 0.2.0 | 
 | celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from 
worker to client when fetching data | 0.3.0 | 
 | celeborn.worker.fetch.io.threads | &lt;undefined&gt; | Netty IO thread 
number of worker to handle client fetch data. The default threads number is the 
number of flush thread. | 0.2.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index c0524fa3a..758ddbb8c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -120,7 +120,14 @@ public class MemoryManager {
             .<Long>invoke();
 
     Preconditions.checkArgument(maxDirectorMemory > 0);
-    Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio);
+    Preconditions.checkArgument(
+        pauseReplicateRatio > pausePushDataRatio,
+        String.format(
+            "Invalid config, %s(%s) should be greater than %s(%s)",
+            CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(),
+            pauseReplicateRatio,
+            CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
+            pausePushDataRatio));
     Preconditions.checkArgument(pausePushDataRatio > resumeRatio);
     Preconditions.checkArgument(resumeRatio > (readBufferRatio + 
shuffleStorageRatio));
 
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
new file mode 100644
index 000000000..0e0d627bf
--- /dev/null
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.memory
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import 
org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE,
 WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE}
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach { // Tests for config validation performed by MemoryManager.initialize
+
+  // Reset MemoryManager before each test so every test starts from a fresh initialize(...) call (presumably it holds process-wide state; confirm in MemoryManager.reset()).
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    MemoryManager.reset()
+  }
+
+  test("Init MemoryManager with invalid configuration") { // pauseReplicate ratio (0.85) is not greater than pauseReceive ratio (0.95), so initialize must reject it
+    val conf = new 
CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95)
+      .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
+    val caught =
+      intercept[IllegalArgumentException] { // Preconditions.checkArgument signals a failed check with IllegalArgumentException
+        MemoryManager.initialize(conf);
+      }
+    assert( // pins the exact String.format message built for the pauseReplicate > pauseReceive check
+      caught.getMessage == s"Invalid config, 
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " +
+        s"should be greater than 
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(0.95)")
+  }
+}

Reply via email to