This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 49ea88103 [MINOR] Remove unnecessary increment index of Master#timeoutDeadWorkers
49ea88103 is described below

commit 49ea8810376ceb387de0ef855b7e9205101c3958
Author: SteNicholas <[email protected]>
AuthorDate: Mon Oct 23 22:18:39 2023 +0800

    [MINOR] Remove unnecessary increment index of Master#timeoutDeadWorkers
    
    ### What changes were proposed in this pull request?
    
    Remove the unnecessary index counter `ind` (its declaration and its per-iteration increment) from `Master#timeoutDeadWorkers`.
    
    ### Why are the changes needed?
    
    The index counter `ind` in `Master#timeoutDeadWorkers` is declared and incremented on every iteration but its value is never used, so it is dead code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2027 from SteNicholas/timeout-dead-workers.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  2 +-
 .../shuffle/celeborn/CelebornShuffleReader.scala   |  2 +-
 .../org/apache/celeborn/client/CommitManager.scala |  2 +-
 .../celeborn/client/ShuffleClientHelper.scala      |  4 ++--
 .../celeborn/client/WorkerStatusTrackerSuite.scala |  8 +++----
 .../common/identity/DefaultIdentityProvider.scala  |  2 +-
 .../celeborn/service/deploy/master/Master.scala    | 14 +++++------
 .../service/deploy/worker/PushDataHandler.scala    |  2 +-
 .../deploy/worker/storage/StorageManager.scala     |  2 +-
 .../service/deploy/memory/MemoryManagerSuite.scala | 28 +++++++++++-----------
 10 files changed, 32 insertions(+), 34 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index c26a0280b..7518b1e6b 100644
--- 
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -75,7 +75,7 @@ class CelebornShuffleReader[K, C](
           streamCreatorPool = ThreadUtils.newDaemonCachedThreadPool(
             "celeborn-create-stream-thread",
             conf.readStreamCreatorPoolThreads,
-            60);
+            60)
         }
       }
     }
diff --git 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 6a0ebedc9..063ad0b6f 100644
--- 
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ 
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -77,7 +77,7 @@ class CelebornShuffleReader[K, C](
           streamCreatorPool = ThreadUtils.newDaemonCachedThreadPool(
             "celeborn-create-stream-thread",
             conf.readStreamCreatorPoolThreads,
-            60);
+            60)
         }
       }
     }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
index 0baacd10e..1322f02d8 100644
--- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala
@@ -191,7 +191,7 @@ class CommitManager(appUniqueId: String, val conf: 
CelebornConf, lifecycleManage
         new AtomicInteger(),
         JavaUtils.newConcurrentHashMap[Int, AtomicInteger]()))
 
-    getCommitHandler(shuffleId).registerShuffle(shuffleId, numMappers);
+    getCommitHandler(shuffleId).registerShuffle(shuffleId, numMappers)
   }
 
   def isMapperEnded(shuffleId: Int, mapId: Int): Boolean = {
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala 
b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
index f8a69468c..b94ba37c7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ShuffleClientHelper.scala
@@ -52,7 +52,7 @@ object ShuffleClientHelper extends Logging {
           logInfo(s"Stage ended for $shuffleId")
         } else {
           logInfo(s"split failed for $respStatus, " +
-            s"shuffle file can be larger than expected, try split again");
+            s"shuffle file can be larger than expected, try split again")
         }
         splittingSet.remove(partitionId)
       case Failure(exception) =>
@@ -60,7 +60,7 @@ object ShuffleClientHelper extends Logging {
         logWarning(
           s"Shuffle file split failed for map ${shuffleId} partitionId 
${partitionId}," +
             s" try again, detail : {}",
-          exception);
+          exception)
 
     }(concurrent.ExecutionContext.fromExecutorService(executors))
   }
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index d060d7108..5d3355a33 100644
--- 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -32,12 +32,12 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
 
   test("handleHeartbeatResponse") {
     val celebornConf = new CelebornConf()
-    celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L);
+    celebornConf.set(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, 2000L)
     val statusTracker = new WorkerStatusTracker(celebornConf, null)
 
     val registerTime = System.currentTimeMillis()
