This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 230d7137b [CELEBORN-786] Change default flush threads
230d7137b is described below
commit 230d7137b07dd079297c37c8c0e7948f7e872245
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jul 11 13:09:29 2023 +0800
[CELEBORN-786] Change default flush threads
### What changes were proposed in this pull request?
This PR changes the default values of the following configs:
|config|previous default value|new default value|
|----|----|----|
|celeborn.worker.flusher.threads|2|16|
|celeborn.worker.flusher.ssd.threads|8|16|
### Why are the changes needed?
If the disk type is not specified, ```celeborn.worker.flusher.threads``` is
used. Recently, many users have been running Celeborn workers on SSDs
without specifying the disk type, and 2 flush threads are far from enough
to leverage the power of an SSD.
### Does this PR introduce _any_ user-facing change?
Yes, default configs are changed.
### How was this patch tested?
Passes GA.
Closes #1703 from waitinfuture/786.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 7a47fae23045dc2efbc5b09d834374559f856e35)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 4 ++--
.../test/scala/org/apache/celeborn/common/CelebornConfSuite.scala | 8 ++++----
docs/configuration/worker.md | 4 ++--
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 24992be77..4e4c8a3c8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2191,7 +2191,7 @@ object CelebornConf extends Logging {
.doc("Flusher's thread count per disk for unkown-type disks.")
.version("0.2.0")
.intConf
- .createWithDefault(2)
+ .createWithDefault(16)
val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdd.threads")
@@ -2207,7 +2207,7 @@ object CelebornConf extends Logging {
.doc("Flusher's thread count per disk used for write data to SSD disks.")
.version("0.2.0")
.intConf
- .createWithDefault(8)
+ .createWithDefault(16)
val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.hdfs.threads")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 3105d25d7..3e6cbc7ef 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -38,7 +38,7 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1")
val workerBaseDirs = conf.workerBaseDirs
assert(workerBaseDirs.size == 1)
- assert(workerBaseDirs.head._3 == 2)
+ assert(workerBaseDirs.head._3 == 16)
assert(workerBaseDirs.head._2 == defaultMaxUsableSpace)
}
@@ -47,7 +47,7 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key,
"/mnt/disk1:disktype=SSD:capacity=10g")
val workerBaseDirs = conf.workerBaseDirs
assert(workerBaseDirs.size == 1)
- assert(workerBaseDirs.head._3 == 8)
+ assert(workerBaseDirs.head._3 == 16)
assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
}
@@ -83,7 +83,7 @@ class CelebornConfSuite extends CelebornFunSuite {
val conf = new CelebornConf()
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1")
val workerBaseDirs = conf.workerBaseDirs
- assert(workerBaseDirs.head._3 == 2)
+ assert(workerBaseDirs.head._3 == 16)
}
test("storage test6") {
@@ -107,7 +107,7 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set(CelebornConf.WORKER_FLUSHER_THREADS.key, "4")
.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1:disktype=SSD")
val workerBaseDirs = conf.workerBaseDirs
- assert(workerBaseDirs.head._3 == 8)
+ assert(workerBaseDirs.head._3 == 16)
}
test("storage test9") {
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 887788cef..0a1372786 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -47,8 +47,8 @@ license: |
| celeborn.worker.flusher.hdfs.buffer.size | 4m | Size of buffer used by a
HDFS flusher. | 0.3.0 |
| celeborn.worker.flusher.hdfs.threads | 8 | Flusher's thread count used for
write data to HDFS. | 0.2.0 |
| celeborn.worker.flusher.shutdownTimeout | 3s | Timeout for a flusher to
shutdown. | 0.2.0 |
-| celeborn.worker.flusher.ssd.threads | 8 | Flusher's thread count per disk
used for write data to SSD disks. | 0.2.0 |
-| celeborn.worker.flusher.threads | 2 | Flusher's thread count per disk for
unkown-type disks. | 0.2.0 |
+| celeborn.worker.flusher.ssd.threads | 16 | Flusher's thread count per disk
used for write data to SSD disks. | 0.2.0 |
+| celeborn.worker.flusher.threads | 16 | Flusher's thread count per disk for
unkown-type disks. | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | The
wait interval of checking whether all released slots to be committed or
destroyed during worker graceful shutdown | 0.2.0 |
| celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The
wait time of waiting for the released slots to be committed or destroyed during
worker graceful shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.enabled | false | When true, during worker
shutdown, the worker will wait for all released slots to be committed or
destroyed. | 0.2.0 |