This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7ca69e200 [CELEBORN-1861] Support
celeborn.worker.storage.baseDir.diskType option to specify disk type of base
directory for worker
7ca69e200 is described below
commit 7ca69e200f564d51d60c0e304e64605578512570
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Feb 19 15:54:42 2025 +0800
[CELEBORN-1861] Support celeborn.worker.storage.baseDir.diskType option to
specify disk type of base directory for worker
### What changes were proposed in this pull request?
Support the `celeborn.worker.storage.baseDir.diskType` option to specify the
disk type of the worker's base directory.
### Why are the changes needed?
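The disk type of the worker's base directory was previously hard-coded to `HDD`.
Adding the `celeborn.worker.storage.baseDir.diskType` option lets it be set
explicitly (`HDD` by default, or `SSD`), so that the generated base directories
use the flusher thread count that matches the configured disk type.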
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3096 from SteNicholas/CELEBORN-1861.
Authored-by: Nicholas Jiang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 23 +++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ad9b35381..862073c31 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1144,8 +1144,16 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
if (!hasHDFSStorage && !hasS3Storage) {
val prefix = workerStorageBaseDirPrefix
val number = workerStorageBaseDirNumber
+ val diskType = Type.valueOf(workerStorageBaseDirDiskType)
(1 to number).map { i =>
- (s"$prefix$i", defaultMaxCapacity, workerHddFlusherThreads, HDD)
+ (
+ s"$prefix$i",
+ defaultMaxCapacity,
+ diskType match {
+ case SSD => workerSsdFlusherThreads
+ case _ => workerHddFlusherThreads
+ },
+ diskType)
}
} else {
Seq.empty
@@ -1190,6 +1198,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
+ def workerStorageBaseDirDiskType: String =
get(WORKER_STORAGE_BASE_DIR_DISK_TYPE)
def workerStorageExpireDirTimeout: Long =
get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
def creditStreamThreadsPerMountpoint: Int =
get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
def workerDirectMemoryRatioForReadBuffer: Double =
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
@@ -3080,6 +3089,18 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(16)
+ val WORKER_STORAGE_BASE_DIR_DISK_TYPE: ConfigEntry[String] =
+ buildConf("celeborn.worker.storage.baseDir.diskType")
+ .internal
+ .categories("worker")
+ .version("0.6.0")
+ .doc(s"The disk type of base directory for worker to write if
`${WORKER_STORAGE_DIRS.key}` is not set. " +
+ s"Available options: ${StorageInfo.Type.HDD.name},
${StorageInfo.Type.SSD.name}.")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(Set(StorageInfo.Type.HDD.name, StorageInfo.Type.SSD.name))
+ .createWithDefault(StorageInfo.Type.HDD.name)
+
val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.expireDirs.timeout")
.categories("worker")