This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 450dac824 [CELEBORN-1447] Support configuring thread number of worker
to wait for commit shuffle data files to finish
450dac824 is described below
commit 450dac82454ecc92b984b32b75818665fee2a032
Author: SteNicholas <[email protected]>
AuthorDate: Mon Jun 3 17:47:01 2024 +0800
[CELEBORN-1447] Support configuring thread number of worker to wait for
commit shuffle data files to finish
### What changes were proposed in this pull request?
Introduce `celeborn.worker.commitFiles.wait.threads` to support configuring
thread number of worker to wait for commit shuffle data files to finish.
### Why are the changes needed?
At present, the thread pool sized by `celeborn.worker.commitFiles.threads` is
used both to commit shuffle data files asynchronously and to wait for
in-progress commits to finish. The waiting work should have its own
configurable thread number, which avoids the situation where every thread in
the commit thread pool is waiting for commit files and no thread is left to
commit shuffle data files.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes #2539 from SteNicholas/CELEBORN-1447.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../src/main/scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/worker.md | 1 +
.../org/apache/celeborn/service/deploy/worker/Controller.scala | 5 ++++-
.../scala/org/apache/celeborn/service/deploy/worker/Worker.scala | 4 ++++
4 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e4e7b9026..690758b77 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -805,6 +805,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
def workerCommitThreads: Int =
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
+ def workerCommitFilesWaitThreads: Int = get(WORKER_COMMIT_FILES_WAIT_THREADS)
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def maxPartitionSizeToEstimate: Long =
@@ -2907,6 +2908,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(32)
+ val WORKER_COMMIT_FILES_WAIT_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.commitFiles.wait.threads")
+ .categories("worker")
+ .version("0.5.0")
+ .doc("Thread number of worker to wait for commit shuffle data files to
finish.")
+ .intConf
+ .createWithDefault(32)
+
val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.clean.threads")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ee6756131..30d27f780 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -51,6 +51,7 @@ license: |
| celeborn.worker.closeIdleConnections | false | false | Whether worker will
close idle connections. | 0.2.0 | |
| celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker
to commit shuffle data files asynchronously. It's recommended to set at least
`128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
celeborn.worker.commit.threads |
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn
worker to commit files of a shuffle. It's recommended to set at least `240s`
when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
celeborn.worker.shuffle.commit.timeout |
+| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of
worker to wait for commit shuffle data files to finish. | 0.5.0 | |
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval
of worker checks congestion if celeborn.worker.congestionControl.enabled is
true. | 0.3.2 | |
| celeborn.worker.congestionControl.enabled | false | false | Whether to
enable congestion control or not. | 0.3.0 | |
| celeborn.worker.congestionControl.high.watermark | <undefined> | false
| If the total bytes in disk buffer exceeds this configure, will start to
congest users whose produce rate is higher than the potential average consume
rate. The congestion will stop if the produce rate is lower or equal to the
average consume rate, or the total pending bytes lower than
celeborn.worker.congestionControl.low.watermark | 0.3.0 | |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 6d9a44df1..8deac0905 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -57,6 +57,7 @@ private[deploy] class Controller(
var partitionLocationInfo: WorkerPartitionLocationInfo = _
var timer: HashedWheelTimer = _
var commitThreadPool: ThreadPoolExecutor = _
+ var waitThreadPool: ThreadPoolExecutor = _
var asyncReplyPool: ScheduledExecutorService = _
val minPartitionSizeToEstimate = conf.minPartitionSizeToEstimate
var shutdown: AtomicBoolean = _
@@ -72,6 +73,7 @@ private[deploy] class Controller(
partitionLocationInfo = worker.partitionLocationInfo
timer = worker.timer
commitThreadPool = worker.commitThreadPool
+ waitThreadPool = worker.waitThreadPool
asyncReplyPool = worker.asyncReplyPool
shutdown = worker.shutdown
}
@@ -431,7 +433,8 @@ private[deploy] class Controller(
return
} else if (commitInfo.status == CommitInfo.COMMIT_INPROCESS) {
logInfo(s"$shuffleKey CommitFiles inprogress, wait for finish")
- commitThreadPool.submit(new Runnable {
+ // Use a dedicated pool instead of commitThreadPool so that waiting tasks cannot occupy the threads needed to commit files.
+ waitThreadPool.submit(new Runnable {
override def run(): Unit = {
waitForCommitFinish()
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index bdaab805c..bc815f6d1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -325,6 +325,8 @@ private[celeborn] class Worker(
ThreadUtils.newDaemonCachedThreadPool("worker-data-replicator",
conf.workerReplicateThreads)
val commitThreadPool: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("worker-files-committer",
conf.workerCommitThreads)
+ val waitThreadPool: ThreadPoolExecutor =
+ ThreadUtils.newDaemonCachedThreadPool("worker-commit-waiter",
conf.workerCommitFilesWaitThreads)
val cleanThreadPool: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool(
"worker-expired-shuffle-cleaner",
@@ -563,11 +565,13 @@ private[celeborn] class Worker(
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
+ waitThreadPool.shutdown();
asyncReplyPool.shutdown()
} else {
forwardMessageScheduler.shutdownNow()
replicateThreadPool.shutdownNow()
commitThreadPool.shutdownNow()
+ waitThreadPool.shutdownNow();
asyncReplyPool.shutdownNow()
}
workerSource.appActiveConnections.clear()