This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e2f640ce3 [CELEBORN-1660] Using map for workers to find worker fast
e2f640ce3 is described below

commit e2f640ce3ba52691707d18450257ebdfbe4f93a6
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 1 15:58:53 2024 +0800

    [CELEBORN-1660] Using map for workers to find worker fast
    
    ### What changes were proposed in this pull request?
    
    Use a map for workers so that we can look up a worker by its uniqueId quickly.
    
    ### Why are the changes needed?
    
    For a large Celeborn cluster, linearly scanning the worker set to find a worker might be slow, for example in:
    
    - updateWorkerHeartbeatMeta
    
https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java#L222
    
    - handleWorkerLost
    
https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala#L762-L765
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #2870 from turboFei/worksMap.
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: Fei Wang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../master/clustermeta/AbstractMetaManager.java    | 58 +++++++++++-----------
 .../celeborn/service/deploy/master/Master.scala    | 58 +++++++++++-----------
 .../deploy/master/http/api/v1/WorkerResource.scala |  5 +-
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 12 ++---
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   | 15 +++---
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 50 +++++++++----------
 6 files changed, 99 insertions(+), 99 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 4080edae4..f1d8a37e8 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -64,7 +64,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public final Map<String, Set<Integer>> registeredAppAndShuffles =
       JavaUtils.newConcurrentHashMap();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
-  public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
+  public final Map<String, WorkerInfo> workersMap = 
JavaUtils.newConcurrentHashMap();
 
   public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = 
JavaUtils.newConcurrentHashMap();
   public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos 
=
@@ -170,8 +170,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
     workerLostEvents.add(worker);
     // remove worker from workers
