This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5aa959a3 [CELEBORN-157] Change prefix of configurations to celeborn.
(#1104)
5aa959a3 is described below
commit 5aa959a335f93a4065a2667cc440d56853d5b608
Author: Ethan Feng <[email protected]>
AuthorDate: Wed Dec 21 15:17:28 2022 +0800
[CELEBORN-157] Change prefix of configurations to celeborn. (#1104)
---
CONTRIBUTING.md | 5 +++++
.../apache/celeborn/common/util/ShutdownHookManager.java | 2 +-
.../scala/org/apache/celeborn/common/CelebornConf.scala | 15 +++++++++++----
.../celeborn/common/internal/config/ConfigEntry.scala | 7 ++++---
.../apache/celeborn/common/metrics/MetricsConfig.scala | 5 +++--
.../org/apache/celeborn/common/rpc/netty/Dispatcher.scala | 2 +-
.../apache/celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +-
docs/configuration/index.md | 8 ++++----
docs/configuration/worker.md | 1 +
.../celeborn/service/deploy/worker/FetchHandler.scala | 2 +-
10 files changed, 32 insertions(+), 17 deletions(-)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index dd63eeab..5b459cc9 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -8,6 +8,11 @@ This project uses check-style plugins. Run some checks before
you create a new p
/dev/reformat
```
+If you have changed any configuration, run the following command to refresh the docs.
+```shell
+UPDATE=1 build/mvn clean test -pl common -am -Pspark-3.3 -Dtest=none
-DwildcardSuites=org.apache.celeborn.ConfigurationSuite
+```
+
## How to Contribute
For collaboration, feel free to contact us on
[Slack](https://join.slack.com/t/apachecelebor-kw08030/shared_invite/zt-1ju3hd5j8-4Z5keMdzpcVMspe4UJzF4Q).
To report a bug, you can just open a ticket on
[Jira](https://issues.apache.org/jira/projects/CELEBORN/issues)
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
index efc12459..12552c52 100644
---
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
+++
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
@@ -46,7 +46,7 @@ import org.apache.celeborn.common.CelebornConf;
*
* <p>Unless a hook was registered with a shutdown explicitly set through
{@link
* #addShutdownHook(Runnable, int, long, TimeUnit)}, the shutdown time
allocated to it is set by the
- * configuration option `rss.shutdown.timeout`.
+ * configuration option `celeborn.worker.graceful.shutdown.timeout`.
*
* <p>Note: code refer to Hadoop's ShutdownHookManager.
*/
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0dfbc3b8..1192836a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -57,7 +57,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
private def loadFromMap(props: Map[String, String], silent: Boolean): Unit =
settings.synchronized {
- // Load any rss.* system properties
+ // Load any celeborn.* or rss.* system properties
for ((key, value) <- props if key.startsWith("celeborn.") ||
key.startsWith("rss.")) {
set(key, value, silent)
}
@@ -793,9 +793,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
}
}
- def partitionSplitMinimumSize: Long = {
- getSizeAsBytes("rss.partition.split.minimum.size", "1m")
- }
+ def partitionSplitMinimumSize: Long = get(PARTITION_SPLIT_MIN)
def hdfsDir: String = {
get(HDFS_DIR).map {
@@ -2140,6 +2138,15 @@ object CelebornConf extends Logging {
.checkValues(Set(PartitionSplitMode.SOFT.name,
PartitionSplitMode.HARD.name))
.createWithDefault(PartitionSplitMode.SOFT.name)
+ val PARTITION_SPLIT_MIN: ConfigEntry[Long] =
+ buildConf("celeborn.shuffle.partitionSplit.min")
+ .withAlternative("rss.partition.split.minimum.size")
+ .categories("worker")
+ .doc("Min size for a partition to split")
+ .version("0.2.0")
+ .bytesConf(ByteUnit.BYTE) // size string (e.g. "1m") -> bytes; longConf cannot parse the default
+ .createWithDefaultString("1m")
+
val BATCH_HANDLE_CHANGE_PARTITION_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.shuffle.batchHandleChangePartition.enabled")
.withAlternative("rss.change.partition.batch.enabled")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
index 98b2625e..84be8043 100644
---
a/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/internal/config/ConfigEntry.scala
@@ -49,9 +49,10 @@ The followings are best practices of naming configs for some
common cases:
/**
* An entry contains all meta information for a configuration.
*
- * When applying variable substitution to config values, only references
starting with "rss." are
- * considered in the default namespace. For known RSS configuration keys (i.e.
those created using
- * `ConfigBuilder`), references will also consider the default value when it
exists.
+ * When applying variable substitution to config values, only references
starting with "celeborn."
+ * or "rss." are considered in the default namespace. For known RSS
configuration
+ * keys (i.e. those created using `ConfigBuilder`), references will also
consider the default
+ * value when it exists.
*
* Variable expansion is also applied to the default values of config entries
that have a default
* value declared as a string.
diff --git
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala
index 77ca8b24..ac07f7cb 100644
---
a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsConfig.scala
@@ -47,10 +47,11 @@ private class MetricsConfig(conf: CelebornConf) extends
Logging {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
- loadPropertiesFromFile(conf.getOption("rss.metrics.conf"))
+ val configKey = "celeborn.metrics.conf"
+ loadPropertiesFromFile(conf.getOption(configKey))
+ val prefix = s"${configKey}."
// Also look for the properties in provided rss configuration
- val prefix = "rss.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
index 99f78bee..d905be2a 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala
@@ -205,7 +205,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv,
numUsableCores: Int) e
val availableCores =
if (numUsableCores > 0) numUsableCores
else Math.max(16, Runtime.getRuntime.availableProcessors())
- val numThreads = nettyEnv.conf.getInt("rss.rpc.dispatcher.numThreads",
availableCores)
+ val numThreads =
nettyEnv.conf.getInt("celeborn.rpc.dispatcher.numThreads", availableCores)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads,
"dispatcher-event-loop")
logInfo(s"Dispatcher numThreads: $numThreads")
for (i <- 0 until numThreads) {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
index 9f023c39..1e1681a2 100644
---
a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala
@@ -53,7 +53,7 @@ class NettyRpcEnv(
private[celeborn] val transportConf = Utils.fromCelebornConf(
conf.clone,
TransportModuleConstants.RPC_MODULE,
- conf.getInt("rss.rpc.io.threads", numUsableCores))
+ conf.getInt("celeborn.rpc.io.threads", numUsableCores))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 9a5042dd..a93456f6 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -146,8 +146,8 @@ and restore the meta after restarting workers, also workers
should keep a stable
`ShuffleClient` retry reading data. Users should set
`celeborn.worker.graceful.shutdown.enabled` to `true` and
set below service port with stable port to support worker recover status.
```
-rss.worker.rpc.port
-rss.fetchserver.port
-rss.pushserver.port
-rss.replicateserver.port
+celeborn.worker.rpc.port
+celeborn.worker.fetch.port
+celeborn.worker.push.port
+celeborn.worker.replicate.port
```
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 44b62816..d9df2184 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -28,6 +28,7 @@ license: |
| celeborn.metrics.timer.slidingWindow.size | 4096 | The sliding window size
of timer metric. | 0.2.0 |
| celeborn.shuffle.chuck.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.shuffle.minPartitionSizeToEstimate | 8mb | Ignore partition size
smaller than this configuration of partition size for estimation. | 0.2.0 |
+| celeborn.shuffle.partitionSplit.min | 1m | Min size for a partition to split
| 0.2.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS dir configuration for
Celeborn to access HDFS. | 0.2.0 |
| celeborn.test.pushdataTimeout | false | Whether to test pushdata timeout |
0.2.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close
idle connections. | 0.2.0 |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index f5562d80..0fa0e51b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -149,7 +149,7 @@ class FetchHandler(val conf: TransportConf) extends
BaseMessageHandler with Logg
val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred
if (chunksBeingTransferred > conf.maxChunksBeingTransferred) {
val message = "Worker is too busy. The number of chunks being
transferred " +
- s"$chunksBeingTransferred exceeds
rss.shuffle.maxChunksBeingTransferred " +
+ s"$chunksBeingTransferred exceeds
celeborn.shuffle.maxChunksBeingTransferred " +
s"${conf.maxChunksBeingTransferred}."
logError(message)
client.getChannel.writeAndFlush(