This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch branch-0.3 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 8cb0f87c87d10275084a2b96e7bbd0642d59c7de
Author: zwangsheng <[email protected]>
AuthorDate: Fri Aug 11 10:46:00 2023 +0800

[CELEBORN-883][WORKER] Optimized configuration checks during MemoryManager initialization

### What changes were proposed in this pull request?

1. Document, in the worker configuration docs, a constraint that `MemoryManager` checks during initialization: `celeborn.worker.directMemoryRatioToPauseReplicate` must be greater than `celeborn.worker.directMemoryRatioToPauseReceive`.
2. Add an error message to the corresponding `Preconditions.checkArgument` call. The message names both configuration keys and their configured values.
3. Add a unit test (`MemoryManagerSuite`) that pins this check and its error message, so that this part of the logic is not altered by mistake.

### Why are the changes needed?

To make this misconfiguration user-friendly. Previously the check was a bare `Preconditions.checkArgument` without a message, so an invalid configuration failed without saying which settings were wrong. The constraint was also not documented.

### Does this PR introduce _any_ user-facing change?

Yes. The documentation of `celeborn.worker.directMemoryRatioToPauseReplicate` now states the constraint. A configuration that violates it fails with a descriptive error message.

### How was this patch tested?

Added a unit test.

Closes #1801 from zwangsheng/CELEBORN-883. 
Authored-by: zwangsheng <[email protected]> Signed-off-by: zwangsheng <[email protected]> --- .../org/apache/celeborn/common/CelebornConf.scala | 3 +- docs/configuration/worker.md | 2 +- .../deploy/worker/memory/MemoryManager.java | 9 ++++- .../service/deploy/memory/MemoryManagerSuite.scala | 45 ++++++++++++++++++++++ 4 files changed, 56 insertions(+), 3 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 01757935a..c6bc6310b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2462,7 +2462,8 @@ object CelebornConf extends Logging { val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] = buildConf("celeborn.worker.directMemoryRatioToPauseReplicate") .categories("worker") - .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers.") + .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. " + + s"This value should be higher than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.") .version("0.2.0") .doubleConf .createWithDefault(0.95) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index d57591485..e724fa2aa 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -38,7 +38,7 @@ license: | | celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio of direct memory to store shuffle data | 0.2.0 | | celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct memory for read buffer | 0.2.0 | | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. 
| 0.2.0 | -| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. | 0.2.0 | +| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | | celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | | celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | | celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index c0524fa3a..758ddbb8c 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -120,7 +120,14 @@ public class MemoryManager { .<Long>invoke(); Preconditions.checkArgument(maxDirectorMemory > 0); - Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio); + Preconditions.checkArgument( + pauseReplicateRatio > pausePushDataRatio, + String.format( + "Invalid config, %s(%s) should be greater than %s(%s)", + CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(), + pauseReplicateRatio, + CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), + pausePushDataRatio)); Preconditions.checkArgument(pausePushDataRatio > resumeRatio); Preconditions.checkArgument(resumeRatio > (readBufferRatio + shuffleStorageRatio)); diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala new file mode 100644 index 000000000..0e0d627bf --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.service.deploy.memory + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE} +import org.apache.celeborn.service.deploy.worker.memory.MemoryManager +class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach { + + // reset the memory manager before each test + override protected def beforeEach(): Unit = { + super.beforeEach() + MemoryManager.reset() + } + + test("Init MemoryManager with invalid configuration") { + val conf = new CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95) + .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85) + val caught = + intercept[IllegalArgumentException] { + MemoryManager.initialize(conf); + } + assert( + caught.getMessage == s"Invalid config, ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " + + s"should be greater than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(0.95)") + } +}