-    synchronized (workers) {
-      workers.remove(worker);
+    synchronized (workersMap) {
+      workersMap.remove(worker.toUniqueId());
       lostWorkers.put(worker, System.currentTimeMillis());
     }
     excludedWorkers.remove(worker);
@@ -182,15 +182,15 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort) {
     WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
     // remove worker from workers
-    synchronized (workers) {
-      workers.remove(worker);
+    synchronized (workersMap) {
+      workersMap.remove(worker.toUniqueId());
       lostWorkers.put(worker, System.currentTimeMillis());
     }
     excludedWorkers.remove(worker);
   }
 
   public void removeWorkersUnavailableInfoMeta(List<WorkerInfo> 
unavailableWorkers) {
-    synchronized (workers) {
+    synchronized (workersMap) {
       for (WorkerInfo workerInfo : unavailableWorkers) {
         if (lostWorkers.containsKey(workerInfo)) {
           lostWorkers.remove(workerInfo);
@@ -219,8 +219,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
             host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, 
userResourceConsumption);
     AtomicLong availableSlots = new AtomicLong();
     LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
-    synchronized (workers) {
-      Optional<WorkerInfo> workerInfo = workers.stream().filter(w -> 
w.equals(worker)).findFirst();
+    synchronized (workersMap) {
+      Optional<WorkerInfo> workerInfo = 
Optional.ofNullable(workersMap.get(worker.toUniqueId()));
       workerInfo.ifPresent(
           info -> {
             info.updateThenGetDiskInfos(disks, 
Option.apply(estimatedPartitionSize));
@@ -287,10 +287,8 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
     }
     workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
-    synchronized (workers) {
-      if (!workers.contains(workerInfo)) {
-        workers.add(workerInfo);
-      }
+    synchronized (workersMap) {
+      workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo);
       shutdownWorkers.remove(workerInfo);
       lostWorkers.remove(workerInfo);
       excludedWorkers.remove(workerInfo);
@@ -315,7 +313,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 manuallyExcludedWorkers,
                 workerLostEvents,
                 appHeartbeatTime,
-                workers,
+                new HashSet(workersMap.values()),
                 partitionTotalWritten.sum(),
                 partitionTotalFileCount.sum(),
                 appDiskUsageMetric.snapShots(),
@@ -381,7 +379,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
               .collect(Collectors.toList());
       scala.collection.immutable.Map<String, Node> resolveMap =
           rackResolver.resolveToMap(workerHostList);
-      workers.addAll(
+      workersMap.putAll(
           workerInfoSet.stream()
               .peek(
                   workerInfo -> {
@@ -391,7 +389,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                           
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
                     }
                   })
-              .collect(Collectors.toSet()));
+              .collect(Collectors.toMap(WorkerInfo::toUniqueId, w -> w)));
 
       snapshotMetaInfo
           .getLostWorkersMap()
@@ -437,11 +435,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     LOG.info("Successfully restore meta info from snapshot {}", 
file.getAbsolutePath());
     LOG.info(
         "Worker size: {}, Registered shuffle size: {}. Worker excluded list 
size: {}. Manually Excluded list size: {}",
-        workers.size(),
+        workersMap.size(),
         registeredAppAndShuffles.size(),
         excludedWorkers.size(),
         manuallyExcludedWorkers.size());
-    workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
+    workersMap.values().forEach(workerInfo -> LOG.info(workerInfo.toString()));
     registeredAppAndShuffles.forEach(
         (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId, 
shuffleId));
   }
@@ -449,7 +447,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   private void cleanUpState() {
     registeredAppAndShuffles.clear();
     hostnameSet.clear();
-    workers.clear();
+    workersMap.clear();
     lostWorkers.clear();
     appHeartbeatTime.clear();
     excludedWorkers.clear();
@@ -464,7 +462,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   }
 
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
-    synchronized (this.workers) {
+    synchronized (this.workersMap) {
       shutdownWorkers.addAll(failedWorkers);
     }
   }
@@ -473,7 +471,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     long eventTime = System.currentTimeMillis();
     ResourceProtos.WorkerEventType eventType =
         ResourceProtos.WorkerEventType.forNumber(workerEventTypeValue);
-    synchronized (this.workers) {
+    synchronized (this.workersMap) {
       for (WorkerInfo workerInfo : workerInfoList) {
         WorkerEventInfo workerEventInfo = workerEventInfos.get(workerInfo);
         LOG.info("Received worker event: {} for worker: {}", eventType, 
workerInfo.toUniqueId());
@@ -489,7 +487,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   }
 
   public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
-    synchronized (this.workers) {
+    synchronized (this.workersMap) {
       decommissionWorkers.addAll(workers);
     }
   }
@@ -520,19 +518,19 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         "Celeborn cluster estimated partition size changed from {} to {}",
         Utils.bytesToString(oldEstimatedPartitionSize),
         Utils.bytesToString(estimatedPartitionSize));
-    workers.stream()
-        .filter(
-            worker ->
-                !excludedWorkers.contains(worker) && 
!manuallyExcludedWorkers.contains(worker))
-        .forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
+
+    HashSet<WorkerInfo> workers = new HashSet(workersMap.values());
+    excludedWorkers.forEach(workers::remove);
+    manuallyExcludedWorkers.forEach(workers::remove);
+    workers.forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
   }
 
   public boolean isWorkerAvailable(WorkerInfo workerInfo) {
-    return !excludedWorkers.contains(workerInfo)
+    return (workerInfo.getWorkerStatus().getState() == 
PbWorkerStatus.State.Normal
+            && !workerEventInfos.containsKey(workerInfo))
+        && !excludedWorkers.contains(workerInfo)
         && !shutdownWorkers.contains(workerInfo)
-        && !manuallyExcludedWorkers.contains(workerInfo)
-        && (!workerEventInfos.containsKey(workerInfo)
-            && workerInfo.getWorkerStatus().getState() == 
PbWorkerStatus.State.Normal);
+        && !manuallyExcludedWorkers.contains(workerInfo);
   }
 
   public void updateApplicationMeta(ApplicationMeta applicationMeta) {
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index bef74b1c5..1f065b3d8 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -224,13 +224,13 @@ private[celeborn] class Master(
   masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
     statusSystem.registeredShuffleCount
   }
-  masterSource.addGauge(MasterSource.WORKER_COUNT) { () => 
statusSystem.workers.size }
+  masterSource.addGauge(MasterSource.WORKER_COUNT) { () => 
statusSystem.workersMap.size }
   masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => 
statusSystem.lostWorkers.size }
   masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
     statusSystem.excludedWorkers.size + 
statusSystem.manuallyExcludedWorkers.size
   }
   masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () =>
-    statusSystem.workers.asScala.count { w =>
+    statusSystem.workersMap.values().asScala.count { w =>
       statusSystem.isWorkerAvailable(w)
     }
   }
@@ -242,7 +242,7 @@ private[celeborn] class Master(
   }
   masterSource.addGauge(MasterSource.PARTITION_SIZE) { () => 
statusSystem.estimatedPartitionSize }
   masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_SIZE) { () =>
-    statusSystem.workers.parallelStream()
+    statusSystem.workersMap.values().parallelStream()
       .mapToLong(new ToLongFunction[WorkerInfo]() {
         override def applyAsLong(value: WorkerInfo): Long =
           value.userResourceConsumption.values().parallelStream()
@@ -252,7 +252,7 @@ private[celeborn] class Master(
       }).sum()
   }
   masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_FILE_COUNT) { () =>
-    statusSystem.workers.parallelStream()
+    statusSystem.workersMap.values().parallelStream()
       .mapToLong(new ToLongFunction[WorkerInfo]() {
         override def applyAsLong(value: WorkerInfo): Long =
           value.userResourceConsumption.values().parallelStream()
@@ -263,11 +263,11 @@ private[celeborn] class Master(
   }
 
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
-    statusSystem.workers.asScala.toList.map(_.totalSpace()).sum
+    statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
   }
 
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
-    statusSystem.workers.asScala.toList.map(_.totalActualUsableSpace()).sum
+    
statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum
   }
 
   masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
@@ -596,7 +596,7 @@ private[celeborn] class Master(
       return
     }
 
-    statusSystem.workers.asScala.foreach { worker =>
+    statusSystem.workersMap.values().asScala.foreach { worker =>
       if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
         && !statusSystem.workerLostEvents.contains(worker)) {
         logWarning(s"Worker ${worker.readableAddress()} timeout! Trigger 
WorkerLost event.")
@@ -635,18 +635,18 @@ private[celeborn] class Master(
     if (HAHelper.getAppTimeoutDeadline(statusSystem) > currentTime) {
       return
     }
-    statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
-      if (statusSystem.appHeartbeatTime.get(key) < currentTime - 
appHeartbeatTimeoutMs) {
-        logWarning(s"Application $key timeout, trigger applicationLost event.")
+    statusSystem.appHeartbeatTime.asScala.foreach { case (appId, 
heartbeatTime) =>
+      if (heartbeatTime < currentTime - appHeartbeatTimeoutMs) {
+        logWarning(s"Application $appId timeout, trigger applicationLost 
event.")
         val requestId = MasterClient.genRequestId()
-        var res = self.askSync[ApplicationLostResponse](ApplicationLost(key, 
requestId))
+        var res = self.askSync[ApplicationLostResponse](ApplicationLost(appId, 
requestId))
         var retry = 1
         while (res.status != StatusCode.SUCCESS && retry <= 3) {
-          res = self.askSync[ApplicationLostResponse](ApplicationLost(key, 
requestId))
+          res = self.askSync[ApplicationLostResponse](ApplicationLost(appId, 
requestId))
           retry += 1
         }
         if (retry > 3) {
-          logWarning(s"Handle ApplicationLost event for $key failed more than 
3 times!")
+          logWarning(s"Handle ApplicationLost event for $appId failed more 
than 3 times!")
         }
       }
     }
@@ -667,7 +667,7 @@ private[celeborn] class Master(
       workerStatus: WorkerStatus,
       requestId: String): Unit = {
     val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort)
-    val registered = statusSystem.workers.asScala.contains(targetWorker)
+    val registered = 
statusSystem.workersMap.containsKey(targetWorker.toUniqueId())
     if (!registered) {
       logWarning(s"Received heartbeat from unknown worker " +
         s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
@@ -758,10 +758,7 @@ private[celeborn] class Master(
       -1,
       new util.HashMap[String, DiskInfo](),
       JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
-    val worker: WorkerInfo = statusSystem.workers
-      .asScala
-      .find(_ == targetWorker)
-      .orNull
+    val worker: WorkerInfo = 
statusSystem.workersMap.get(targetWorker.toUniqueId())
     if (worker == null) {
       logWarning(s"Unknown worker 
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort" +
         s" for WorkerLost handler!")
@@ -806,7 +803,7 @@ private[celeborn] class Master(
       return
     }
 
-    if (statusSystem.workers.contains(workerToRegister)) {
+    if (statusSystem.workersMap.containsKey(workerToRegister.toUniqueId())) {
       logWarning(s"Receive RegisterWorker while worker" +
         s" ${workerToRegister.toString()} already exists, re-register.")
       statusSystem.handleRegisterWorker(
@@ -908,7 +905,7 @@ private[celeborn] class Master(
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, 
s"offerSlots-${Random.nextInt()}") {
-        statusSystem.workers.synchronized {
+        statusSystem.workersMap.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
             SlotsAllocator.offerSlotsLoadAware(
               selectedWorkers,
@@ -1121,24 +1118,24 @@ private[celeborn] class Master(
       fileCount,
       System.currentTimeMillis(),
       requestId)
-    // unknown workers will retain in needCheckedWorkerList
-    needCheckedWorkerList.removeAll(statusSystem.workers)
+    val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
+      statusSystem.workersMap.containsKey(w.toUniqueId())).asJava
     if (shouldResponse) {
       // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
       // during serialization of HeartbeatFromApplicationResponse
       var availableWorksSentToClient = new util.ArrayList[WorkerInfo]()
       if (needAvailableWorkers) {
         availableWorksSentToClient = new util.ArrayList[WorkerInfo](
-          statusSystem.workers.asScala.filter(worker =>
-            statusSystem.isWorkerAvailable(worker)).asJava)
+          statusSystem.workersMap.values().asScala.filter(worker =>
+            statusSystem.isWorkerAvailable(worker)).toList.asJava)
       }
-      var appRelatedShuffles =
+      val appRelatedShuffles =
         statusSystem.registeredAppAndShuffles.getOrDefault(appId, 
Collections.emptySet())
       context.reply(HeartbeatFromApplicationResponse(
         StatusCode.SUCCESS,
         new util.ArrayList(
           (statusSystem.excludedWorkers.asScala ++ 
statusSystem.manuallyExcludedWorkers.asScala).asJava),
-        needCheckedWorkerList,
+        unknownWorkers,
         new util.ArrayList[WorkerInfo](
           (statusSystem.shutdownWorkers.asScala ++ 
statusSystem.decommissionWorkers.asScala).asJava),
         availableWorksSentToClient,
@@ -1215,7 +1212,7 @@ private[celeborn] class Master(
   // TODO: Support calculate topN app resource consumption.
   private def computeUserResourceConsumption(
       userIdentifier: UserIdentifier): ResourceConsumption = {
-    val resourceConsumption = statusSystem.workers.asScala.flatMap {
+    val resourceConsumption = statusSystem.workersMap.values().asScala.flatMap 
{
       workerInfo => 
workerInfo.userResourceConsumption.asScala.get(userIdentifier)
     }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
     resourceConsumption
@@ -1249,7 +1246,7 @@ private[celeborn] class Master(
 
   private def workersAvailable(
       tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): 
util.List[WorkerInfo] = {
-    statusSystem.workers.asScala.filter { w =>
+    statusSystem.workersMap.values().asScala.filter { w =>
       statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
     }.toList.asJava
   }
@@ -1282,7 +1279,7 @@ private[celeborn] class Master(
   }
 
   private def getWorkers: String = {
-    statusSystem.workers.asScala.mkString("\n")
+    statusSystem.workersMap.values().asScala.mkString("\n")
   }
 
   override def handleWorkerEvent(
@@ -1411,7 +1408,8 @@ private[celeborn] class Master(
           ",")} and remove 
${removeWorkers.map(_.readableAddress).mkString(",")}.\n")
     }
     val unknownExcludedWorkers =
-      (addWorkers ++ removeWorkers).filter(!statusSystem.workers.contains(_))
+      (addWorkers ++ removeWorkers).filterNot(w =>
+        statusSystem.workersMap.containsKey(w.toUniqueId()))
     if (unknownExcludedWorkers.nonEmpty) {
       sb.append(
         s"Unknown workers 
${unknownExcludedWorkers.map(_.readableAddress).mkString(",")}." +
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 8d1b20bc3..34e6c734e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -51,7 +51,7 @@ class WorkerResource extends ApiRequestContext {
   @GET
   def workers: WorkersResponse = {
     new WorkersResponse()
-      
.workers(statusSystem.workers.asScala.map(ApiUtils.workerData).toSeq.asJava)
+      
.workers(statusSystem.workersMap.values().asScala.map(ApiUtils.workerData).toSeq.asJava)
       .lostWorkers(statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2)
         .map(kv =>
           new 
WorkerTimestampData().worker(ApiUtils.workerData(kv._1)).timestamp(kv._2)).asJava)
@@ -134,7 +134,8 @@ class WorkerResource extends ApiRequestContext {
           s"eventType(${request.getEventType}) and 
workers(${request.getWorkers}) are required")
       }
       val workers = request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq
-      val (filteredWorkers, unknownWorkers) = 
workers.partition(statusSystem.workers.contains)
+      val (filteredWorkers, unknownWorkers) =
+        workers.partition(w => 
statusSystem.workersMap.containsKey(w.toUniqueId()))
       if (filteredWorkers.isEmpty) {
         throw new BadRequestException(
           s"None of the workers are known: 
${unknownWorkers.map(_.readableAddress).mkString(", ")}")
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 65660158e..cde62c4cf 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -157,7 +157,7 @@ public class DefaultMetaSystemSuiteJ {
         userResourceConsumption3,
         getNewReqeustId());
 
-    assertEquals(3, statusSystem.workers.size());
+    assertEquals(3, statusSystem.workersMap.size());
   }
 
   @Test
@@ -253,7 +253,7 @@ public class DefaultMetaSystemSuiteJ {
 
     statusSystem.handleWorkerLost(
         HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, 
getNewReqeustId());
-    assertEquals(2, statusSystem.workers.size());
+    assertEquals(2, statusSystem.workersMap.size());
   }
 
   private static final String APPID1 = "appId1";
@@ -376,20 +376,20 @@ public class DefaultMetaSystemSuiteJ {
         userResourceConsumption3,
         getNewReqeustId());
 
-    assertEquals(3, statusSystem.workers.size());
+    assertEquals(3, statusSystem.workersMap.size());
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocation = new HashMap<>();
     allocation.put("disk1", 5);
     workersToAllocate.put(
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
             .toUniqueId(),
         allocation);
     workersToAllocate.put(
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME2))
             .findFirst()
             .get()
@@ -399,7 +399,7 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
     assertEquals(
         0,
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 9696dba2d..d2539ba5d 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -232,20 +232,23 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     AppDiskUsageSnapShot originCurrentSnapshot =
         masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
 
-    masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093, 
9092, 9091));
-    masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093, 
9092, 9091));
-    masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093, 
9092, 9091));
+    WorkerInfo workerInfo1 = new WorkerInfo(host1, 9095, 9094, 9093, 9092, 
9091);
+    WorkerInfo workerInfo2 = new WorkerInfo(host2, 9095, 9094, 9093, 9092, 
9091);
+    WorkerInfo workerInfo3 = new WorkerInfo(host3, 9095, 9094, 9093, 9092, 
9091);
+    masterStatusSystem.workersMap.put(workerInfo1.toUniqueId(), workerInfo1);
+    masterStatusSystem.workersMap.put(workerInfo2.toUniqueId(), workerInfo2);
+    masterStatusSystem.workersMap.put(workerInfo3.toUniqueId(), workerInfo3);
 
     masterStatusSystem.writeMetaInfoToFile(tmpFile);
 
     masterStatusSystem.hostnameSet.clear();
     masterStatusSystem.excludedWorkers.clear();
     masterStatusSystem.manuallyExcludedWorkers.clear();
-    masterStatusSystem.workers.clear();
+    masterStatusSystem.workersMap.clear();
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
 
-    Assert.assertEquals(3, masterStatusSystem.workers.size());
+    Assert.assertEquals(3, masterStatusSystem.workersMap.size());
     Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
     Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
@@ -260,7 +263,7 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     Assert.assertArrayEquals(originSnapshots, 
masterStatusSystem.appDiskUsageMetric.snapShots());
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
-    Assert.assertEquals(3, masterStatusSystem.workers.size());
+    Assert.assertEquals(3, masterStatusSystem.workersMap.size());
   }
 
   private String getNewReqeustId() {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 4f2fae78a..e4170a02c 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -347,16 +347,16 @@ public class RatisMasterStatusSystemSuiteJ {
         getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
-    Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
-    Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
 
-    assertWorkers(STATUSSYSTEM1.workers);
-    assertWorkers(STATUSSYSTEM2.workers);
-    assertWorkers(STATUSSYSTEM3.workers);
+    assertWorkers(STATUSSYSTEM1.workersMap.values());
+    assertWorkers(STATUSSYSTEM2.workersMap.values());
+    assertWorkers(STATUSSYSTEM3.workersMap.values());
   }
 
-  private void assertWorkers(Set<WorkerInfo> workerInfos) {
+  private void assertWorkers(Collection<WorkerInfo> workerInfos) {
     for (WorkerInfo workerInfo : workerInfos) {
       assertWorker(workerInfo);
     }
@@ -479,9 +479,9 @@ public class RatisMasterStatusSystemSuiteJ {
         HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, 
getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
-    Assert.assertEquals(2, STATUSSYSTEM2.workers.size());
-    Assert.assertEquals(2, STATUSSYSTEM3.workers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.workersMap.size());
   }
 
   @Test
@@ -571,21 +571,21 @@ public class RatisMasterStatusSystemSuiteJ {
 
     Assert.assertEquals(
         0,
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
             .usedSlots());
     Assert.assertEquals(
         0,
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME2))
             .findFirst()
             .get()
             .usedSlots());
     Assert.assertEquals(
         0,
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME3))
             .findFirst()
             .get()
@@ -632,22 +632,22 @@ public class RatisMasterStatusSystemSuiteJ {
         getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
-    Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
-    Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
 
     Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
     Map<String, Integer> allocations = new HashMap<>();
     allocations.put("disk1", 5);
     workersToAllocate.put(
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
             .toUniqueId(),
         allocations);
     workersToAllocate.put(
-        statusSystem.workers.stream()
+        statusSystem.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME2))
             .findFirst()
             .get()
@@ -661,21 +661,21 @@ public class RatisMasterStatusSystemSuiteJ {
 
     Assert.assertEquals(
         0,
-        STATUSSYSTEM1.workers.stream()
+        STATUSSYSTEM1.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
             .usedSlots());
     Assert.assertEquals(
         0,
-        STATUSSYSTEM2.workers.stream()
+        STATUSSYSTEM2.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
             .usedSlots());
     Assert.assertEquals(
         0,
-        STATUSSYSTEM3.workers.stream()
+        STATUSSYSTEM3.workersMap.values().stream()
             .filter(w -> w.host().equals(HOSTNAME1))
             .findFirst()
             .get()
@@ -1087,21 +1087,21 @@ public class RatisMasterStatusSystemSuiteJ {
   public void resetStatus() {
     STATUSSYSTEM1.registeredAppAndShuffles.clear();
     STATUSSYSTEM1.hostnameSet.clear();
-    STATUSSYSTEM1.workers.clear();
+    STATUSSYSTEM1.workersMap.clear();
     STATUSSYSTEM1.appHeartbeatTime.clear();
     STATUSSYSTEM1.excludedWorkers.clear();
     STATUSSYSTEM1.workerLostEvents.clear();
 
     STATUSSYSTEM2.registeredAppAndShuffles.clear();
     STATUSSYSTEM2.hostnameSet.clear();
-    STATUSSYSTEM2.workers.clear();
+    STATUSSYSTEM2.workersMap.clear();
     STATUSSYSTEM2.appHeartbeatTime.clear();
     STATUSSYSTEM2.excludedWorkers.clear();
     STATUSSYSTEM2.workerLostEvents.clear();
 
     STATUSSYSTEM3.registeredAppAndShuffles.clear();
     STATUSSYSTEM3.hostnameSet.clear();
-    STATUSSYSTEM3.workers.clear();
+    STATUSSYSTEM3.workersMap.clear();
     STATUSSYSTEM3.appHeartbeatTime.clear();
     STATUSSYSTEM3.excludedWorkers.clear();
     STATUSSYSTEM3.workerLostEvents.clear();
@@ -1280,7 +1280,7 @@ public class RatisMasterStatusSystemSuiteJ {
     statusSystem.handleReportWorkerUnavailable(unavailableWorkers, 
getNewReqeustId());
 
     Thread.sleep(3000L);
-    Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
 
     Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());

Reply via email to