-    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNKNOWN, registerTime));
-    statusTracker.excludedWorkers.put(mock("host2"), 
(StatusCode.WORKER_SHUTDOWN, registerTime));
+    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNKNOWN, registerTime))
+    statusTracker.excludedWorkers.put(mock("host2"), 
(StatusCode.WORKER_SHUTDOWN, registerTime))
 
     // test reserve (only statusCode list in handleHeartbeatResponse)
     val empty = buildResponse(Array.empty, Array.empty, Array.empty)
@@ -105,6 +105,6 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
   }
 
   private def mock(host: String): WorkerInfo = {
-    new WorkerInfo(host, -1, -1, -1, -1);
+    new WorkerInfo(host, -1, -1, -1, -1)
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
 
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
index 7da111396..76b89b331 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/identity/DefaultIdentityProvider.scala
@@ -21,7 +21,7 @@ import org.apache.celeborn.common.CelebornConf
 
 class DefaultIdentityProvider extends IdentityProvider {
   override def provide(): UserIdentifier = {
-    val conf = new CelebornConf();
+    val conf = new CelebornConf()
     UserIdentifier(
       conf.quotaUserSpecificTenant,
       conf.quotaUserSpecificUserName)
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 7493fbcb9..a94989dd6 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -427,14 +427,13 @@ private[celeborn] class Master(
       executeWithLeaderChecker(context, handleCheckWorkersAvailable(context))
   }
 
-  private def timeoutDeadWorkers() {
+  private def timeoutDeadWorkers(): Unit = {
     val currentTime = System.currentTimeMillis()
     // Need increase timeout deadline to avoid long time leader election period
     if (HAHelper.getWorkerTimeoutDeadline(statusSystem) > currentTime) {
       return
     }
 
-    var ind = 0
     workersSnapShot.asScala.foreach { worker =>
       if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
         && !statusSystem.workerLostEvents.contains(worker)) {
@@ -448,7 +447,6 @@ private[celeborn] class Master(
           worker.replicatePort,
           MasterClient.genRequestId()))
       }
-      ind += 1
     }
   }
 
@@ -467,7 +465,7 @@ private[celeborn] class Master(
       logDebug(s"Remove unavailable info for workers: 
$unavailableInfoTimeoutWorkers")
       self.send(RemoveWorkersUnavailableInfo(
         unavailableInfoTimeoutWorkers,
-        MasterClient.genRequestId()));
+        MasterClient.genRequestId()))
     }
   }
 
@@ -766,7 +764,7 @@ private[celeborn] class Master(
     }
     val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
     if (hadoopFs.exists(hdfsWorkPath)) {
-      if (!expiredDir.isEmpty) {
+      if (expiredDir.nonEmpty) {
         val dirToDelete = new Path(hdfsWorkPath, expiredDir)
         // delete specific app dir on application lost
         CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, 
true)
@@ -812,7 +810,7 @@ private[celeborn] class Master(
   private def handleRemoveWorkersUnavailableInfos(
       unavailableWorkers: util.List[WorkerInfo],
       requestId: String): Unit = {
-    statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
requestId);
+    statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
requestId)
   }
 
   private def computeUserResourceConsumption(userIdentifier: UserIdentifier)
@@ -877,7 +875,7 @@ private[celeborn] class Master(
   override def getMasterGroupInfo: String = {
     val sb = new StringBuilder
     sb.append("====================== Master Group INFO 
==============================\n")
-    sb.append(getMasterGroupInfoInternal())
+    sb.append(getMasterGroupInfoInternal)
     sb.toString()
   }
 
@@ -981,7 +979,7 @@ private[celeborn] class Master(
     isActive
   }
 
-  private def getMasterGroupInfoInternal(): String = {
+  private def getMasterGroupInfoInternal: String = {
     if (conf.haEnabled) {
       val sb = new StringBuilder
       val groupInfo = 
statusSystem.asInstanceOf[HAMasterMetaManager].getRatisServer.getGroupInfo
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index a1bf40f57..4cf8456de 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -759,7 +759,7 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
         logError(s"Error while handle${message.`type`()} $message", e)
         client.getChannel.writeAndFlush(new RpcFailure(
           requestId,
-          Throwables.getStackTraceAsString(e)));
+          Throwables.getStackTraceAsString(e)))
     } finally {
       message.body().release()
     }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 9f370a6bc..79390ee77 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -759,7 +759,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     })
     hdfsWriters.forEach(new BiConsumer[String, FileWriter] {
       override def accept(t: String, u: FileWriter): Unit = {
-        u.flushOnMemoryPressure();
+        u.flushOnMemoryPressure()
       }
     })
   }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
