This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ca69e200 [CELEBORN-1861] Support 
celeborn.worker.storage.baseDir.diskType option to specify disk type of base 
directory for worker
7ca69e200 is described below

commit 7ca69e200f564d51d60c0e304e64605578512570
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Feb 19 15:54:42 2025 +0800

    [CELEBORN-1861] Support celeborn.worker.storage.baseDir.diskType option to 
specify disk type of base directory for worker
    
    ### What changes were proposed in this pull request?
    
    Support `celeborn.worker.storage.baseDir.diskType` option to specify disk 
type of base directory for worker.
    
    ### Why are the changes needed?
    
    The disk type of base directory for worker is `HDD` at default. We could 
support `celeborn.worker.storage.baseDir.diskType` option to specify disk type 
of base directory for worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3096 from SteNicholas/CELEBORN-1861.
    
    Authored-by: Nicholas Jiang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ad9b35381..862073c31 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1144,8 +1144,16 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
       if (!hasHDFSStorage && !hasS3Storage) {
         val prefix = workerStorageBaseDirPrefix
         val number = workerStorageBaseDirNumber
+        val diskType = Type.valueOf(workerStorageBaseDirDiskType)
         (1 to number).map { i =>
-          (s"$prefix$i", defaultMaxCapacity, workerHddFlusherThreads, HDD)
+          (
+            s"$prefix$i",
+            defaultMaxCapacity,
+            diskType match {
+              case SSD => workerSsdFlusherThreads
+              case _ => workerHddFlusherThreads
+            },
+            diskType)
         }
       } else {
         Seq.empty
@@ -1190,6 +1198,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
 
   def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
   def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
+  def workerStorageBaseDirDiskType: String = 
get(WORKER_STORAGE_BASE_DIR_DISK_TYPE)
   def workerStorageExpireDirTimeout: Long = 
get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
   def creditStreamThreadsPerMountpoint: Int = 
get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
   def workerDirectMemoryRatioForReadBuffer: Double = 
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
@@ -3080,6 +3089,18 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(16)
 
+  val WORKER_STORAGE_BASE_DIR_DISK_TYPE: ConfigEntry[String] =
+    buildConf("celeborn.worker.storage.baseDir.diskType")
+      .internal
+      .categories("worker")
+      .version("0.6.0")
+      .doc(s"The disk type of base directory for worker to write if 
`${WORKER_STORAGE_DIRS.key}` is not set. " +
+        s"Available options: ${StorageInfo.Type.HDD.name}, 
${StorageInfo.Type.SSD.name}.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(Set(StorageInfo.Type.HDD.name, StorageInfo.Type.SSD.name))
+      .createWithDefault(StorageInfo.Type.HDD.name)
+
   val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.storage.expireDirs.timeout")
       .categories("worker")

Reply via email to