This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch CELEBORN-157 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 962375af93b0df35268cb5eb1de4d17d391ebbba Author: Ethan Feng <[email protected]> AuthorDate: Wed Dec 21 14:48:23 2022 +0800 [CELEBORN-157] Change prefix of configurations to celeborn. --- CONTRIBUTING.md | 5 +++++ .../apache/celeborn/common/util/ShutdownHookManager.java | 2 +- .../scala/org/apache/celeborn/common/CelebornConf.scala | 15 +++++++++++---- .../celeborn/common/internal/config/ConfigEntry.scala | 7 ++++--- .../apache/celeborn/common/metrics/MetricsConfig.scala | 5 +++-- .../org/apache/celeborn/common/rpc/netty/Dispatcher.scala | 2 +- .../apache/celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +- docs/configuration/client.md | 1 + docs/configuration/index.md | 8 ++++---- .../celeborn/service/deploy/worker/FetchHandler.scala | 2 +- 10 files changed, 32 insertions(+), 17 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index dd63eeab..5b459cc9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -8,6 +8,11 @@ This project uses check-style plugins. Run some checks before you create a new p /dev/reformat ``` +If you have changed configuration, run the following command to refresh docs. +```shell +UPDATE=1 build/mvn clean test -pl common -am -Pspark-3.3 -Dtest=none -DwildcardSuites=org.apache.celeborn.ConfigurationSuite +``` + ## How to Contribute For collaboration, feel free to contact us on [Slack](https://join.slack.com/t/apachecelebor-kw08030/shared_invite/zt-1ju3hd5j8-4Z5keMdzpcVMspe4UJzF4Q). 
To report a bug, you can just open a ticket on [Jira](https://issues.apache.org/jira/projects/CELEBORN/issues) diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java index efc12459..12552c52 100644 --- a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java +++ b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java @@ -46,7 +46,7 @@ import org.apache.celeborn.common.CelebornConf; * * <p>Unless a hook was registered with a shutdown explicitly set through {@link * #addShutdownHook(Runnable, int, long, TimeUnit)}, the shutdown time allocated to it is set by the - * configuration option `rss.shutdown.timeout`. + * configuration option `celeborn.worker.graceful.shutdown.timeout`. * * <p>Note: code refer to Hadoop's ShutdownHookManager. */ diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 0dfbc3b8..c6cc6ac6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -57,7 +57,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se private def loadFromMap(props: Map[String, String], silent: Boolean): Unit = settings.synchronized { - // Load any rss.* system properties + // Load any celeborn.* or rss.* system properties for ((key, value) <- props if key.startsWith("celeborn.") || key.startsWith("rss.")) { set(key, value, silent) } @@ -793,9 +793,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se } } - def partitionSplitMinimumSize: Long = { - getSizeAsBytes("rss.partition.split.minimum.size", "1m") - } + def partitionSplitMinimumSize: Long = get(PARTITION_SPLIT_MIN) def hdfsDir: String = { get(HDFS_DIR).map { @@ -2140,6 +2138,15 @@ object 
CelebornConf extends Logging { .checkValues(Set(PartitionSplitMode.SOFT.name, PartitionSplitMode.HARD.name)) .createWithDefault(PartitionSplitMode.SOFT.name) + val PARTITION_SPLIT_MIN: ConfigEntry[Long] = + buildConf("celeborn.shuffle.partitionSplit.min") + .withAlternative("rss.partition.split.minimum.size") + .categories("client") + .doc("Min size for a partition to split") + .version("0.2.0") + .longConf + .createWithDefaultString("1m") + val BATCH_HANDLE_CHANGE_PARTITION_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.shuffle.batchHandleChangePartition.enabled") .withAlternative("rss.change.partition.batch.enabled") diff --git a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala index 98b2625e..84be8043 100644 --- a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala +++ b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala @@ -49,9 +49,10 @@ The followings are best practices of naming configs for some common cases: /** * An entry contains all meta information for a configuration. * - * When applying variable substitution to config values, only references starting with "rss." are - * considered in the default namespace. For known RSS configuration keys (i.e. those created using - * `ConfigBuilder`), references will also consider the default value when it exists. + * When applying variable substitution to config values, only references starting with "celeborn." + * or "rss." are considered in the default namespace. For known RSS configuration + * keys (i.e. those created using `ConfigBuilder`), references will also consider the default + * value when it exists. + * * Variable expansion is also applied to the default values of config entries that have a default * value declared as a string. 
diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala index 77ca8b24..ac07f7cb 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala @@ -47,10 +47,11 @@ private class MetricsConfig(conf: CelebornConf) extends Logging { // Add default properties in case there's no properties file setDefaultProperties(properties) - loadPropertiesFromFile(conf.getOption("rss.metrics.conf")) + val configKey = "celeborn.metrics.conf" + loadPropertiesFromFile(conf.getOption(configKey)) + val prefix = s"${configKey}." // Also look for the properties in provided rss configuration - val prefix = "rss.metrics.conf." conf.getAll.foreach { case (k, v) if k.startsWith(prefix) => properties.setProperty(k.substring(prefix.length()), v) diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala index 99f78bee..d905be2a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala @@ -205,7 +205,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) e val availableCores = if (numUsableCores > 0) numUsableCores else Math.max(16, Runtime.getRuntime.availableProcessors()) - val numThreads = nettyEnv.conf.getInt("rss.rpc.dispatcher.numThreads", availableCores) + val numThreads = nettyEnv.conf.getInt("celeborn.rpc.dispatcher.numThreads", availableCores) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") logInfo(s"Dispatcher numThreads: $numThreads") for (i <- 0 until numThreads) { diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala 
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 9f023c39..1e1681a2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -53,7 +53,7 @@ class NettyRpcEnv( private[celeborn] val transportConf = Utils.fromCelebornConf( conf.clone, TransportModuleConstants.RPC_MODULE, - conf.getInt("rss.rpc.io.threads", numUsableCores)) + conf.getInt("celeborn.rpc.io.threads", numUsableCores)) private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index b58fac84..6ecdc0e4 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -59,6 +59,7 @@ license: | | celeborn.shuffle.forceFallback.numPartitionsThreshold | 500000 | Celeborn will only accept shuffle of partition number lower than this configuration value. | 0.2.0 | | celeborn.shuffle.manager.port | 0 | Port used by the LifecycleManager on the Driver. | 0.2.0 | | celeborn.shuffle.partition.type | REDUCE | Type of shuffle's partition. | 0.2.0 | +| celeborn.shuffle.partitionSplit.min | 1m | Min size for a partition to split | 0.2.0 | | celeborn.shuffle.partitionSplit.mode | SOFT | soft: the shuffle file size might be larger than split threshold. hard: the shuffle file size will be limited to split threshold. | 0.2.0 | | celeborn.shuffle.partitionSplit.threshold | 1G | Shuffle file size threshold, if file size exceeds this, trigger split. | 0.2.0 | | celeborn.shuffle.rangeReadFilter.enabled | false | If a spark application have skewed partition, this value can set to true to improve performance. 
| 0.2.0 | diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 9a5042dd..a93456f6 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -146,8 +146,8 @@ and restore the meta after restarting workers, also workers should keep a stable `ShuffleClient` retry reading data. Users should set `celeborn.worker.graceful.shutdown.enabled` to `true` and set below service port with stable port to support worker recover status. ``` -rss.worker.rpc.port -rss.fetchserver.port -rss.pushserver.port -rss.replicateserver.port +celeborn.worker.rpc.port +celeborn.worker.fetch.port +celeborn.worker.push.port +celeborn.worker.replicate.port ``` diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index f5562d80..0fa0e51b 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -149,7 +149,7 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred if (chunksBeingTransferred > conf.maxChunksBeingTransferred) { val message = "Worker is too busy. The number of chunks being transferred " + - s"$chunksBeingTransferred exceeds rss.shuffle.maxChunksBeingTransferred " + + s"$chunksBeingTransferred exceeds celeborn.shuffle.maxChunksBeingTransferred " + s"${conf.maxChunksBeingTransferred}." logError(message) client.getChannel.writeAndFlush(
