This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 960ba2406 [CELEBORN-1531] Refactor self checks in master
960ba2406 is described below
commit 960ba2406f3af19097fd4f40d06eed568a33255a
Author: zhaohehuhu <[email protected]>
AuthorDate: Tue Aug 13 10:29:35 2024 +0800
[CELEBORN-1531] Refactor self checks in master
### What changes were proposed in this pull request?
As the title says: refactor the periodic self checks in the master.
### Why are the changes needed?
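Add a `scheduleCheckTask` method that wraps the repeated `forwardMessageThread.scheduleWithFixedDelay` boilerplate, and use it to schedule the master's periodic self checks.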
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Closes #2653 from zhaohehuhu/dev-0731.
Authored-by: zhaohehuhu <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/master/Master.scala | 54 ++++++++--------------
1 file changed, 19 insertions(+), 35 deletions(-)
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 134361b11..d43ece215 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -294,52 +294,36 @@ private[celeborn] class Master(
sendApplicationMetaThreads,
"send-application-meta")
}
- checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
- new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(ControlMessages.pbCheckForWorkerTimeout)
- }
- },
- 0,
- workerHeartbeatTimeoutMs,
- TimeUnit.MILLISECONDS)
- checkForApplicationTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
- new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForApplicationTimeOut)
- }
- },
- 0,
- appHeartbeatTimeoutMs / 2,
- TimeUnit.MILLISECONDS)
+ checkForWorkerTimeOutTask = scheduleCheckTask(workerHeartbeatTimeoutMs, pbCheckForWorkerTimeout)
+ checkForApplicationTimeOutTask =
+ scheduleCheckTask(appHeartbeatTimeoutMs / 2, CheckForApplicationTimeOut)
if (workerUnavailableInfoExpireTimeoutMs > 0) {
- checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
- new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForWorkerUnavailableInfoTimeout)
- }
- },
- 0,
+ scheduleCheckTask(
workerUnavailableInfoExpireTimeoutMs / 2,
- TimeUnit.MILLISECONDS)
+ CheckForWorkerUnavailableInfoTimeout)
}
if (hasHDFSStorage || hasS3Storage) {
- checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
- new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForDFSExpiredDirsTimeout)
- }
- },
- dfsExpireDirsTimeoutMS,
- dfsExpireDirsTimeoutMS,
- TimeUnit.MILLISECONDS)
+ checkForHDFSRemnantDirsTimeOutTask =
+ scheduleCheckTask(dfsExpireDirsTimeoutMS, CheckForDFSExpiredDirsTimeout)
}
}
+ def scheduleCheckTask[T](timeoutMS: Long, message: T): ScheduledFuture[_] = {
+ forwardMessageThread.scheduleWithFixedDelay(
+ new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ self.send(message)
+ }
+ },
+ timeoutMS,
+ timeoutMS,
+ TimeUnit.MILLISECONDS)
+ }
+
override def onStop(): Unit = {
if (!threadsStarted.compareAndSet(true, false)) {
return