This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new da04703fe [CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead
of scheduleAtFixedRate in threads pool of master and worker
da04703fe is described below
commit da04703fea124a42204c0ecd447a3f3fa4a95bfa
Author: SteNicholas <[email protected]>
AuthorDate: Fri Oct 27 11:20:28 2023 +0800
[CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead of
scheduleAtFixedRate in threads pool of master and worker
### What changes were proposed in this pull request?
Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate` in the thread
pools of the Celeborn Master and Worker, as well as in the client and common
modules touched by this commit.
### Why are the changes needed?
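Follow-up to #1970. `scheduleWithFixedDelay` measures the delay from the end
of one run to the start of the next, whereas `scheduleAtFixedRate` schedules
runs at fixed multiples of the period, so runs that are delayed by a slow
execution may start late and execute back-to-back.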
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal tests.
Closes #2048 from SteNicholas/CELEBORN-1032.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit df40a28959d6845e9ddb91a433eae4f062194b54)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../java/org/apache/spark/shuffle/celeborn/SendBufferPool.java | 2 +-
.../main/java/org/apache/celeborn/client/ReviveManager.java | 2 +-
.../org/apache/celeborn/client/ApplicationHeartbeater.scala | 2 +-
.../org/apache/celeborn/client/ChangePartitionManager.scala | 2 +-
.../main/scala/org/apache/celeborn/client/CommitManager.scala | 2 +-
.../scala/org/apache/celeborn/client/LifecycleManager.scala | 2 +-
.../org/apache/celeborn/client/ReleasePartitionManager.scala | 2 +-
.../common/network/server/TransportChannelHandler.java | 2 +-
.../org/apache/celeborn/common/meta/AppDiskUsageMetric.scala | 2 +-
.../org/apache/celeborn/service/deploy/master/Master.scala | 10 +++++-----
.../org/apache/celeborn/service/deploy/worker/Worker.scala | 4 ++--
.../celeborn/service/deploy/worker/storage/DeviceMonitor.scala | 2 +-
.../service/deploy/worker/storage/StorageManager.scala | 4 ++--
13 files changed, 19 insertions(+), 19 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
index 849694518..9731a654b 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
@@ -57,7 +57,7 @@ public class SendBufferPool {
pushTaskQueues = new LinkedList<>();
lastAquireTime = System.currentTimeMillis();
- cleaner.scheduleAtFixedRate(
+ cleaner.scheduleWithFixedDelay(
() -> {
if (System.currentTimeMillis() - lastAquireTime > timeout) {
synchronized (this) {
diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
index c764351a5..3bbf28ecc 100644
--- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
+++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
@@ -46,7 +46,7 @@ class ReviveManager {
this.interval = conf.clientPushReviveInterval();
this.batchSize = conf.clientPushReviveBatchSize();
- batchReviveRequestScheduler.scheduleAtFixedRate(
+ batchReviveRequestScheduler.scheduleWithFixedDelay(
() -> {
Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
do {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index 38dc2a47d..e117ec3e1 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -43,7 +43,7 @@ class ApplicationHeartbeater(
private var appHeartbeat: ScheduledFuture[_] = _
def start(): Unit = {
- appHeartbeat = appHeartbeatHandlerThread.scheduleAtFixedRate(
+ appHeartbeat = appHeartbeatHandlerThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index ca3b2077e..e7b1a45b2 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -70,7 +70,7 @@ class ChangePartitionManager(
def start(): Unit = {
batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map
{
// noinspection ConvertExpressionToSAM
- _.scheduleAtFixedRate(
+ _.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
diff --git
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0baacd10e..e7a81aa61 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -94,7 +94,7 @@ class CommitManager(appUniqueId: String, val conf:
CelebornConf, lifecycleManage
lifecycleManager.registerWorkerStatusListener(new ShutdownWorkerListener)
batchHandleCommitPartition = batchHandleCommitPartitionSchedulerThread.map
{
- _.scheduleAtFixedRate(
+ _.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
committedPartitionInfo.asScala.foreach { case (shuffleId,
shuffleCommittedInfo) =>
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 807c4a660..ea1517a21 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -147,7 +147,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
override def onStart(): Unit = {
// noinspection ConvertExpressionToSAM
- checkForShuffleRemoval = forwardMessageThread.scheduleAtFixedRate(
+ checkForShuffleRemoval = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(RemoveExpiredShuffle)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
index e28b9a46a..aabb965ae 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
@@ -54,7 +54,7 @@ class ReleasePartitionManager(
def start(): Unit = {
batchHandleReleasePartition =
batchHandleReleasePartitionSchedulerThread.map {
// noinspection ConvertExpressionToSAM
- _.scheduleAtFixedRate(
+ _.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
index f0eae1a7e..01219516e 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
@@ -106,7 +106,7 @@ public class TransportChannelHandler extends
ChannelInboundHandlerAdapter {
if (enableHeartbeat) {
heartbeatFuture =
ctx.executor()
- .scheduleAtFixedRate(
+ .scheduleWithFixedDelay(
() -> {
logger.debug("send heartbeat");
ctx.writeAndFlush(new Heartbeat());
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
index 46b7452d5..f66756ba1 100644
---
a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
@@ -136,7 +136,7 @@ class AppDiskUsageMetric(conf: CelebornConf) extends
Logging {
})
}
- logExecutor.scheduleAtFixedRate(
+ logExecutor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (currentSnapShot.get() != null) {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 73957e6e1..51b03174c 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -133,7 +133,7 @@ private[celeborn] class Master(
conf.estimatedPartitionSizeForEstimationUpdateInterval
private val partitionSizeUpdateService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater")
- partitionSizeUpdateService.scheduleAtFixedRate(
+ partitionSizeUpdateService.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
executeWithLeaderChecker(
@@ -196,7 +196,7 @@ private[celeborn] class Master(
// start threads to check timeout for workers and applications
override def onStart(): Unit = {
- checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+ checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ControlMessages.pbCheckForWorkerTimeout)
@@ -206,7 +206,7 @@ private[celeborn] class Master(
workerHeartbeatTimeoutMs,
TimeUnit.MILLISECONDS)
- checkForApplicationTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+ checkForApplicationTimeOutTask =
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForApplicationTimeOut)
@@ -216,7 +216,7 @@ private[celeborn] class Master(
appHeartbeatTimeoutMs / 2,
TimeUnit.MILLISECONDS)
- checkForUnavailableWorkerTimeOutTask =
forwardMessageThread.scheduleAtFixedRate(
+ checkForUnavailableWorkerTimeOutTask =
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerUnavailableInfoTimeout)
@@ -227,7 +227,7 @@ private[celeborn] class Master(
TimeUnit.MILLISECONDS)
if (hasHDFSStorage) {
- checkForHDFSRemnantDirsTimeOutTask =
forwardMessageThread.scheduleAtFixedRate(
+ checkForHDFSRemnantDirsTimeOutTask =
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForHDFSExpiredDirsTimeout)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 75782ed04..14410b6a8 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -372,7 +372,7 @@ private[celeborn] class Worker(
registerWithMaster()
// start heartbeat
- sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate(
+ sendHeartbeatTask = forwardMessageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
heartbeatToMaster() }
},
@@ -380,7 +380,7 @@ private[celeborn] class Worker(
heartbeatInterval,
TimeUnit.MILLISECONDS)
- checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate(
+ checkFastFailTask = forwardMessageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo,
Long] =>
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 714529c58..4681140b9 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -103,7 +103,7 @@ class LocalDeviceMonitor(
}
override def startCheck(): Unit = {
- diskChecker.scheduleAtFixedRate(
+ diskChecker.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
logDebug("Device check start")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index cf6e308e9..af49cae60 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -211,7 +211,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
saveCommittedFileInfosExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"StorageManager-save-committed-fileinfo-thread")
- saveCommittedFileInfosExecutor.scheduleAtFixedRate(
+ saveCommittedFileInfosExecutor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
if (!committedFileInfos.isEmpty) {
@@ -585,7 +585,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
private val storageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler")
- storageScheduler.scheduleAtFixedRate(
+ storageScheduler.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
try {