index bfbb42043..f26962929 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala
@@ -42,7 +42,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
       .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
     val caught =
       intercept[IllegalArgumentException] {
-        MemoryManager.initialize(conf);
+        MemoryManager.initialize(conf)
       }
     assert(
       caught.getMessage == s"Invalid config, 
${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " +
@@ -69,17 +69,17 @@ class MemoryManagerSuite extends CelebornFunSuite {
       memoryCounter.set(pushThreshold + 1)
       assert(ServingState.PUSH_PAUSED == memoryManager.currentServingState())
       // reach pause replicate data threshold
-      memoryCounter.set(replicateThreshold + 1);
-      assert(ServingState.PUSH_AND_REPLICATE_PAUSED == 
memoryManager.currentServingState());
+      memoryCounter.set(replicateThreshold + 1)
+      assert(ServingState.PUSH_AND_REPLICATE_PAUSED == 
memoryManager.currentServingState())
       // touch pause push data threshold again
-      memoryCounter.set(pushThreshold + 1);
-      assert(MemoryManager.ServingState.PUSH_PAUSED == 
memoryManager.currentServingState());
+      memoryCounter.set(pushThreshold + 1)
+      assert(MemoryManager.ServingState.PUSH_PAUSED == 
memoryManager.currentServingState())
       // between pause push data threshold and resume data threshold
-      memoryCounter.set(resumeThreshold + 2);
-      assert(MemoryManager.ServingState.PUSH_PAUSED == 
memoryManager.currentServingState());
+      memoryCounter.set(resumeThreshold + 2)
+      assert(MemoryManager.ServingState.PUSH_PAUSED == 
memoryManager.currentServingState())
       // touch resume data threshold
-      memoryCounter.set(0);
-      assert(MemoryManager.ServingState.NONE_PAUSED == 
memoryManager.currentServingState());
+      memoryCounter.set(0)
+      assert(MemoryManager.ServingState.NONE_PAUSED == 
memoryManager.currentServingState())
     } catch {
       case e: Exception => throw e
     } finally {
@@ -112,21 +112,21 @@ class MemoryManagerSuite extends CelebornFunSuite {
     }
 
     // PAUSE PUSH -> PAUSE PUSH AND REPLICATE
-    memoryCounter.set(replicateThreshold + 1);
+    memoryCounter.set(replicateThreshold + 1)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(pushListener.isPause)
       assert(replicateListener.isPause)
     }
 
     // PAUSE PUSH AND REPLICATE -> PAUSE PUSH
-    memoryCounter.set(pushThreshold + 1);
+    memoryCounter.set(pushThreshold + 1)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(pushListener.isPause)
       assert(!replicateListener.isPause)
     }
 
     // PAUSE PUSH -> NONE PAUSED
-    memoryCounter.set(0);
+    memoryCounter.set(0)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)
@@ -136,14 +136,14 @@ class MemoryManagerSuite extends CelebornFunSuite {
     val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
 
     // NONE PAUSED -> PAUSE PUSH AND REPLICATE
-    memoryCounter.set(replicateThreshold + 1);
+    memoryCounter.set(replicateThreshold + 1)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(pushListener.isPause)
       assert(replicateListener.isPause)
     }
 
     // PAUSE PUSH AND REPLICATE -> NONE PAUSED
-    memoryCounter.set(0);
+    memoryCounter.set(0)
     eventually(timeout(30.second), interval(10.milliseconds)) {
       assert(!pushListener.isPause)
       assert(!replicateListener.isPause)

Reply via email to