This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 81a0d5113 [CELEBORN-1660] Cache available workers and only count the 
available workers device free capacity
81a0d5113 is described below

commit 81a0d5113cc4c985e59437fc2196c9b4da6ef64e
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 14 11:10:45 2024 +0800

    [CELEBORN-1660] Cache available workers and only count the available 
workers device free capacity
    
    ### What changes were proposed in this pull request?
    1. Cache the available workers.
    2. Only count the device free capacity of the available workers.
    3. Place metrics_AvailableWorkerCount_Value in the overall section and 
metrics_WorkerCount_Value in the `Master` section of the Grafana dashboard.
    
    ### Why are the changes needed?
    Cache the available workers to avoid repeatedly looping over all workers 
to compute them.
    This also gives an accurate device capacity overview that does not include 
the excluded workers, decommissioning workers, etc.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    UT.
    
    <img width="1705" alt="image" 
src="https://github.com/user-attachments/assets/bee17b4e-785d-4112-8410-dbb684270ec0">
    
    Closes #2827 from turboFei/device_free.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 assets/grafana/celeborn-dashboard.json             | 12 ++--
 docs/monitoring.md                                 |  2 +-
 .../master/clustermeta/AbstractMetaManager.java    | 70 +++++++++++++++++--
 .../clustermeta/SingleMasterMetaManager.java       |  2 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  2 +-
 .../celeborn/service/deploy/master/Master.scala    | 20 +++---
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 10 +++
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   | 11 +--
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 79 ++++++++++++++++++++++
 .../ChangePartitionManagerUpdateWorkersSuite.scala |  5 +-
 10 files changed, 185 insertions(+), 28 deletions(-)

diff --git a/assets/grafana/celeborn-dashboard.json 
b/assets/grafana/celeborn-dashboard.json
index 4d29a6aca..a60d3b41d 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -165,7 +165,7 @@
             "type": "prometheus",
             "uid": "${DS_PROMETHEUS}"
           },
-          "description": "The count of active workers.",
+          "description": "The count of workers in available list.",
           "fieldConfig": {
             "defaults": {
               "color": {
@@ -243,12 +243,12 @@
                 "type": "prometheus",
                 "uid": "${DS_PROMETHEUS}"
               },
-              "expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
+              "expr": 
"metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "refId": "A"
             }
           ],
-          "title": "metrics_WorkerCount_Value",
+          "title": "metrics_AvailableWorkerCount_Value",
           "type": "timeseries"
         },
         {
@@ -1287,7 +1287,7 @@
             "type": "prometheus",
             "uid": "${DS_PROMETHEUS}"
           },
-          "description": "The count of workers in available list.",
+          "description": "The count of active workers.",
           "fieldConfig": {
             "defaults": {
               "color": {
@@ -1366,13 +1366,13 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "editorMode": "code",
-              "expr": 
"metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
+              "expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
               "legendFormat": "${baseLegend}",
               "range": true,
               "refId": "A"
             }
           ],
-          "title": "metrics_AvailableWorkerCount_Value",
+          "title": "metrics_WorkerCount_Value",
           "type": "timeseries"
         },
         {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 93e997e8e..553672912 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -95,7 +95,7 @@ These metrics are exposed by Celeborn master.
     | Metric Name              | Description                                   
                                    |
     
|--------------------------|-----------------------------------------------------------------------------------|
     | RegisteredShuffleCount   | The count of registered shuffle.              
                                    |
-    | DeviceCelebornFreeBytes  | The actual usable space of Celeborn for 
device.                                   |
+    | DeviceCelebornFreeBytes  | The actual usable space of Celeborn available 
workers for device.                 |
     | DeviceCelebornTotalBytes | The total space of Celeborn for device.       
                                    |
     | RunningApplicationCount  | The count of running applications.            
                                    |
     | ActiveShuffleSize        | The active shuffle size of workers.           
                                    |
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cede0284a..14c6108bb 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import scala.Option;
 import scala.Tuple2;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.slf4j.Logger;
@@ -57,6 +58,13 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
 import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.common.util.WorkerStatusUtils;
 
+/**
+ * Note: Do not update the worker collections directly from outside the 
metadata manager, especially
+ * {@link #workersMap}, {@link #workerEventInfos}, {@link #shutdownWorkers}, 
{@link
+ * #excludedWorkers}, {@link #manuallyExcludedWorkers}, {@link 
#availableWorkers}.
+ *
+ * <p>All updates should be done through the provided methods to ensure 
consistency.
+ */
 public abstract class AbstractMetaManager implements IMetadataHandler {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractMetaManager.class);
 
@@ -65,6 +73,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       JavaUtils.newConcurrentHashMap();
   public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
   public final Map<String, WorkerInfo> workersMap = 
JavaUtils.newConcurrentHashMap();
+  public final Set<WorkerInfo> availableWorkers = 
ConcurrentHashMap.newKeySet();
 
   public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = 
JavaUtils.newConcurrentHashMap();
   public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos 
=
@@ -161,10 +170,33 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     applicationMetas.remove(appId);
   }
 
