This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a599ff2af [CELEBORN-1535] Support to disable master workerUnavailableInfo expiration
a599ff2af is described below
commit a599ff2afe5847ebc3abdb53c8708747a5507bb4
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Aug 7 08:22:36 2024 +0800
[CELEBORN-1535] Support to disable master workerUnavailableInfo expiration
### What changes were proposed in this pull request?
This PR makes it possible to disable the expiration of worker unavailable info by
setting `celeborn.master.workerUnavailableInfo.expireTimeout` to -1.
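A minimal sketch of the intended semantics, assuming each unavailable worker is recorded together with the time it was marked unavailable, and that a non-positive timeout means entries are never expired. `UnavailableWorkerTracker` and its methods are hypothetical names for illustration, not Celeborn's API.

```scala
import scala.collection.mutable

// Hypothetical tracker illustrating the expiration semantics; not Celeborn's implementation.
final class UnavailableWorkerTracker(expireTimeoutMs: Long) {
  // Assumed key: a worker identifier such as "host:port" -> time it was marked unavailable.
  private val unavailableSince = mutable.Map.empty[String, Long]

  // Re-marking the same worker overwrites its entry, so the map never grows past
  // the number of distinct worker identifiers.
  def markUnavailable(worker: String, nowMs: Long): Unit =
    unavailableSince(worker) = nowMs

  // Drops entries older than the timeout; a non-positive timeout disables expiration.
  def expire(nowMs: Long): Unit =
    if (expireTimeoutMs > 0) {
      val expired = unavailableSince
        .filter { case (_, since) => nowMs - since > expireTimeoutMs }
        .keys
        .toList
      unavailableSince --= expired
    }

  def size: Int = unavailableSince.size
}
```

With `expireTimeoutMs = -1`, `expire` is a no-op and every entry is retained. In this sketch, memory stays bounded by the number of distinct workers, which matches the reasoning below about fixed hosts and ports.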
### Why are the changes needed?
In our use case, we want to retain all worker unavailable information.
This is acceptable because we use fixed hosts and ports, so retaining the
information does not occupy much memory.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
Closes #2657 from turboFei/disable_Cleanup.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 3 ++-
docs/configuration/master.md | 2 +-
.../celeborn/service/deploy/master/Master.scala | 20 +++++++++++---------
3 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0a058d07d..e744437ad 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2184,7 +2184,8 @@ object CelebornConf extends Logging {
     buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
       .categories("master")
       .version("0.3.1")
-      .doc("Worker unavailable info would be cleared when the retention period is expired")
+      .doc("Worker unavailable info would be cleared when the retention period is expired." +
+        " Set -1 to disable the expiration.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1800s")
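Because the entry is declared with `timeConf(TimeUnit.MILLISECONDS)`, the configured string is read as a duration in milliseconds. The sketch below assumes Spark-style time-string parsing, where a bare number takes the default unit and a leading minus sign is accepted, so `-1` becomes -1 ms while the default `1800s` becomes 1800000 ms. `TimeConfSketch`, `toMillis`, and `expirationEnabled` are hypothetical helpers that support only a few suffixes; this is not Celeborn's parser.

```scala
import java.util.concurrent.TimeUnit

object TimeConfSketch {
  // Hypothetical, simplified format: "<optional minus><digits><optional suffix>".
  private val TimeString = """(-?[0-9]+)([a-z]*)""".r

  // An empty suffix falls back to milliseconds, the unit passed to timeConf.
  private val units: Map[String, TimeUnit] = Map(
    "" -> TimeUnit.MILLISECONDS,
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS)

  def toMillis(value: String): Long = value.trim.toLowerCase match {
    case TimeString(num, suffix) if units.contains(suffix) =>
      units(suffix).toMillis(num.toLong)
    case _ =>
      throw new IllegalArgumentException(s"Invalid time string: $value")
  }

  // Mirrors the guard added to Master.scala: only a positive timeout enables expiration.
  def expirationEnabled(value: String): Boolean = toMillis(value) > 0
}
```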
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 03d392560..6f2fecfad 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -68,7 +68,7 @@ license: |
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | celeborn.slots.assign.policy |
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 | |
-| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired | 0.3.1 | |
+| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | |
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1e99956e9..e7a2f94ff 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -311,15 +311,17 @@ private[celeborn] class Master(
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
-    checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
-      new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(CheckForWorkerUnavailableInfoTimeout)
-        }
-      },
-      0,
-      workerUnavailableInfoExpireTimeoutMs / 2,
-      TimeUnit.MILLISECONDS)
+    if (workerUnavailableInfoExpireTimeoutMs > 0) {
+      checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
+        new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(CheckForWorkerUnavailableInfoTimeout)
+          }
+        },
+        0,
+        workerUnavailableInfoExpireTimeoutMs / 2,
+        TimeUnit.MILLISECONDS)
+    }
     if (hasHDFSStorage || hasS3Storage) {
       checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
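The `> 0` guard is what makes -1 safe: without it, `workerUnavailableInfoExpireTimeoutMs / 2` evaluates to 0 for a timeout of -1 (integer division truncates toward zero), and `ScheduledExecutorService.scheduleWithFixedDelay` throws `IllegalArgumentException` when the delay is less than or equal to zero. Note also that, with this guard, any non-positive value, not only -1, skips the periodic check. The sketch below reproduces the pattern with a plain JDK executor; `ExpirationScheduling`, `scheduleExpirationCheck`, and the `check` callback are hypothetical stand-ins for the Master's `forwardMessageThread` and the `CheckForWorkerUnavailableInfoTimeout` message.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import scala.util.control.NonFatal

object ExpirationScheduling {
  // Schedules the periodic check only for a positive timeout; None means expiration is disabled.
  def scheduleExpirationCheck(
      executor: ScheduledExecutorService,
      expireTimeoutMs: Long,
      check: () => Unit): Option[ScheduledFuture[_]] =
    if (expireTimeoutMs > 0) {
      val task = new Runnable {
        // Catch non-fatal errors (like Utils.tryLogNonFatalError) so that one failure
        // does not suppress all later executions of the periodic task.
        override def run(): Unit =
          try check()
          catch { case NonFatal(e) => System.err.println(s"Expiration check failed: $e") }
      }
      // Same cadence as Master.scala: run immediately, then every timeout / 2 ms.
      // Unlike Master.scala, clamp to 1 ms so a 1 ms timeout does not yield a zero delay.
      val delayMs = math.max(1L, expireTimeoutMs / 2)
      Some(executor.scheduleWithFixedDelay(task, 0, delayMs, TimeUnit.MILLISECONDS))
    } else {
      None
    }
}
```

Returning `Option` keeps the disabled case explicit, so shutdown code can cancel the task only when it was actually scheduled.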