This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b5201df04 [CELEBORN-1531][FOLLOWUP] Assign the scheduled check task
b5201df04 is described below

commit b5201df04c7b591916fffb86c369ec6157db3774
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Nov 5 11:18:33 2024 +0800

    [CELEBORN-1531][FOLLOWUP] Assign the scheduled check task
    
    ### What changes were proposed in this pull request?
    
    As title.
    
    ### Why are the changes needed?
    Followup for https://github.com/apache/celeborn/pull/2653
    
    `checkForUnavailableWorkerTimeOutTask` and 
`checkForS3RemnantDirsTimeOutTask` are not assigned and always null.
    <img width="834" alt="image" 
src="https://github.com/user-attachments/assets/747a3054-87db-458f-acf8-876926bd1883";>
    
    Combine the `checkForHDFSRemnantDirsTimeOutTask` and 
`checkForS3RemnantDirsTimeOutTask` with `checkForDFSRemnantDirsTimeOutTask`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    GA.
    
    Closes #2871 from turboFei/1531_followup.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/service/deploy/master/Master.scala    | 28 +++++++---------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1f065b3d8..dae240977 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -171,8 +171,7 @@ private[celeborn] class Master(
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
   private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
-  private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
-  private var checkForS3RemnantDirsTimeOutTask: ScheduledFuture[_] = _
+  private var checkForDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = 
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
 
   // Config constants
@@ -309,19 +308,19 @@ private[celeborn] class Master(
       scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)
 
     if (workerUnavailableInfoExpireTimeoutMs > 0) {
-      scheduleCheckTask(
+      checkForUnavailableWorkerTimeOutTask = scheduleCheckTask(
         workerUnavailableInfoExpireTimeoutMs / 2,
         CheckForWorkerUnavailableInfoTimeout)
     }
 
     if (hasHDFSStorage || hasS3Storage) {
-      checkForHDFSRemnantDirsTimeOutTask =
+      checkForDFSRemnantDirsTimeOutTask =
         scheduleCheckTask(dfsExpireDirsTimeoutMS, 
CheckForDFSExpiredDirsTimeout)
     }
 
   }
 
-  def scheduleCheckTask[T](timeoutMS: Long, message: T): ScheduledFuture[_] = {
+  private def scheduleCheckTask[T](timeoutMS: Long, message: T): 
ScheduledFuture[_] = {
     forwardMessageThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
@@ -338,21 +337,10 @@ private[celeborn] class Master(
       return
     }
     logInfo("Stopping Celeborn Master.")
-    if (checkForWorkerTimeOutTask != null) {
-      checkForWorkerTimeOutTask.cancel(true)
-    }
-    if (checkForUnavailableWorkerTimeOutTask != null) {
-      checkForUnavailableWorkerTimeOutTask.cancel(true)
-    }
-    if (checkForApplicationTimeOutTask != null) {
-      checkForApplicationTimeOutTask.cancel(true)
-    }
-    if (checkForHDFSRemnantDirsTimeOutTask != null) {
-      checkForHDFSRemnantDirsTimeOutTask.cancel(true)
-    }
-    if (checkForS3RemnantDirsTimeOutTask != null) {
-      checkForS3RemnantDirsTimeOutTask.cancel(true)
-    }
+    Option(checkForWorkerTimeOutTask).foreach(_.cancel(true))
+    Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true))
+    Option(checkForApplicationTimeOutTask).foreach(_.cancel(true))
+    Option(checkForDFSRemnantDirsTimeOutTask).foreach(_.cancel(true))
     forwardMessageThread.shutdownNow()
     rackResolver.stop()
     if (authEnabled) {

Reply via email to