-  public void updateWorkerExcludeMeta(
+  @VisibleForTesting
+  public void updateExcludedWorkersMeta(
+      List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
+    workersToAdd.forEach(
+        worker -> {
+          excludedWorkers.add(worker);
+          availableWorkers.remove(worker);
+        });
+    workersToRemove.forEach(
+        worker -> {
+          excludedWorkers.remove(worker);
+          updateAvailableWorkers(worker);
+        });
+  }
+
+  public void updateManuallyExcludedWorkersMeta(
       List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
-    manuallyExcludedWorkers.addAll(workersToAdd);
-    workersToRemove.forEach(manuallyExcludedWorkers::remove);
+    workersToAdd.forEach(
+        worker -> {
+          manuallyExcludedWorkers.add(worker);
+          availableWorkers.remove(worker);
+        });
+    workersToRemove.forEach(
+        worker -> {
+          manuallyExcludedWorkers.remove(worker);
+          updateAvailableWorkers(worker);
+        });
   }
 
   public void reviseLostShuffles(String appId, List<Integer> lostShuffles) {
@@ -183,6 +215,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     synchronized (workersMap) {
       workersMap.remove(worker.toUniqueId());
       lostWorkers.put(worker, System.currentTimeMillis());
+      availableWorkers.remove(worker);
     }
     excludedWorkers.remove(worker);
     workerLostEvents.remove(worker);
@@ -195,6 +228,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     synchronized (workersMap) {
       workersMap.remove(worker.toUniqueId());
       lostWorkers.put(worker, System.currentTimeMillis());
+      availableWorkers.remove(worker);
     }
     excludedWorkers.remove(worker);
   }
@@ -207,6 +241,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
           shutdownWorkers.remove(workerInfo);
           workerEventInfos.remove(workerInfo);
           decommissionWorkers.remove(workerInfo);
+          updateAvailableWorkers(workerInfo);
         }
       }
     }
@@ -266,6 +301,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
+
+    // try to update the available workers if the worker status is Normal
+    if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
+      updateAvailableWorkers(worker);
+    }
   }
 
   public void updateRegisterWorkerMeta(
@@ -304,6 +344,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
       excludedWorkers.remove(workerInfo);
       workerEventInfos.remove(workerInfo);
       decommissionWorkers.remove(workerInfo);
+      updateAvailableWorkers(workerInfo);
     }
   }
 
@@ -443,6 +484,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
           .getApplicationMetasMap()
           .forEach(
               (key, value) -> applicationMetas.put(key, 
PbSerDeUtils.fromPbApplicationMeta(value)));
+
+      availableWorkers.addAll(
+          workersMap.values().stream()
+              .filter(worker -> isWorkerAvailable(worker))
+              .collect(Collectors.toSet()));
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -462,6 +508,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     registeredAppAndShuffles.clear();
     hostnameSet.clear();
     workersMap.clear();
+    availableWorkers.clear();
     lostWorkers.clear();
     appHeartbeatTime.clear();
     excludedWorkers.clear();
@@ -480,6 +527,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
     synchronized (this.workersMap) {
       shutdownWorkers.addAll(failedWorkers);
+      availableWorkers.removeAll(failedWorkers);
     }
   }
 
@@ -494,8 +542,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         if (workerEventInfo == null || 
!workerEventInfo.isSameEvent(eventType.getNumber())) {
           if (eventType == ResourceProtos.WorkerEventType.None) {
             workerEventInfos.remove(workerInfo);
+            updateAvailableWorkers(workerInfo);
           } else {
             workerEventInfos.put(workerInfo, new 
WorkerEventInfo(eventType.getNumber(), eventTime));
+            availableWorkers.remove(workerInfo);
           }
         }
       }
