This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 230d7137b [CELEBORN-786] Change default flush threads
230d7137b is described below

commit 230d7137b07dd079297c37c8c0e7948f7e872245
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Tue Jul 11 13:09:29 2023 +0800

    [CELEBORN-786] Change default flush threads
    
    ### What changes were proposed in this pull request?
    This PR changes default values of the following configs:
    
    |config|previous default value|new default value|
    |----|----|----|
    |celeborn.worker.flusher.threads|2|16|
    |celeborn.worker.flusher.ssd.threads|8|16|
    
    ### Why are the changes needed?
    If disk type is not specified, ```celeborn.worker.flusher.threads``` will 
be used. Recently, many users
    have used SSDs for Celeborn workers without specifying the disk type, and 2 
flush threads are far from leveraging the power of an SSD.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, default configs are changed.
    
    ### How was this patch tested?
    Passes GA.
    
    Closes #1703 from waitinfuture/786.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 7a47fae23045dc2efbc5b09d834374559f856e35)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../src/main/scala/org/apache/celeborn/common/CelebornConf.scala  | 4 ++--
 .../test/scala/org/apache/celeborn/common/CelebornConfSuite.scala | 8 ++++----
 docs/configuration/worker.md                                      | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 24992be77..4e4c8a3c8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2191,7 +2191,7 @@ object CelebornConf extends Logging {
       .doc("Flusher's thread count per disk for unkown-type disks.")
       .version("0.2.0")
       .intConf
-      .createWithDefault(2)
+      .createWithDefault(16)
 
   val WORKER_FLUSHER_HDD_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.flusher.hdd.threads")
@@ -2207,7 +2207,7 @@ object CelebornConf extends Logging {
       .doc("Flusher's thread count per disk used for write data to SSD disks.")
       .version("0.2.0")
       .intConf
-      .createWithDefault(8)
+      .createWithDefault(16)
 
   val WORKER_FLUSHER_HDFS_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.worker.flusher.hdfs.threads")
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index 3105d25d7..3e6cbc7ef 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -38,7 +38,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1")
     val workerBaseDirs = conf.workerBaseDirs
     assert(workerBaseDirs.size == 1)
-    assert(workerBaseDirs.head._3 == 2)
+    assert(workerBaseDirs.head._3 == 16)
     assert(workerBaseDirs.head._2 == defaultMaxUsableSpace)
   }
 
@@ -47,7 +47,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, 
"/mnt/disk1:disktype=SSD:capacity=10g")
     val workerBaseDirs = conf.workerBaseDirs
     assert(workerBaseDirs.size == 1)
-    assert(workerBaseDirs.head._3 == 8)
+    assert(workerBaseDirs.head._3 == 16)
     assert(workerBaseDirs.head._2 == 10 * 1024 * 1024 * 1024L)
   }
 
@@ -83,7 +83,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     val conf = new CelebornConf()
     conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1")
     val workerBaseDirs = conf.workerBaseDirs
-    assert(workerBaseDirs.head._3 == 2)
+    assert(workerBaseDirs.head._3 == 16)
   }
 
   test("storage test6") {
@@ -107,7 +107,7 @@ class CelebornConfSuite extends CelebornFunSuite {
     conf.set(CelebornConf.WORKER_FLUSHER_THREADS.key, "4")
       .set(CelebornConf.WORKER_STORAGE_DIRS.key, "/mnt/disk1:disktype=SSD")
     val workerBaseDirs = conf.workerBaseDirs
-    assert(workerBaseDirs.head._3 == 8)
+    assert(workerBaseDirs.head._3 == 16)
   }
 
   test("storage test9") {
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 887788cef..0a1372786 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -47,8 +47,8 @@ license: |
 | celeborn.worker.flusher.hdfs.buffer.size | 4m | Size of buffer used by a 
HDFS flusher. | 0.3.0 | 
 | celeborn.worker.flusher.hdfs.threads | 8 | Flusher's thread count used for 
write data to HDFS. | 0.2.0 | 
 | celeborn.worker.flusher.shutdownTimeout | 3s | Timeout for a flusher to 
shutdown. | 0.2.0 | 
-| celeborn.worker.flusher.ssd.threads | 8 | Flusher's thread count per disk 
used for write data to SSD disks. | 0.2.0 | 
-| celeborn.worker.flusher.threads | 2 | Flusher's thread count per disk for 
unkown-type disks. | 0.2.0 | 
+| celeborn.worker.flusher.ssd.threads | 16 | Flusher's thread count per disk 
used for write data to SSD disks. | 0.2.0 | 
+| celeborn.worker.flusher.threads | 16 | Flusher's thread count per disk for 
unkown-type disks. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.checkSlotsFinished.interval | 1s | The 
wait interval of checking whether all released slots to be committed or 
destroyed during worker graceful shutdown | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.checkSlotsFinished.timeout | 480s | The 
wait time of waiting for the released slots to be committed or destroyed during 
worker graceful shutdown. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.enabled | false | When true, during worker 
shutdown, the worker will wait for all released slots to be committed or 
destroyed. | 0.2.0 | 

Reply via email to