This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new da04703fe [CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead 
of scheduleAtFixedRate in threads pool of master and worker
da04703fe is described below

commit da04703fea124a42204c0ecd447a3f3fa4a95bfa
Author: SteNicholas <[email protected]>
AuthorDate: Fri Oct 27 11:20:28 2023 +0800

    [CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead of 
scheduleAtFixedRate in threads pool of master and worker
    
    ### What changes were proposed in this pull request?
    
    Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate` in the 
scheduled thread pools of the Celeborn client, Master and Worker.
    
    ### Why are the changes needed?
    
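    Follow-up to #1970. With `scheduleAtFixedRate`, a run that takes longer than
    its period makes the following runs start late and back-to-back, whereas
    `scheduleWithFixedDelay` always waits the full delay after the previous run
    completes.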
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Internal tests.
    
    Closes #2048 from SteNicholas/CELEBORN-1032.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit df40a28959d6845e9ddb91a433eae4f062194b54)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../java/org/apache/spark/shuffle/celeborn/SendBufferPool.java |  2 +-
 .../main/java/org/apache/celeborn/client/ReviveManager.java    |  2 +-
 .../org/apache/celeborn/client/ApplicationHeartbeater.scala    |  2 +-
 .../org/apache/celeborn/client/ChangePartitionManager.scala    |  2 +-
 .../main/scala/org/apache/celeborn/client/CommitManager.scala  |  2 +-
 .../scala/org/apache/celeborn/client/LifecycleManager.scala    |  2 +-
 .../org/apache/celeborn/client/ReleasePartitionManager.scala   |  2 +-
 .../common/network/server/TransportChannelHandler.java         |  2 +-
 .../org/apache/celeborn/common/meta/AppDiskUsageMetric.scala   |  2 +-
 .../org/apache/celeborn/service/deploy/master/Master.scala     | 10 +++++-----
 .../org/apache/celeborn/service/deploy/worker/Worker.scala     |  4 ++--
 .../celeborn/service/deploy/worker/storage/DeviceMonitor.scala |  2 +-
 .../service/deploy/worker/storage/StorageManager.scala         |  4 ++--
 13 files changed, 19 insertions(+), 19 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
index 849694518..9731a654b 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java
@@ -57,7 +57,7 @@ public class SendBufferPool {
     pushTaskQueues = new LinkedList<>();
 
     lastAquireTime = System.currentTimeMillis();
-    cleaner.scheduleAtFixedRate(
+    cleaner.scheduleWithFixedDelay(
         () -> {
           if (System.currentTimeMillis() - lastAquireTime > timeout) {
             synchronized (this) {
diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java 
b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
index c764351a5..3bbf28ecc 100644
--- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
+++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java
@@ -46,7 +46,7 @@ class ReviveManager {
     this.interval = conf.clientPushReviveInterval();
     this.batchSize = conf.clientPushReviveBatchSize();
 
-    batchReviveRequestScheduler.scheduleAtFixedRate(
+    batchReviveRequestScheduler.scheduleWithFixedDelay(
         () -> {
           Map<Integer, Set<ReviveRequest>> shuffleMap = new HashMap<>();
           do {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
index 38dc2a47d..e117ec3e1 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala
@@ -43,7 +43,7 @@ class ApplicationHeartbeater(
   private var appHeartbeat: ScheduledFuture[_] = _
 
   def start(): Unit = {
-    appHeartbeat = appHeartbeatHandlerThread.scheduleAtFixedRate(
+    appHeartbeat = appHeartbeatHandlerThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = {
           try {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index ca3b2077e..e7b1a45b2 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -70,7 +70,7 @@ class ChangePartitionManager(
   def start(): Unit = {
     batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map 
{
       // noinspection ConvertExpressionToSAM
-      _.scheduleAtFixedRate(
+      _.scheduleWithFixedDelay(
         new Runnable {
           override def run(): Unit = {
             try {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0baacd10e..e7a81aa61 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -94,7 +94,7 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
     lifecycleManager.registerWorkerStatusListener(new ShutdownWorkerListener)
 
     batchHandleCommitPartition = batchHandleCommitPartitionSchedulerThread.map 
{
-      _.scheduleAtFixedRate(
+      _.scheduleWithFixedDelay(
         new Runnable {
           override def run(): Unit = {
             committedPartitionInfo.asScala.foreach { case (shuffleId, 
shuffleCommittedInfo) =>
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 807c4a660..ea1517a21 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -147,7 +147,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
 
   override def onStart(): Unit = {
     // noinspection ConvertExpressionToSAM
-    checkForShuffleRemoval = forwardMessageThread.scheduleAtFixedRate(
+    checkForShuffleRemoval = forwardMessageThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           self.send(RemoveExpiredShuffle)
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
 
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
index e28b9a46a..aabb965ae 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala
@@ -54,7 +54,7 @@ class ReleasePartitionManager(
   def start(): Unit = {
     batchHandleReleasePartition = 
batchHandleReleasePartitionSchedulerThread.map {
       // noinspection ConvertExpressionToSAM
-      _.scheduleAtFixedRate(
+      _.scheduleWithFixedDelay(
         new Runnable {
           override def run(): Unit = {
             try {
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
index f0eae1a7e..01219516e 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java
@@ -106,7 +106,7 @@ public class TransportChannelHandler extends 
ChannelInboundHandlerAdapter {
     if (enableHeartbeat) {
       heartbeatFuture =
           ctx.executor()
-              .scheduleAtFixedRate(
+              .scheduleWithFixedDelay(
                   () -> {
                     logger.debug("send heartbeat");
                     ctx.writeAndFlush(new Heartbeat());
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
 
b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
index 46b7452d5..f66756ba1 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala
@@ -136,7 +136,7 @@ class AppDiskUsageMetric(conf: CelebornConf) extends 
Logging {
     })
   }
 
-  logExecutor.scheduleAtFixedRate(
+  logExecutor.scheduleWithFixedDelay(
     new Runnable {
       override def run(): Unit = {
         if (currentSnapShot.get() != null) {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 73957e6e1..51b03174c 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -133,7 +133,7 @@ private[celeborn] class Master(
     conf.estimatedPartitionSizeForEstimationUpdateInterval
   private val partitionSizeUpdateService =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater")
-  partitionSizeUpdateService.scheduleAtFixedRate(
+  partitionSizeUpdateService.scheduleWithFixedDelay(
     new Runnable {
       override def run(): Unit = {
         executeWithLeaderChecker(
@@ -196,7 +196,7 @@ private[celeborn] class Master(
 
   // start threads to check timeout for workers and applications
   override def onStart(): Unit = {
-    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+    checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           self.send(ControlMessages.pbCheckForWorkerTimeout)
@@ -206,7 +206,7 @@ private[celeborn] class Master(
       workerHeartbeatTimeoutMs,
       TimeUnit.MILLISECONDS)
 
-    checkForApplicationTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+    checkForApplicationTimeOutTask = 
forwardMessageThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           self.send(CheckForApplicationTimeOut)
@@ -216,7 +216,7 @@ private[celeborn] class Master(
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
 
-    checkForUnavailableWorkerTimeOutTask = 
forwardMessageThread.scheduleAtFixedRate(
+    checkForUnavailableWorkerTimeOutTask = 
forwardMessageThread.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           self.send(CheckForWorkerUnavailableInfoTimeout)
@@ -227,7 +227,7 @@ private[celeborn] class Master(
       TimeUnit.MILLISECONDS)
 
     if (hasHDFSStorage) {
-      checkForHDFSRemnantDirsTimeOutTask = 
forwardMessageThread.scheduleAtFixedRate(
+      checkForHDFSRemnantDirsTimeOutTask = 
forwardMessageThread.scheduleWithFixedDelay(
         new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
             self.send(CheckForHDFSExpiredDirsTimeout)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 75782ed04..14410b6a8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -372,7 +372,7 @@ private[celeborn] class Worker(
     registerWithMaster()
 
     // start heartbeat
-    sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate(
+    sendHeartbeatTask = forwardMessageScheduler.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError { 
heartbeatToMaster() }
       },
@@ -380,7 +380,7 @@ private[celeborn] class Worker(
       heartbeatInterval,
       TimeUnit.MILLISECONDS)
 
-    checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate(
+    checkFastFailTask = forwardMessageScheduler.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
           unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo, 
Long] =>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
index 714529c58..4681140b9 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala
@@ -103,7 +103,7 @@ class LocalDeviceMonitor(
   }
 
   override def startCheck(): Unit = {
-    diskChecker.scheduleAtFixedRate(
+    diskChecker.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = {
           logDebug("Device check start")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index cf6e308e9..af49cae60 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -211,7 +211,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     saveCommittedFileInfosExecutor =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor(
         "StorageManager-save-committed-fileinfo-thread")
-    saveCommittedFileInfosExecutor.scheduleAtFixedRate(
+    saveCommittedFileInfosExecutor.scheduleWithFixedDelay(
       new Runnable {
         override def run(): Unit = {
           if (!committedFileInfos.isEmpty) {
@@ -585,7 +585,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   private val storageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler")
 
-  storageScheduler.scheduleAtFixedRate(
+  storageScheduler.scheduleWithFixedDelay(
     new Runnable {
       override def run(): Unit = {
         try {

Reply via email to