@@ -505,6 +555,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
     synchronized (this.workersMap) {
       decommissionWorkers.addAll(workers);
+      availableWorkers.removeAll(workers);
     }
   }
 
@@ -541,7 +592,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     workers.forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
   }
 
-  public boolean isWorkerAvailable(WorkerInfo workerInfo) {
+  private boolean isWorkerAvailable(WorkerInfo workerInfo) {
     return (workerInfo.getWorkerStatus().getState() == 
PbWorkerStatus.State.Normal
             && !workerEventInfos.containsKey(workerInfo))
         && !excludedWorkers.contains(workerInfo)
@@ -549,6 +600,17 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         && !manuallyExcludedWorkers.contains(workerInfo);
   }
 
+  private void updateAvailableWorkers(WorkerInfo worker) {
+    synchronized (workersMap) {
+      Optional<WorkerInfo> workerInfo = 
Optional.ofNullable(workersMap.get(worker.toUniqueId()));
+      if (workerInfo.map(this::isWorkerAvailable).orElse(false)) {
+        availableWorkers.add(workerInfo.get());
+      } else {
+        availableWorkers.remove(worker);
+      }
+    }
+  }
+
   public void updateApplicationMeta(ApplicationMeta applicationMeta) {
     applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
   }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 033be796d..4ddf371bf 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -92,7 +92,7 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
   @Override
   public void handleWorkerExclude(
       List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String 
requestId) {
-    updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+    updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
   }
 
   @Override
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 5ce91baed..e795c09e5 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -170,7 +170,7 @@ public class MetaHandler {
               
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
           List<WorkerInfo> workersToRemove =
               
removeAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
-          metaSystem.updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+          metaSystem.updateManuallyExcludedWorkersMeta(workersToAdd, 
workersToRemove);
           break;
 
         case WorkerLost:
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index b3da11461..3a684d9a1 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -229,9 +229,7 @@ private[celeborn] class Master(
     statusSystem.excludedWorkers.size + 
statusSystem.manuallyExcludedWorkers.size
   }
   masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () =>
-    statusSystem.workersMap.values().asScala.count { w =>
-      statusSystem.isWorkerAvailable(w)
-    }
+    statusSystem.availableWorkers.size()
   }
   masterSource.addGauge(MasterSource.SHUTDOWN_WORKER_COUNT) { () =>
     statusSystem.shutdownWorkers.size
@@ -274,7 +272,7 @@ private[celeborn] class Master(
   }
 
   masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
-    
statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum
+    
statusSystem.availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum
   }
 
   masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
@@ -1135,7 +1133,7 @@ private[celeborn] class Master(
     if (shouldResponse) {
       // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
       // during serialization of HeartbeatFromApplicationResponse
-      var appRelatedShuffles =
+      val appRelatedShuffles =
         statusSystem.registeredAppAndShuffles.getOrDefault(appId, 
Collections.emptySet())
       context.reply(HeartbeatFromApplicationResponse(
         StatusCode.SUCCESS,
@@ -1237,7 +1235,7 @@ private[celeborn] class Master(
   }
 
   private def handleCheckWorkersAvailable(context: RpcCallContext): Unit = {
-    context.reply(CheckWorkersAvailableResponse(!workersAvailable().isEmpty))
+    
context.reply(CheckWorkersAvailableResponse(!statusSystem.availableWorkers.isEmpty))
   }
 
   private def handleWorkerEvent(
@@ -1251,9 +1249,13 @@ private[celeborn] class Master(
 
   private def workersAvailable(
       tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): 
util.List[WorkerInfo] = {
-    statusSystem.workersMap.values().asScala.filter { w =>
-      statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
-    }.toList.asJava
+    if (tmpExcludedWorkerList.isEmpty) {
+      new util.ArrayList[WorkerInfo](statusSystem.availableWorkers)
+    } else {
+      val availableWorkers = new util.HashSet(statusSystem.availableWorkers)
+      tmpExcludedWorkerList.foreach(availableWorkers.remove)
+      new util.ArrayList[WorkerInfo](availableWorkers)
+    }
   }
 
   private def handleRequestForApplicationMeta(
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 095bb9496..3f4c8d064 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -160,6 +160,7 @@ public class DefaultMetaSystemSuiteJ {
         getNewReqeustId());
 
     assertEquals(3, statusSystem.workersMap.size());
+    assertEquals(3, statusSystem.availableWorkers.size());
   }
 
   @Test
@@ -211,10 +212,12 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleWorkerExclude(
         Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(), 
getNewReqeustId());
     assertEquals(2, statusSystem.manuallyExcludedWorkers.size());
+    assertEquals(0, statusSystem.availableWorkers.size());
 
     statusSystem.handleWorkerExclude(
         Collections.emptyList(), Collections.singletonList(workerInfo1), 
getNewReqeustId());
     assertEquals(1, statusSystem.manuallyExcludedWorkers.size());
+    assertEquals(1, statusSystem.availableWorkers.size());
   }
 
   @Test
@@ -256,6 +259,7 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleWorkerLost(
         HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, 
getNewReqeustId());
     assertEquals(2, statusSystem.workersMap.size());
+    assertEquals(2, statusSystem.availableWorkers.size());
   }
 
   private static final String APPID1 = "appId1";
@@ -700,6 +704,7 @@ public class DefaultMetaSystemSuiteJ {
         getNewReqeustId());
 
     assertEquals(statusSystem.excludedWorkers.size(), 1);
+    assertEquals(statusSystem.availableWorkers.size(), 2);
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME2,
@@ -716,6 +721,7 @@ public class DefaultMetaSystemSuiteJ {
         getNewReqeustId());
 
     assertEquals(statusSystem.excludedWorkers.size(), 2);
+    assertEquals(statusSystem.availableWorkers.size(), 1);
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME3,
@@ -732,6 +738,7 @@ public class DefaultMetaSystemSuiteJ {
         getNewReqeustId());
 
     assertEquals(statusSystem.excludedWorkers.size(), 2);
+    assertEquals(statusSystem.availableWorkers.size(), 1);
 
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME3,
@@ -748,6 +755,7 @@ public class DefaultMetaSystemSuiteJ {
         getNewReqeustId());
 
     assertEquals(statusSystem.excludedWorkers.size(), 3);
+    assertEquals(statusSystem.availableWorkers.size(), 0);
   }
 
   @Test
@@ -801,6 +809,7 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleReportWorkerUnavailable(failedWorkers, 
getNewReqeustId());
     assertEquals(1, statusSystem.shutdownWorkers.size());
     assertTrue(statusSystem.excludedWorkers.isEmpty());
+    assertEquals(2, statusSystem.availableWorkers.size());
   }
 
   @Test
@@ -900,6 +909,7 @@ public class DefaultMetaSystemSuiteJ {
     statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId());
     assertEquals(1, statusSystem.decommissionWorkers.size());
     assertTrue(statusSystem.excludedWorkers.isEmpty());
+    assertEquals(2, statusSystem.availableWorkers.size());
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index d2539ba5d..1b8b57d74 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -205,12 +205,11 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     String host2 = "host2";
     String host3 = "host3";
 
-    masterStatusSystem.excludedWorkers.add(info1);
-    masterStatusSystem.excludedWorkers.add(info2);
-    masterStatusSystem.excludedWorkers.add(info3);
+    masterStatusSystem.updateExcludedWorkersMeta(
+        Arrays.asList(info1, info2, info3), Collections.emptyList());
 
-    masterStatusSystem.manuallyExcludedWorkers.add(info1);
-    masterStatusSystem.manuallyExcludedWorkers.add(info2);
+    masterStatusSystem.updateManuallyExcludedWorkersMeta(
+        Arrays.asList(info1, info2), Collections.emptyList());
 
     masterStatusSystem.hostnameSet.add(host1);
     masterStatusSystem.hostnameSet.add(host2);
@@ -245,10 +244,12 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     masterStatusSystem.excludedWorkers.clear();
     masterStatusSystem.manuallyExcludedWorkers.clear();
     masterStatusSystem.workersMap.clear();
+    masterStatusSystem.availableWorkers.clear();
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
 
     Assert.assertEquals(3, masterStatusSystem.workersMap.size());
+    Assert.assertEquals(3, masterStatusSystem.availableWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
     Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 5f9e07de2..2d6d413cb 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -353,6 +353,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
     Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
 
+    Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
+
     assertWorkers(STATUSSYSTEM1.workersMap.values());
     assertWorkers(STATUSSYSTEM2.workersMap.values());
     assertWorkers(STATUSSYSTEM3.workersMap.values());
@@ -429,6 +433,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM2.manuallyExcludedWorkers.size());
     Assert.assertEquals(2, STATUSSYSTEM3.manuallyExcludedWorkers.size());
 
+    Assert.assertEquals(0, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM3.availableWorkers.size());
+
     statusSystem.handleWorkerExclude(
         Collections.emptyList(), Collections.singletonList(workerInfo1), 
getNewReqeustId());
     Thread.sleep(3000L);
@@ -436,6 +444,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM1.manuallyExcludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM2.manuallyExcludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+
+    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
@@ -484,6 +496,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
     Assert.assertEquals(2, STATUSSYSTEM2.workersMap.size());
     Assert.assertEquals(2, STATUSSYSTEM3.workersMap.size());
+
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
@@ -1025,6 +1041,14 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
 
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME2,
         RPCPORT2,
@@ -1045,6 +1069,16 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
 
+    Assert.assertEquals(3, statusSystem.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+    Assert.assertEquals(1, statusSystem.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME1,
         RPCPORT1,
@@ -1065,6 +1099,16 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
 
+    Assert.assertEquals(3, statusSystem.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+    Assert.assertEquals(2, statusSystem.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
     statusSystem.handleWorkerHeartbeat(
         HOSTNAME1,
         RPCPORT1,
@@ -1083,6 +1127,16 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
     Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
+
+    Assert.assertEquals(3, statusSystem.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+    Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+    Assert.assertEquals(1, statusSystem.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Before
@@ -1090,6 +1144,7 @@ public class RatisMasterStatusSystemSuiteJ {
     STATUSSYSTEM1.registeredAppAndShuffles.clear();
     STATUSSYSTEM1.hostnameSet.clear();
     STATUSSYSTEM1.workersMap.clear();
+    STATUSSYSTEM1.availableWorkers.clear();
     STATUSSYSTEM1.appHeartbeatTime.clear();
     STATUSSYSTEM1.excludedWorkers.clear();
     STATUSSYSTEM1.workerLostEvents.clear();
@@ -1097,6 +1152,7 @@ public class RatisMasterStatusSystemSuiteJ {
     STATUSSYSTEM2.registeredAppAndShuffles.clear();
     STATUSSYSTEM2.hostnameSet.clear();
     STATUSSYSTEM2.workersMap.clear();
+    STATUSSYSTEM2.availableWorkers.clear();
     STATUSSYSTEM2.appHeartbeatTime.clear();
     STATUSSYSTEM2.excludedWorkers.clear();
     STATUSSYSTEM2.workerLostEvents.clear();
@@ -1104,6 +1160,7 @@ public class RatisMasterStatusSystemSuiteJ {
     STATUSSYSTEM3.registeredAppAndShuffles.clear();
     STATUSSYSTEM3.hostnameSet.clear();
     STATUSSYSTEM3.workersMap.clear();
+    STATUSSYSTEM3.availableWorkers.clear();
     STATUSSYSTEM3.appHeartbeatTime.clear();
     STATUSSYSTEM3.excludedWorkers.clear();
     STATUSSYSTEM3.workerLostEvents.clear();
@@ -1222,6 +1279,9 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
@@ -1292,6 +1352,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM2.lostWorkers.size());
     Assert.assertEquals(1, STATUSSYSTEM3.lostWorkers.size());
 
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
     statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
getNewReqeustId());
     Thread.sleep(3000L);
 
@@ -1302,6 +1366,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(0, STATUSSYSTEM1.lostWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM2.lostWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM3.lostWorkers.size());
+
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
@@ -1397,6 +1465,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
     Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
 
+    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
     Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
     Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
 
@@ -1416,6 +1488,10 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(
         WorkerEventType.Decommission,
         STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
@@ -1504,6 +1580,9 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
     Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
   }
 
   @Test
diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
index 3c6547c1c..4a9f87d5b 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.celeborn.tests.client
 
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConverters.{asScalaSetConverter, 
mapAsScalaMapConverter}
 
@@ -175,7 +176,9 @@ class ChangePartitionManagerUpdateWorkersSuite extends 
WithShuffleClientSuite
       val (worker, _) = workerInfoList(index)
       // Workers in miniClusterFeature wont update status with master through 
heartbeat.
       // So update status manually.
-      masterInfo._1.statusSystem.excludedWorkers.add(worker.workerInfo)
+      masterInfo._1.statusSystem.updateExcludedWorkersMeta(
+        Collections.singletonList(worker.workerInfo),
+        Collections.emptyList())
 
       val failedWorker = new ShuffleFailedWorkers()
       failedWorker.put(

Reply via email to