This is an automated email from the ASF dual-hosted git repository.
angerszhuuuu pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 062f25ad5 [CELEBORN-680][DOC] Refresh celeborn configurations in doc
062f25ad5 is described below
commit 062f25ad5eab2d7f620b0ce112297961d130e364
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Jun 15 13:59:38 2023 +0800
[CELEBORN-680][DOC] Refresh celeborn configurations in doc
### What changes were proposed in this pull request?
Refresh celeborn configurations in doc
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1592 from AngersZhuuuu/CELEBORN-680.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
(cherry picked from commit 1ba6dee32480b24703b73a0d5e445257d117e5b7)
Signed-off-by: Angerszhuuuu <[email protected]>
---
README.md | 16 ++++++++--------
.../org/apache/celeborn/common/CelebornConf.scala | 2 +-
docs/configuration/client.md | 2 +-
docs/configuration/index.md | 6 +++---
docs/deploy.md | 18 +++++++++---------
docs/migration.md | 6 +++---
docs/monitoring.md | 8 ++++----
docs/upgrade.md | 14 +++++++-------
.../celeborn/tests/spark/PushDataTimeoutTest.scala | 4 ++--
.../celeborn/tests/spark/RssHashCheckDiskSuite.scala | 9 +++++----
10 files changed, 43 insertions(+), 42 deletions(-)
diff --git a/README.md b/README.md
index c75724e31..4468fa1b9 100644
--- a/README.md
+++ b/README.md
@@ -112,7 +112,7 @@ celeborn.master.host clb-master
celeborn.master.port 9097
celeborn.metrics.enabled true
-celeborn.worker.flush.buffer.size 256k
+celeborn.worker.flusher.buffer.size 256k
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD
# If your hosts have disk raid or use lvm, set
celeborn.worker.monitor.disk.enabled to false
@@ -141,7 +141,7 @@ celeborn.ha.master.ratis.raft.server.storage.dir
/mnt/disk1/rss_ratis/
celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that flush buffer size
is at least 4MB or larger.
-celeborn.worker.flush.buffer.size 256k
+celeborn.worker.flusher.buffer.size 256k
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD
# If your hosts have disk raid or use lvm, set
celeborn.worker.monitor.disk.enabled to false
@@ -210,11 +210,11 @@ spark.shuffle.service.enabled false
# options: hash, sort
# Hash shuffle writer use (partition count) * (celeborn.push.buffer.max.size)
* (spark.executor.cores) memory.
# Sort shuffle writer uses less memory than hash shuffle writer, if your
shuffle partition count is large, try to use sort hash writer.
-spark.celeborn.shuffle.writer hash
+spark.celeborn.client.spark.shuffle.writer hash
-# We recommend setting spark.celeborn.push.replicate.enabled to true to enable
server-side data replication
+# We recommend setting spark.celeborn.client.push.replicate.enabled to true to
enable server-side data replication
# If you have only one worker, this setting must be false
-spark.celeborn.push.replicate.enabled true
+spark.celeborn.client.push.replicate.enabled true
# Support for Spark AQE only tested under Spark 3
# we recommend setting localShuffleReader to false to get better performance
of Celeborn
@@ -234,14 +234,14 @@ To use Celeborn, the following flink configurations
should be added.
shuffle-service-factory.class:
org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
-celeborn.shuffle.batchHandleReleasePartition.enabled: true
-celeborn.push.maxReqsInFlight: 128
+celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
+celeborn.client.push.maxReqsInFlight: 128
# Network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
-celeborn.shuffle.batchHandleCommitPartition.threads: 32
+celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
# Floating buffers may need to change `taskmanager.network.memory.fraction`
and `taskmanager.network.memory.max`
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 6714b29f2..ec3ccf2f5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2586,7 +2586,7 @@ object CelebornConf extends Logging {
.categories("client")
.version("0.3.0")
.doc("Amount of Netty in-flight requests per worker. The maximum memory
is " +
- "`celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * "
+
+ "`celeborn.client.push.maxReqsInFlight` *
`celeborn.push.buffer.max.size` * " +
"compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib")
.intConf
.createWithDefault(4)
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index cbd1bb7a5..165430db7 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -43,7 +43,7 @@ license: |
| celeborn.client.push.limit.inFlight.sleepInterval | 50ms | Sleep interval
when check netty in-flight requests to be done. | 0.3.0 |
| celeborn.client.push.limit.inFlight.timeout | <undefined> | Timeout
for netty in-flight requests to be done. Default value should be
`celeborn.client.push.timeout * 2`. | 0.3.0 |
| celeborn.client.push.limit.strategy | SIMPLE | The strategy used to control
the push speed. Valid strategies are SIMPLE and SLOWSTART. the SLOWSTART
strategy is usually cooperate with congest control mechanism in the worker
side. | 0.3.0 |
-| celeborn.client.push.maxReqsInFlight | 4 | Amount of Netty in-flight
requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` *
`celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default:
64Kib * 32 = 2Mib | 0.3.0 |
+| celeborn.client.push.maxReqsInFlight | 4 | Amount of Netty in-flight
requests per worker. The maximum memory is
`celeborn.client.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` *
compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib | 0.3.0 |
| celeborn.client.push.queue.capacity | 512 | Push buffer queue size for a
task. The maximum memory is `celeborn.push.buffer.max.size` *
`celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.3.0 |
| celeborn.client.push.replicate.enabled | false | When true, Celeborn worker
will replicate shuffle data to another Celeborn worker asynchronously to ensure
the pushed shuffle data won't be lost after the node failure. | 0.3.0 |
| celeborn.client.push.retry.threads | 8 | Thread number to process shuffle
re-send push data requests. | 0.3.0 |
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 175532c16..d1a64cadc 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -146,10 +146,10 @@ Assume we have a cluster described as below:
As we need to reserve 20% off-heap memory for netty,
so we could assume 16 GB off-heap memory can be used for flush buffers.
-If `spark.celeborn.push.buffer.max.size` is 64 KB, we can have in-flight
requests up to 1310720.
-If you have 8192 mapper tasks, you could set
`spark.celeborn.push.maxReqsInFlight=160` to gain performance improvements.
+If `spark.celeborn.client.push.buffer.max.size` is 64 KB, we can have
in-flight requests up to 1310720.
+If you have 8192 mapper tasks, you could set
`spark.celeborn.client.push.maxReqsInFlight=160` to gain performance
improvements.
-If `celeborn.worker.flush.buffer.size` is 256 KB, we can have total slots up
to 327680 slots.
+If `celeborn.worker.flusher.buffer.size` is 256 KB, we can have total slots up
to 327680 slots.
## Rack Awareness
diff --git a/docs/deploy.md b/docs/deploy.md
index 8d21b26b9..1eb105c7e 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -38,7 +38,7 @@ celeborn.master.host clb-master
celeborn.master.port 9097
celeborn.metrics.enabled true
-celeborn.worker.flush.buffer.size 256k
+celeborn.worker.flusher.buffer.size 256k
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD
# If your hosts have disk raid or use lvm, set
celeborn.worker.monitor.disk.enabled to false
@@ -67,7 +67,7 @@ celeborn.ha.master.ratis.raft.server.storage.dir
/mnt/disk1/rss_ratis/
celeborn.metrics.enabled true
# If you want to use HDFS as shuffle storage, make sure that flush buffer size
is at least 4MB or larger.
-celeborn.worker.flush.buffer.size 256k
+celeborn.worker.flusher.buffer.size 256k
# Disk type is HDD by default.
celeborn.worker.storage.dirs /mnt/disk1:disktype=SSD,/mnt/disk2:disktype=SSD
# If your hosts have disk raid or use lvm, set
celeborn.worker.monitor.disk.enabled to false
@@ -131,13 +131,13 @@ spark.celeborn.master.endpoints
clb-1:9097,clb-2:9097,clb-3:9097
spark.shuffle.service.enabled false
# options: hash, sort
-# Hash shuffle writer use (partition count) * (celeborn.push.buffer.max.size)
* (spark.executor.cores) memory.
+# Hash shuffle writer use (partition count) *
(celeborn.client.push.buffer.max.size) * (spark.executor.cores) memory.
# Sort shuffle writer uses less memory than hash shuffle writer, if your
shuffle partition count is large, try to use sort hash writer.
-spark.celeborn.shuffle.writer hash
+spark.celeborn.client.spark.shuffle.writer hash
-# we recommend set spark.celeborn.push.replicate.enabled to true to enable
server-side data replication
+# we recommend set spark.celeborn.client.push.replicate.enabled to true to
enable server-side data replication
# If you have only one worker, this setting must be false
-spark.celeborn.push.replicate.enabled true
+spark.celeborn.client.push.replicate.enabled true
# Support for Spark AQE only tested under Spark 3
# we recommend set localShuffleReader to false to get better performance of
Celeborn
@@ -157,14 +157,14 @@ To use Celeborn, the following Flink configurations should be
added.
shuffle-service-factory.class:
org.apache.celeborn.plugin.flink.RemoteShuffleServiceFactory
celeborn.master.endpoints: clb-1:9097,clb-2:9097,clb-3:9097
-celeborn.shuffle.batchHandleReleasePartition.enabled: true
-celeborn.push.maxReqsInFlight: 128
+celeborn.client.shuffle.batchHandleReleasePartition.enabled: true
+celeborn.client.push.maxReqsInFlight: 128
# network connections between peers
celeborn.data.io.numConnectionsPerPeer: 16
# threads number may vary according to your cluster but do not set to 1
celeborn.data.io.threads: 32
-celeborn.shuffle.batchHandleCommitPartition.threads: 32
+celeborn.client.shuffle.batchHandleCommitPartition.threads: 32
celeborn.rpc.dispatcher.numThreads: 32
# floating buffers may need to change `taskmanager.network.memory.fraction`
and `taskmanager.network.memory.max`
diff --git a/docs/migration.md b/docs/migration.md
index 6a3e335a2..16c5fe101 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -21,9 +21,9 @@ license: |
## Upgrading from 0.2.1 to 0.3.0
- - From 0.3.0 on the default value for `celeborn.push.replicate.enabled` is
changed from `true` to `false`, users
+ - From 0.3.0 on the default value for
`celeborn.client.push.replicate.enabled` is changed from `true` to `false`,
users
who want replication on should explicitly enable replication. For example,
to enable replication for Spark
- users should add the spark config when submitting job:
`spark.celeborn.push.replicate.enabled=true`
+ users should add the spark config when submitting job:
`spark.celeborn.client.push.replicate.enabled=true`
- - From 0.3.0 on the default value for `celeborn.worker.workingDir` is changed
from `hadoop/rss-worker/shuffle_data` to `rss-worker/shuffle_data`,
+ - From 0.3.0 on the default value for `celeborn.worker.storage.workingDir` is
changed from `hadoop/rss-worker/shuffle_data` to `rss-worker/shuffle_data`,
users who want to use origin working dir path should set this configuration.
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 131fab606..d8b3c34ef 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -275,10 +275,10 @@ The configuration of `<master-prometheus-host>`,
`<master-prometheus-port>`, `<w
| Key | Default | Description
| Since |
|-----------------------------------------|---------|----------------------------|-------|
-| celeborn.master.metrics.prometheus.host | 0.0.0.0 | Master's Prometheus
host. | 0.2.0 |
-| celeborn.master.metrics.prometheus.port | 9098 | Master's Prometheus
port. | 0.2.0 |
-| celeborn.worker.metrics.prometheus.host | 0.0.0.0 | Worker's Prometheus
host. | 0.2.0 |
-| celeborn.worker.metrics.prometheus.port | 9096 | Worker's Prometheus
port. | 0.2.0 |
+| celeborn.metrics.master.prometheus.host | 0.0.0.0 | Master's Prometheus
host. | 0.2.0 |
+| celeborn.metrics.master.prometheus.port | 9098 | Master's Prometheus
port. | 0.2.0 |
+| celeborn.metrics.worker.prometheus.host | 0.0.0.0 | Worker's Prometheus
host. | 0.2.0 |
+| celeborn.metrics.worker.prometheus.port | 9096 | Worker's Prometheus
port. | 0.2.0 |
API path listed as below:
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 101851766..86365b599 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -41,7 +41,7 @@ At startup, it will automatically select a free port, user
need to set a fixed v
At the same time, users need to adjust the number of retry times and retry
wait time
of the client according to cluster rolling restart situation
to support the shuffle client to read data through retries after worker
restarted.
-The shuffle client fetch data retry times configuration is
`celeborn.fetch.maxRetries`, default value is `3`.
+The shuffle client fetch data retry times configuration is
`celeborn.client.fetch.maxRetriesForEachReplica`, default value is `3`.
The shuffle client fetch data retry wait time configuration is
`celeborn.data.io.retryWait`, default value is `5s`.
Users can increase the configuration value appropriately according to the
situation.
@@ -70,7 +70,7 @@ In order to speed up the restart process, worker let all push
data requests retu
during worker shutdown, and shuffle client will re-apply for a new partition
location for these allocated partitions.
Then client side can record all HARD_SPLIT partition information and
pre-commit these partition,
then the worker side allocated partitions can be committed in a very short
time. User should enable
-`celeborn.shuffle.batchHandleCommitPartition.enabled`, the default value is
false.
+`celeborn.client.shuffle.batchHandleCommitPartition.enabled`, the default
value is false.
## Example setting:
@@ -86,8 +86,8 @@ then the worker side allocated partitions can be committed in
a very short time.
### Client
-| Key | Value |
-|-----------------------------------------------------------|-------|
-| spark.celeborn.shuffle.batchHandleCommitPartition.enabled | true |
-| spark.celeborn.fetch.maxRetries | 5 |
-| spark.celeborn.data.io.retryWait | 10s |
+| Key | Value |
+|------------------------------------------------------------------|-------|
+| spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled | true |
+| spark.celeborn.client.fetch.maxRetriesForEachReplica | 5 |
+| spark.celeborn.data.io.retryWait | 10s |
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
index 0e49c0d36..2389d070c 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/PushDataTimeoutTest.scala
@@ -33,8 +33,8 @@ class PushDataTimeoutTest extends AnyFunSuite
override def beforeAll(): Unit = {
logInfo("test initialized , setup celeborn mini cluster")
val workerConf = Map(
- "celeborn.test.pushMasterDataTimeout" -> "true",
- "celeborn.test.pushSlaveDataTimeout" -> "true",
+ CelebornConf.TEST_CLIENT_PUSH_MASTER_DATA_TIMEOUT.key -> "true",
+ CelebornConf.TEST_WORKER_PUSH_SLAVE_DATA_TIMEOUT.key -> "true",
"celeborn.push.timeoutCheck.interval" -> "1s")
setUpMiniCluster(masterConfs = null, workerConfs = workerConf)
}
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
index d07a96463..7ef90799c 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/RssHashCheckDiskSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode
import org.apache.celeborn.service.deploy.worker.Worker
@@ -34,10 +35,10 @@ class RssHashCheckDiskSuite extends AnyFunSuite
var workers: collection.Set[Worker] = null
override def beforeAll(): Unit = {
logInfo("RssHashCheckDiskSuite test initialized , setup rss mini cluster")
- val masterConfs = Map("celeborn.application.heartbeat.timeout" -> "10s")
+ val masterConfs = Map(CelebornConf.APPLICATION_HEARTBEAT_TIMEOUT.key ->
"10s")
val workerConfs = Map(
- "celeborn.worker.storage.dirs" -> "/tmp:capacity=1000",
- "celeborn.worker.heartbeat.timeout" -> "10s")
+ CelebornConf.WORKER_STORAGE_DIRS.key -> "/tmp:capacity=1000",
+ CelebornConf.WORKER_HEARTBEAT_TIMEOUT.key -> "10s")
workers = setUpMiniCluster(masterConfs, workerConfs)._2
}
@@ -51,7 +52,7 @@ class RssHashCheckDiskSuite extends AnyFunSuite
test("celeborn spark integration test - hash-checkDiskFull") {
val sparkConf = new
SparkConf().setAppName("rss-demo").setMaster("local[2]").set(
- "spark.celeborn.shuffle.expired.checkInterval",
+ s"spark.${CelebornConf.SHUFFLE_EXPIRED_CHECK_INTERVAL.key}",
"5s")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
val combineResult = combine(sparkSession)