This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a10a2d46 [CELEBORN-1421] Refine code in master to reduce unnecessary 
sync to get workers/lostworkers/shutdownWorkers
8a10a2d46 is described below

commit 8a10a2d465c5ec9d947128d9828bcd5bb1f6bfa8
Author: Shuang <[email protected]>
AuthorDate: Fri May 17 14:06:37 2024 +0800

    [CELEBORN-1421] Refine code in master to reduce unnecessary sync to get 
workers/lostworkers/shutdownWorkers
    
    ### What changes were proposed in this pull request?
    
    1. Use a concurrent set (ConcurrentHashMap.newKeySet()) instead of an ArrayList for workers.
    2. Remove the unnecessary synchronization and snapshot copies when getting
    workers/lostWorkers/shutdownWorkers.
    
    ### Why are the changes needed?
    
    1. Reduce unnecessary synchronization when getting workers/lostWorkers/shutdownWorkers.
    2. Some places in the Master use statusSystem.workers (an ArrayList) directly,
    which is not thread-safe and can lead to concurrent modification issues.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes GA.
    
    Closes #2507 from RexXiong/CELEBORN-1421.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/common/util/PbSerDeUtils.scala |  2 +-
 .../master/clustermeta/AbstractMetaManager.java    |  4 +--
 .../celeborn/service/deploy/master/Master.scala    | 34 +++++++++-------------
 3 files changed, 16 insertions(+), 24 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e82988433..f2ef85c48 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -411,7 +411,7 @@ object PbSerDeUtils {
       manuallyExcludedWorkers: java.util.Set[WorkerInfo],
       workerLostEvent: java.util.Set[WorkerInfo],
       appHeartbeatTime: java.util.Map[String, java.lang.Long],
-      workers: java.util.List[WorkerInfo],
+      workers: java.util.Set[WorkerInfo],
       partitionTotalWritten: java.lang.Long,
       partitionTotalFileCount: java.lang.Long,
       appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index c1cd37b28..b9c1c2c67 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -66,7 +65,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   // Metadata for master service
   public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
-  public final ArrayList<WorkerInfo> workers = new ArrayList<>();
+  public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
+
   public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = 
JavaUtils.newConcurrentHashMap();
   public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos 
=
       JavaUtils.newConcurrentHashMap();
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 6bc675ecb..b7f51b00b 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -180,14 +180,6 @@ private[celeborn] class Master(
   private val userResourceConsumptions =
     JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, 
Long)]()
 
-  // States
-  private def workersSnapShot: util.List[WorkerInfo] =
-    statusSystem.workers.synchronized(new 
util.ArrayList[WorkerInfo](statusSystem.workers))
-  private def lostWorkersSnapshot: ConcurrentHashMap[WorkerInfo, 
java.lang.Long] =
-    
statusSystem.workers.synchronized(JavaUtils.newConcurrentHashMap(statusSystem.lostWorkers))
-  private def shutdownWorkerSnapshot: util.List[WorkerInfo] =
-    statusSystem.workers.synchronized(new 
util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))
-
   private def diskReserveSize = conf.workerDiskReserveSize
   private def diskReserveRatio = conf.workerDiskReserveRatio
 
@@ -565,7 +557,7 @@ private[celeborn] class Master(
       return
     }
 
