This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a599ff2af [CELEBORN-1535] Support to disable master workerUnavailableInfo expiration
a599ff2af is described below

commit a599ff2afe5847ebc3abdb53c8708747a5507bb4
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Aug 7 08:22:36 2024 +0800

    [CELEBORN-1535] Support to disable master workerUnavailableInfo expiration
    
    ### What changes were proposed in this pull request?
    
    This PR makes it possible to disable the expiration of worker unavailable info by setting `celeborn.master.workerUnavailableInfo.expireTimeout` to -1.
    
    ### Why are the changes needed?
    
    In our use case, we want to retain all worker unavailable information.
    This is acceptable when workers use fixed hosts and ports: the number of distinct entries stays bounded, so the information does not occupy much memory.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    Not needed.
    
    Closes #2657 from turboFei/disable_Cleanup.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala    |  3 ++-
 docs/configuration/master.md                         |  2 +-
 .../celeborn/service/deploy/master/Master.scala      | 20 +++++++++++---------
 3 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0a058d07d..e744437ad 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -2184,7 +2184,8 @@ object CelebornConf extends Logging {
     buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
       .categories("master")
       .version("0.3.1")
-      .doc("Worker unavailable info would be cleared when the retention period is expired")
+      .doc("Worker unavailable info would be cleared when the retention period is expired." +
+        " Set -1 to disable the expiration.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1800s")
 
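As a hedged illustration of the new `-1` sentinel, the Scala sketch below turns a config value such as `1800s` or `-1` into milliseconds and applies the same `> 0` test that guards the scheduler in `Master.scala`. `ExpireTimeoutSketch`, `parseTimeMs`, and `expirationEnabled` are hypothetical names, and the parser is a simplified stand-in, not Celeborn's `timeConf` parser; it assumes that a bare number is read in the declared unit (milliseconds here).

```scala
import java.util.concurrent.TimeUnit

object ExpireTimeoutSketch {
  // Hypothetical, simplified time-string parser (NOT Celeborn's timeConf).
  // An optional sign, digits, and an optional unit suffix; a bare number is
  // assumed to be in milliseconds, the unit declared via timeConf.
  private val TimeString = """(-?[0-9]+)([a-z]+)?""".r

  def parseTimeMs(value: String): Long = value.trim.toLowerCase match {
    case TimeString(num, suffix) =>
      // An unmatched optional group is bound as null, hence Option(...).
      val unit = Option(suffix) match {
        case None | Some("ms")       => TimeUnit.MILLISECONDS
        case Some("s")               => TimeUnit.SECONDS
        case Some("m") | Some("min") => TimeUnit.MINUTES
        case Some("h")               => TimeUnit.HOURS
        case Some("d")               => TimeUnit.DAYS
        case Some(other) =>
          throw new IllegalArgumentException(s"Unknown time suffix: $other")
      }
      unit.toMillis(num.toLong)
    case _ =>
      throw new IllegalArgumentException(s"Invalid time string: $value")
  }

  // Same test as the Master.scala guard: only a positive timeout enables
  // the periodic check, so -1 (or 0) disables timeout-based expiration.
  def expirationEnabled(timeoutMs: Long): Boolean = timeoutMs > 0
}
```

Because the guard is `> 0`, a value of `0` would also skip the check, not only `-1`.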
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 03d392560..6f2fecfad 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -68,7 +68,7 @@ license: |
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 |  | 
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots, Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.3.0 | celeborn.slots.assign.policy | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time length for a window about compute user resource consumption. | 0.3.0 |  | 
-| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired | 0.3.1 |  | 
+| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 |  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 |  | 
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 |  | 
 | celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes | 
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1e99956e9..e7a2f94ff 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -311,15 +311,17 @@ private[celeborn] class Master(
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
 
-    checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
-      new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(CheckForWorkerUnavailableInfoTimeout)
-        }
-      },
-      0,
-      workerUnavailableInfoExpireTimeoutMs / 2,
-      TimeUnit.MILLISECONDS)
+    if (workerUnavailableInfoExpireTimeoutMs > 0) {
+      checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
+        new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            self.send(CheckForWorkerUnavailableInfoTimeout)
+          }
+        },
+        0,
+        workerUnavailableInfoExpireTimeoutMs / 2,
+        TimeUnit.MILLISECONDS)
+    }
 
     if (hasHDFSStorage || hasS3Storage) {
       checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(

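The `Master.scala` hunk wraps the existing `scheduleWithFixedDelay` call in a `workerUnavailableInfoExpireTimeoutMs > 0` check. A minimal, self-contained sketch of that pattern with a plain `ScheduledExecutorService` follows; `ConditionalScheduleSketch` and `scheduleExpirationCheck` are hypothetical, the `check` callback stands in for `self.send(CheckForWorkerUnavailableInfoTimeout)`, and the `NonFatal` handling only approximates `Utils.tryLogNonFatalError`. One deliberate deviation: the delay is clamped to at least 1 ms, because `scheduleWithFixedDelay` rejects a non-positive delay.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import scala.util.control.NonFatal

object ConditionalScheduleSketch {
  // Hypothetical helper modelled on the Master.scala change: the periodic
  // expiration check is scheduled only for a positive timeout. Otherwise
  // nothing is scheduled and None is returned.
  def scheduleExpirationCheck(
      executor: ScheduledExecutorService,
      timeoutMs: Long)(check: () => Unit): Option[ScheduledFuture[_]] = {
    if (timeoutMs > 0) {
      val task = new Runnable {
        // Swallow non-fatal errors so one failing run does not suppress the
        // remaining executions of the repeating schedule.
        override def run(): Unit =
          try check()
          catch { case NonFatal(e) => System.err.println(s"check failed: $e") }
      }
      // Deviation from the original: clamp the delay to >= 1 ms, since
      // scheduleWithFixedDelay throws IllegalArgumentException for delay <= 0.
      val delayMs = math.max(timeoutMs / 2, 1L)
      Some(executor.scheduleWithFixedDelay(task, 0L, delayMs, TimeUnit.MILLISECONDS))
    } else {
      None
    }
  }
}
```

With `-1` the helper returns `None` and nothing is scheduled, so no timeout-based expiration ever runs; any positive timeout schedules the check immediately and then every `timeoutMs / 2` ms.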