This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.2
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.2 by this push:
new c6bc535a [CELEBORN-214] Push/Replicate/Fetch io threads default value
is 16 (#1158)
c6bc535a is described below
commit c6bc535a7ec2203aadb6ef395d785af518f94e09
Author: zy.jordan <[email protected]>
AuthorDate: Tue Jan 10 17:46:56 2023 +0800
[CELEBORN-214] Push/Replicate/Fetch io threads default value is 16 (#1158)
---
.../org/apache/celeborn/common/CelebornConf.scala | 24 +++++++++++-----------
docs/configuration/worker.md | 6 +++---
.../celeborn/service/deploy/worker/Worker.scala | 6 +++---
3 files changed, 18 insertions(+), 18 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 7652066a..c0e194a5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -490,9 +490,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
- def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
- def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
- def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
+ def workerPushIoThreads: Int = get(WORKER_PUSH_IO_THREADS)
+ def workerFetchIoThreads: Int = get(WORKER_FETCH_IO_THREADS)
+ def workerReplicateIoThreads: Int = get(WORKER_REPLICATE_IO_THREADS)
def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
def workerNonEmptyDirExpireDuration: Long =
get(WORKER_NON_EMPTY_DIR_EXPIRE_DURATION)
def workerWorkingDir: String = get(WORKER_WORKING_DIR)
@@ -1717,35 +1717,35 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)
- val WORKER_PUSH_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_PUSH_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.push.io.threads")
.withAlternative("rss.push.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client push data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
- val WORKER_FETCH_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_FETCH_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.fetch.io.threads")
.withAlternative("rss.fetch.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to handle client fetch data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
- val WORKER_REPLICATE_IO_THREADS: OptionalConfigEntry[Int] =
+ val WORKER_REPLICATE_IO_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.replicate.io.threads")
.withAlternative("rss.replicate.io.threads")
.categories("worker")
.doc("Netty IO thread number of worker to replicate shuffle data. " +
- s"The default threads number is `size(${WORKER_STORAGE_DIRS.key})*2`.")
+ s"The default threads number is 16.")
.version("0.2.0")
.intConf
- .createOptional
+ .createWithDefault(16)
val WORKER_REGISTER_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.register.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index f34703c9..89b9eb79 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,7 +42,7 @@ license: |
| celeborn.worker.disk.checkFileClean.maxRetries | 3 | The number of retries
for a worker to check if the working directory is cleaned up before registering
with the master. | 0.2.0 |
| celeborn.worker.disk.checkFileClean.timeout | 1000ms | The wait time per
retry for a worker to check if the working directory is cleaned up before
registering with the master. | 0.2.0 |
| celeborn.worker.disk.reserve.size | 5G | Celeborn worker reserved space for
each disk. | 0.2.0 |
-| celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread
number of worker to handle client fetch data. The default threads number is
`size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.fetch.io.threads | 16 | Netty IO thread number of worker to
handle client fetch data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.fetch.port | 0 | Server port for Worker to receive fetch
data request from ShuffleClient. | 0.2.0 |
| celeborn.worker.flusher.avgFlushTime.slidingWindow.size | 20 | The size of
sliding windows used to calculate statistics about flushed time and count. |
0.2.0 |
| celeborn.worker.flusher.buffer.size | 256k | Size of buffer used by a single
flusher. | 0.2.0 |
@@ -69,11 +69,11 @@ license: |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | Max ratio
of partition sorter's memory for sorting, when reserved memory is higher than
max partition sorter memory, partition sorter will stop sorting. | 0.2.0 |
| celeborn.worker.partitionSorter.reservedMemoryPerPartition | 1mb | Initial
reserve memory when sorting a shuffle file off-heap. | 0.2.0 |
| celeborn.worker.partitionSorter.sort.timeout | 220s | Timeout for a shuffle
file to sort. | 0.2.0 |
-| celeborn.worker.push.io.threads | <undefined> | Netty IO thread number
of worker to handle client push data. The default threads number is
`size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.push.io.threads | 16 | Netty IO thread number of worker to
handle client push data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.push.port | 0 | Server port for Worker to receive push data
request from ShuffleClient. | 0.2.0 |
| celeborn.worker.register.timeout | 180s | Worker register timeout. | 0.2.0 |
| celeborn.worker.replicate.fastFail.duration | 60s | If a replicate request
not replied during the duration, worker will mark the replicate data request as
failed. | 0.2.0 |
-| celeborn.worker.replicate.io.threads | <undefined> | Netty IO thread
number of worker to replicate shuffle data. The default threads number is
`size(celeborn.worker.storage.dirs)*2`. | 0.2.0 |
+| celeborn.worker.replicate.io.threads | 16 | Netty IO thread number of worker
to replicate shuffle data. The default threads number is 16. | 0.2.0 |
| celeborn.worker.replicate.port | 0 | Server port for Worker to receive
replicate data request from other Workers. | 0.2.0 |
| celeborn.worker.replicate.threads | 64 | Thread number of worker to
replicate shuffle data. | 0.2.0 |
| celeborn.worker.rpc.port | 0 | Server port for Worker to receive RPC
request. | 0.2.0 |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index debdd390..046560ce 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -110,7 +110,7 @@ private[celeborn] class Worker(
val pushDataHandler = new PushDataHandler()
val (pushServer, pushClientFactory) = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads =
conf.workerPushIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ val numThreads = conf.workerPushIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.PUSH_MODULE,
numThreads)
val pushServerLimiter = new
ChannelsLimiter(TransportModuleConstants.PUSH_MODULE)
@@ -125,7 +125,7 @@ private[celeborn] class Worker(
private val replicateServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
val numThreads =
-
conf.workerReplicateIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ conf.workerReplicateIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.REPLICATE_MODULE,
numThreads)
val replicateLimiter = new
ChannelsLimiter(TransportModuleConstants.REPLICATE_MODULE)
@@ -137,7 +137,7 @@ private[celeborn] class Worker(
var fetchHandler: FetchHandler = _
private val fetchServer = {
val closeIdleConnections = conf.workerCloseIdleConnections
- val numThreads =
conf.workerFetchIoThreads.getOrElse(storageManager.disksSnapshot().size * 2)
+ val numThreads = conf.workerFetchIoThreads
val transportConf =
Utils.fromCelebornConf(conf, TransportModuleConstants.FETCH_MODULE,
numThreads)
fetchHandler = new FetchHandler(transportConf)