This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b5201df04 [CELEBORN-1531][FOLLOWUP] Assign the scheduled check task
b5201df04 is described below
commit b5201df04c7b591916fffb86c369ec6157db3774
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Nov 5 11:18:33 2024 +0800
[CELEBORN-1531][FOLLOWUP] Assign the scheduled check task
### What changes were proposed in this pull request?
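As the title says: assign the `ScheduledFuture` returned by `scheduleCheckTask` to the corresponding task fields so that they can be cancelled when the Master stops.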
### Why are the changes needed?
Followup for https://github.com/apache/celeborn/pull/2653
`checkForUnavailableWorkerTimeOutTask` and
`checkForS3RemnantDirsTimeOutTask` are never assigned and are therefore always null.
<img width="834" alt="image"
src="https://github.com/user-attachments/assets/747a3054-87db-458f-acf8-876926bd1883">
Combine `checkForHDFSRemnantDirsTimeOutTask` and
`checkForS3RemnantDirsTimeOutTask` into a single `checkForDFSRemnantDirsTimeOutTask`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA (GitHub Actions CI).
Closes #2871 from turboFei/1531_followup.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 28 +++++++---------------
1 file changed, 8 insertions(+), 20 deletions(-)
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1f065b3d8..dae240977 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -171,8 +171,7 @@ private[celeborn] class Master(
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
- private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
- private var checkForS3RemnantDirsTimeOutTask: ScheduledFuture[_] = _
+ private var checkForDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
private val nonEagerHandler =
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
// Config constants
@@ -309,19 +308,19 @@ private[celeborn] class Master(
scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)
if (workerUnavailableInfoExpireTimeoutMs > 0) {
- scheduleCheckTask(
+ checkForUnavailableWorkerTimeOutTask = scheduleCheckTask(
workerUnavailableInfoExpireTimeoutMs / 2,
CheckForWorkerUnavailableInfoTimeout)
}
if (hasHDFSStorage || hasS3Storage) {
- checkForHDFSRemnantDirsTimeOutTask =
+ checkForDFSRemnantDirsTimeOutTask =
scheduleCheckTask(dfsExpireDirsTimeoutMS,
CheckForDFSExpiredDirsTimeout)
}
}
- def scheduleCheckTask[T](timeoutMS: Long, message: T): ScheduledFuture[_] = {
+ private def scheduleCheckTask[T](timeoutMS: Long, message: T):
ScheduledFuture[_] = {
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
@@ -338,21 +337,10 @@ private[celeborn] class Master(
return
}
logInfo("Stopping Celeborn Master.")
- if (checkForWorkerTimeOutTask != null) {
- checkForWorkerTimeOutTask.cancel(true)
- }
- if (checkForUnavailableWorkerTimeOutTask != null) {
- checkForUnavailableWorkerTimeOutTask.cancel(true)
- }
- if (checkForApplicationTimeOutTask != null) {
- checkForApplicationTimeOutTask.cancel(true)
- }
- if (checkForHDFSRemnantDirsTimeOutTask != null) {
- checkForHDFSRemnantDirsTimeOutTask.cancel(true)
- }
- if (checkForS3RemnantDirsTimeOutTask != null) {
- checkForS3RemnantDirsTimeOutTask.cancel(true)
- }
+ Option(checkForWorkerTimeOutTask).foreach(_.cancel(true))
+ Option(checkForUnavailableWorkerTimeOutTask).foreach(_.cancel(true))
+ Option(checkForApplicationTimeOutTask).foreach(_.cancel(true))
+ Option(checkForDFSRemnantDirsTimeOutTask).foreach(_.cancel(true))
forwardMessageThread.shutdownNow()
rackResolver.stop()
if (authEnabled) {