-    workersSnapShot.asScala.foreach { worker =>
+    statusSystem.workers.asScala.foreach { worker =>
       if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
         && !statusSystem.workerLostEvents.contains(worker)) {
         logWarning(s"Worker ${worker.readableAddress()} timeout! Trigger 
WorkerLost event.")
@@ -588,7 +580,7 @@ private[celeborn] class Master(
       return
     }
 
-    val unavailableInfoTimeoutWorkers = lostWorkersSnapshot.asScala.filter {
+    val unavailableInfoTimeoutWorkers = 
statusSystem.lostWorkers.asScala.filter {
       case (_, lostTime) => currentTime - lostTime > 
workerUnavailableInfoExpireTimeoutMs
     }.keySet.toList.asJava
 
@@ -638,7 +630,7 @@ private[celeborn] class Master(
       workerStatus: WorkerStatus,
       requestId: String): Unit = {
     val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort)
-    val registered = workersSnapShot.asScala.contains(targetWorker)
+    val registered = statusSystem.workers.asScala.contains(targetWorker)
     if (!registered) {
       logWarning(s"Received heartbeat from unknown worker " +
         s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
@@ -708,7 +700,7 @@ private[celeborn] class Master(
       -1,
       new util.HashMap[String, DiskInfo](),
       JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
-    val worker: WorkerInfo = workersSnapShot
+    val worker: WorkerInfo = statusSystem.workers
       .asScala
       .find(_ == targetWorker)
       .orNull
@@ -745,7 +737,7 @@ private[celeborn] class Master(
         internalPort,
         disks,
         userResourceConsumption)
-    if (workersSnapShot.contains(workerToRegister)) {
+    if (statusSystem.workers.contains(workerToRegister)) {
       logWarning(s"Receive RegisterWorker while worker" +
         s" ${workerToRegister.toString()} already exists, re-register.")
       // TODO: remove `WorkerRemove` because we have improve register logic to 
cover `WorkerRemove`
@@ -1015,14 +1007,14 @@ private[celeborn] class Master(
       System.currentTimeMillis(),
       requestId)
     // unknown workers will retain in needCheckedWorkerList
-    needCheckedWorkerList.removeAll(workersSnapShot)
+    needCheckedWorkerList.removeAll(statusSystem.workers)
     if (shouldResponse) {
       context.reply(HeartbeatFromApplicationResponse(
         StatusCode.SUCCESS,
         new util.ArrayList(
           (statusSystem.excludedWorkers.asScala ++ 
statusSystem.manuallyExcludedWorkers.asScala).asJava),
         needCheckedWorkerList,
-        shutdownWorkerSnapshot))
+        new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers)))
     } else {
       context.reply(OneWayMessageResponse)
     }
@@ -1124,9 +1116,9 @@ private[celeborn] class Master(
 
   private def workersAvailable(
       tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): 
util.List[WorkerInfo] = {
-    workersSnapShot.asScala.filter { w =>
+    statusSystem.workers.asScala.filter { w =>
       statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
-    }.asJava
+    }.toList.asJava
   }
 
   private def handleRequestForApplicationMeta(
@@ -1157,7 +1149,7 @@ private[celeborn] class Master(
   }
 
   private def getWorkers: String = {
-    workersSnapShot.asScala.mkString("\n")
+    statusSystem.workers.asScala.mkString("\n")
   }
 
   override def handleWorkerEvent(workerEventType: String, workers: String): 
String = {
@@ -1199,7 +1191,7 @@ private[celeborn] class Master(
   override def getLostWorkers: String = {
     val sb = new StringBuilder
     sb.append("======================= Lost Workers in Master 
========================\n")
-    lostWorkersSnapshot.asScala.toSeq.sortBy(_._2).foreach { case (worker, 
time) =>
+    statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2).foreach { case 
(worker, time) =>
       sb.append(s"${worker.toUniqueId().padTo(50, " 
").mkString}${Utils.formatTimestamp(time)}\n")
     }
     sb.toString()
@@ -1208,7 +1200,7 @@ private[celeborn] class Master(
   override def getShutdownWorkers: String = {
     val sb = new StringBuilder
     sb.append("===================== Shutdown Workers in Master 
======================\n")
-    shutdownWorkerSnapshot.asScala.foreach { worker =>
+    statusSystem.shutdownWorkers.asScala.foreach { worker =>
       sb.append(s"${worker.toUniqueId()}\n")
     }
     sb.toString()
@@ -1281,7 +1273,7 @@ private[celeborn] class Master(
         s"Failed to Exclude workers add ${workersToAdd.mkString(",")} and 
remove ${workersToRemove.mkString(",")}.\n")
     }
     val unknownExcludedWorkers =
-      (workersToAdd ++ workersToRemove).filter(!workersSnapShot.contains(_))
+      (workersToAdd ++ 
workersToRemove).filter(!statusSystem.workers.contains(_))
     if (unknownExcludedWorkers.nonEmpty) {
       sb.append(
         s"Unknown worker ${unknownExcludedWorkers.mkString(",")}. Workers in 
Master:\n$getWorkers.")

Reply via email to