This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 81a0d5113 [CELEBORN-1660] Cache available workers and only count the
available workers device free capacity
81a0d5113 is described below
commit 81a0d5113cc4c985e59437fc2196c9b4da6ef64e
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Nov 14 11:10:45 2024 +0800
[CELEBORN-1660] Cache available workers and only count the available
workers device free capacity
### What changes were proposed in this pull request?
1. Cache the available workers.
2. Only count the device free capacity of the available workers.
3. Place metrics_AvailableWorkerCount_Value in the overall part and
metrics_WorkerCount_Value in the `Master` part of the Grafana dashboard.
### Why are the changes needed?
Cache the available workers to avoid frequently looping over all the
workers to compute them.
This also gives an accurate device capacity overview that does not include
the excluded workers, decommissioning workers, etc.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
<img width="1705" alt="image"
src="https://github.com/user-attachments/assets/bee17b4e-785d-4112-8410-dbb684270ec0">
Closes #2827 from turboFei/device_free.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
assets/grafana/celeborn-dashboard.json | 12 ++--
docs/monitoring.md | 2 +-
.../master/clustermeta/AbstractMetaManager.java | 70 +++++++++++++++++--
.../clustermeta/SingleMasterMetaManager.java | 2 +-
.../deploy/master/clustermeta/ha/MetaHandler.java | 2 +-
.../celeborn/service/deploy/master/Master.scala | 20 +++---
.../clustermeta/DefaultMetaSystemSuiteJ.java | 10 +++
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 11 +--
.../ha/RatisMasterStatusSystemSuiteJ.java | 79 ++++++++++++++++++++++
.../ChangePartitionManagerUpdateWorkersSuite.scala | 5 +-
10 files changed, 185 insertions(+), 28 deletions(-)
diff --git a/assets/grafana/celeborn-dashboard.json
b/assets/grafana/celeborn-dashboard.json
index 4d29a6aca..a60d3b41d 100644
--- a/assets/grafana/celeborn-dashboard.json
+++ b/assets/grafana/celeborn-dashboard.json
@@ -165,7 +165,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "The count of active workers.",
+ "description": "The count of workers in available list.",
"fieldConfig": {
"defaults": {
"color": {
@@ -243,12 +243,12 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
+ "expr":
"metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"refId": "A"
}
],
- "title": "metrics_WorkerCount_Value",
+ "title": "metrics_AvailableWorkerCount_Value",
"type": "timeseries"
},
{
@@ -1287,7 +1287,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
- "description": "The count of workers in available list.",
+ "description": "The count of active workers.",
"fieldConfig": {
"defaults": {
"color": {
@@ -1366,13 +1366,13 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
- "expr":
"metrics_AvailableWorkerCount_Value{instance=~\"${instance}\"}",
+ "expr": "metrics_WorkerCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
- "title": "metrics_AvailableWorkerCount_Value",
+ "title": "metrics_WorkerCount_Value",
"type": "timeseries"
},
{
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 93e997e8e..553672912 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -95,7 +95,7 @@ These metrics are exposed by Celeborn master.
| Metric Name | Description
|
|--------------------------|-----------------------------------------------------------------------------------|
| RegisteredShuffleCount | The count of registered shuffle.
|
- | DeviceCelebornFreeBytes | The actual usable space of Celeborn for
device. |
+ | DeviceCelebornFreeBytes | The actual usable space of Celeborn available
workers for device. |
| DeviceCelebornTotalBytes | The total space of Celeborn for device.
|
| RunningApplicationCount | The count of running applications.
|
| ActiveShuffleSize | The active shuffle size of workers.
|
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index cede0284a..14c6108bb 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import scala.Option;
import scala.Tuple2;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
@@ -57,6 +58,13 @@ import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.util.WorkerStatusUtils;
+/**
+ * Note: Do not update the worker collections directly from outside the
metadata manager, especially
+ * {@link #workersMap}, {@link #workerEventInfos}, {@link #shutdownWorkers},
{@link
+ * #excludedWorkers}, {@link #manuallyExcludedWorkers}, {@link
#availableWorkers}.
+ *
+ * <p>All updates should be done through the provided methods to ensure
consistency.
+ */
public abstract class AbstractMetaManager implements IMetadataHandler {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractMetaManager.class);
@@ -65,6 +73,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
JavaUtils.newConcurrentHashMap();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
public final Map<String, WorkerInfo> workersMap =
JavaUtils.newConcurrentHashMap();
+ public final Set<WorkerInfo> availableWorkers =
ConcurrentHashMap.newKeySet();
public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos
=
@@ -161,10 +170,33 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
applicationMetas.remove(appId);
}
- public void updateWorkerExcludeMeta(
+ @VisibleForTesting
+ public void updateExcludedWorkersMeta(
+ List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
+ workersToAdd.forEach(
+ worker -> {
+ excludedWorkers.add(worker);
+ availableWorkers.remove(worker);
+ });
+ workersToRemove.forEach(
+ worker -> {
+ excludedWorkers.remove(worker);
+ updateAvailableWorkers(worker);
+ });
+ }
+
+ public void updateManuallyExcludedWorkersMeta(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
- manuallyExcludedWorkers.addAll(workersToAdd);
- workersToRemove.forEach(manuallyExcludedWorkers::remove);
+ workersToAdd.forEach(
+ worker -> {
+ manuallyExcludedWorkers.add(worker);
+ availableWorkers.remove(worker);
+ });
+ workersToRemove.forEach(
+ worker -> {
+ manuallyExcludedWorkers.remove(worker);
+ updateAvailableWorkers(worker);
+ });
}
public void reviseLostShuffles(String appId, List<Integer> lostShuffles) {
@@ -183,6 +215,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
synchronized (workersMap) {
workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
+ availableWorkers.remove(worker);
}
excludedWorkers.remove(worker);
workerLostEvents.remove(worker);
@@ -195,6 +228,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
synchronized (workersMap) {
workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
+ availableWorkers.remove(worker);
}
excludedWorkers.remove(worker);
}
@@ -207,6 +241,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
shutdownWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
+ updateAvailableWorkers(workerInfo);
}
}
}
@@ -266,6 +301,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
+
+ // try to update the available workers if the worker status is Normal
+ if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
+ updateAvailableWorkers(worker);
+ }
}
public void updateRegisterWorkerMeta(
@@ -304,6 +344,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
excludedWorkers.remove(workerInfo);
workerEventInfos.remove(workerInfo);
decommissionWorkers.remove(workerInfo);
+ updateAvailableWorkers(workerInfo);
}
}
@@ -443,6 +484,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.getApplicationMetasMap()
.forEach(
(key, value) -> applicationMetas.put(key,
PbSerDeUtils.fromPbApplicationMeta(value)));
+
+ availableWorkers.addAll(
+ workersMap.values().stream()
+ .filter(worker -> isWorkerAvailable(worker))
+ .collect(Collectors.toSet()));
} catch (Exception e) {
throw new IOException(e);
}
@@ -462,6 +508,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
registeredAppAndShuffles.clear();
hostnameSet.clear();
workersMap.clear();
+ availableWorkers.clear();
lostWorkers.clear();
appHeartbeatTime.clear();
excludedWorkers.clear();
@@ -480,6 +527,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
synchronized (this.workersMap) {
shutdownWorkers.addAll(failedWorkers);
+ availableWorkers.removeAll(failedWorkers);
}
}
@@ -494,8 +542,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
if (workerEventInfo == null ||
!workerEventInfo.isSameEvent(eventType.getNumber())) {
if (eventType == ResourceProtos.WorkerEventType.None) {
workerEventInfos.remove(workerInfo);
+ updateAvailableWorkers(workerInfo);
} else {
workerEventInfos.put(workerInfo, new
WorkerEventInfo(eventType.getNumber(), eventTime));
+ availableWorkers.remove(workerInfo);
}
}
}
@@ -505,6 +555,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
synchronized (this.workersMap) {
decommissionWorkers.addAll(workers);
+ availableWorkers.removeAll(workers);
}
}
@@ -541,7 +592,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workers.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
}
- public boolean isWorkerAvailable(WorkerInfo workerInfo) {
+ private boolean isWorkerAvailable(WorkerInfo workerInfo) {
return (workerInfo.getWorkerStatus().getState() ==
PbWorkerStatus.State.Normal
&& !workerEventInfos.containsKey(workerInfo))
&& !excludedWorkers.contains(workerInfo)
@@ -549,6 +600,17 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
&& !manuallyExcludedWorkers.contains(workerInfo);
}
+ private void updateAvailableWorkers(WorkerInfo worker) {
+ synchronized (workersMap) {
+ Optional<WorkerInfo> workerInfo =
Optional.ofNullable(workersMap.get(worker.toUniqueId()));
+ if (workerInfo.map(this::isWorkerAvailable).orElse(false)) {
+ availableWorkers.add(workerInfo.get());
+ } else {
+ availableWorkers.remove(worker);
+ }
+ }
+ }
+
public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 033be796d..4ddf371bf 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -92,7 +92,7 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
@Override
public void handleWorkerExclude(
List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String
requestId) {
- updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+ updateManuallyExcludedWorkersMeta(workersToAdd, workersToRemove);
}
@Override
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 5ce91baed..e795c09e5 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -170,7 +170,7 @@ public class MetaHandler {
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
List<WorkerInfo> workersToRemove =
removeAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
- metaSystem.updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+ metaSystem.updateManuallyExcludedWorkersMeta(workersToAdd,
workersToRemove);
break;
case WorkerLost:
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index b3da11461..3a684d9a1 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -229,9 +229,7 @@ private[celeborn] class Master(
statusSystem.excludedWorkers.size +
statusSystem.manuallyExcludedWorkers.size
}
masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () =>
- statusSystem.workersMap.values().asScala.count { w =>
- statusSystem.isWorkerAvailable(w)
- }
+ statusSystem.availableWorkers.size()
}
masterSource.addGauge(MasterSource.SHUTDOWN_WORKER_COUNT) { () =>
statusSystem.shutdownWorkers.size
@@ -274,7 +272,7 @@ private[celeborn] class Master(
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
-
statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum
+
statusSystem.availableWorkers.asScala.toList.map(_.totalActualUsableSpace()).sum
}
masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
@@ -1135,7 +1133,7 @@ private[celeborn] class Master(
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
- var appRelatedShuffles =
+ val appRelatedShuffles =
statusSystem.registeredAppAndShuffles.getOrDefault(appId,
Collections.emptySet())
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
@@ -1237,7 +1235,7 @@ private[celeborn] class Master(
}
private def handleCheckWorkersAvailable(context: RpcCallContext): Unit = {
- context.reply(CheckWorkersAvailableResponse(!workersAvailable().isEmpty))
+
context.reply(CheckWorkersAvailableResponse(!statusSystem.availableWorkers.isEmpty))
}
private def handleWorkerEvent(
@@ -1251,9 +1249,13 @@ private[celeborn] class Master(
private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty):
util.List[WorkerInfo] = {
- statusSystem.workersMap.values().asScala.filter { w =>
- statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
- }.toList.asJava
+ if (tmpExcludedWorkerList.isEmpty) {
+ new util.ArrayList[WorkerInfo](statusSystem.availableWorkers)
+ } else {
+ val availableWorkers = new util.HashSet(statusSystem.availableWorkers)
+ tmpExcludedWorkerList.foreach(availableWorkers.remove)
+ new util.ArrayList[WorkerInfo](availableWorkers)
+ }
}
private def handleRequestForApplicationMeta(
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 095bb9496..3f4c8d064 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -160,6 +160,7 @@ public class DefaultMetaSystemSuiteJ {
getNewReqeustId());
assertEquals(3, statusSystem.workersMap.size());
+ assertEquals(3, statusSystem.availableWorkers.size());
}
@Test
@@ -211,10 +212,12 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleWorkerExclude(
Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(),
getNewReqeustId());
assertEquals(2, statusSystem.manuallyExcludedWorkers.size());
+ assertEquals(0, statusSystem.availableWorkers.size());
statusSystem.handleWorkerExclude(
Collections.emptyList(), Collections.singletonList(workerInfo1),
getNewReqeustId());
assertEquals(1, statusSystem.manuallyExcludedWorkers.size());
+ assertEquals(1, statusSystem.availableWorkers.size());
}
@Test
@@ -256,6 +259,7 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleWorkerLost(
HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
getNewReqeustId());
assertEquals(2, statusSystem.workersMap.size());
+ assertEquals(2, statusSystem.availableWorkers.size());
}
private static final String APPID1 = "appId1";
@@ -700,6 +704,7 @@ public class DefaultMetaSystemSuiteJ {
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 1);
+ assertEquals(statusSystem.availableWorkers.size(), 2);
statusSystem.handleWorkerHeartbeat(
HOSTNAME2,
@@ -716,6 +721,7 @@ public class DefaultMetaSystemSuiteJ {
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 2);
+ assertEquals(statusSystem.availableWorkers.size(), 1);
statusSystem.handleWorkerHeartbeat(
HOSTNAME3,
@@ -732,6 +738,7 @@ public class DefaultMetaSystemSuiteJ {
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 2);
+ assertEquals(statusSystem.availableWorkers.size(), 1);
statusSystem.handleWorkerHeartbeat(
HOSTNAME3,
@@ -748,6 +755,7 @@ public class DefaultMetaSystemSuiteJ {
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 3);
+ assertEquals(statusSystem.availableWorkers.size(), 0);
}
@Test
@@ -801,6 +809,7 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleReportWorkerUnavailable(failedWorkers,
getNewReqeustId());
assertEquals(1, statusSystem.shutdownWorkers.size());
assertTrue(statusSystem.excludedWorkers.isEmpty());
+ assertEquals(2, statusSystem.availableWorkers.size());
}
@Test
@@ -900,6 +909,7 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleReportWorkerDecommission(workers, getNewReqeustId());
assertEquals(1, statusSystem.decommissionWorkers.size());
assertTrue(statusSystem.excludedWorkers.isEmpty());
+ assertEquals(2, statusSystem.availableWorkers.size());
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index d2539ba5d..1b8b57d74 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -205,12 +205,11 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
String host2 = "host2";
String host3 = "host3";
- masterStatusSystem.excludedWorkers.add(info1);
- masterStatusSystem.excludedWorkers.add(info2);
- masterStatusSystem.excludedWorkers.add(info3);
+ masterStatusSystem.updateExcludedWorkersMeta(
+ Arrays.asList(info1, info2, info3), Collections.emptyList());
- masterStatusSystem.manuallyExcludedWorkers.add(info1);
- masterStatusSystem.manuallyExcludedWorkers.add(info2);
+ masterStatusSystem.updateManuallyExcludedWorkersMeta(
+ Arrays.asList(info1, info2), Collections.emptyList());
masterStatusSystem.hostnameSet.add(host1);
masterStatusSystem.hostnameSet.add(host2);
@@ -245,10 +244,12 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
masterStatusSystem.excludedWorkers.clear();
masterStatusSystem.manuallyExcludedWorkers.clear();
masterStatusSystem.workersMap.clear();
+ masterStatusSystem.availableWorkers.clear();
masterStatusSystem.restoreMetaFromFile(tmpFile);
Assert.assertEquals(3, masterStatusSystem.workersMap.size());
+ Assert.assertEquals(3, masterStatusSystem.availableWorkers.size());
Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 5f9e07de2..2d6d413cb 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -353,6 +353,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
+
assertWorkers(STATUSSYSTEM1.workersMap.values());
assertWorkers(STATUSSYSTEM2.workersMap.values());
assertWorkers(STATUSSYSTEM3.workersMap.values());
@@ -429,6 +433,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM2.manuallyExcludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM3.availableWorkers.size());
+
statusSystem.handleWorkerExclude(
Collections.emptyList(), Collections.singletonList(workerInfo1),
getNewReqeustId());
Thread.sleep(3000L);
@@ -436,6 +444,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM1.manuallyExcludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM2.manuallyExcludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+
+ Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
}
@Test
@@ -484,6 +496,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
Assert.assertEquals(2, STATUSSYSTEM2.workersMap.size());
Assert.assertEquals(2, STATUSSYSTEM3.workersMap.size());
+
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
@Test
@@ -1025,6 +1041,14 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
statusSystem.handleWorkerHeartbeat(
HOSTNAME2,
RPCPORT2,
@@ -1045,6 +1069,16 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
+ Assert.assertEquals(3, statusSystem.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+ Assert.assertEquals(1, statusSystem.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
@@ -1065,6 +1099,16 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
+ Assert.assertEquals(3, statusSystem.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+ Assert.assertEquals(2, statusSystem.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
@@ -1083,6 +1127,16 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
+
+ Assert.assertEquals(3, statusSystem.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
+
+ Assert.assertEquals(1, statusSystem.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
}
@Before
@@ -1090,6 +1144,7 @@ public class RatisMasterStatusSystemSuiteJ {
STATUSSYSTEM1.registeredAppAndShuffles.clear();
STATUSSYSTEM1.hostnameSet.clear();
STATUSSYSTEM1.workersMap.clear();
+ STATUSSYSTEM1.availableWorkers.clear();
STATUSSYSTEM1.appHeartbeatTime.clear();
STATUSSYSTEM1.excludedWorkers.clear();
STATUSSYSTEM1.workerLostEvents.clear();
@@ -1097,6 +1152,7 @@ public class RatisMasterStatusSystemSuiteJ {
STATUSSYSTEM2.registeredAppAndShuffles.clear();
STATUSSYSTEM2.hostnameSet.clear();
STATUSSYSTEM2.workersMap.clear();
+ STATUSSYSTEM2.availableWorkers.clear();
STATUSSYSTEM2.appHeartbeatTime.clear();
STATUSSYSTEM2.excludedWorkers.clear();
STATUSSYSTEM2.workerLostEvents.clear();
@@ -1104,6 +1160,7 @@ public class RatisMasterStatusSystemSuiteJ {
STATUSSYSTEM3.registeredAppAndShuffles.clear();
STATUSSYSTEM3.hostnameSet.clear();
STATUSSYSTEM3.workersMap.clear();
+ STATUSSYSTEM3.availableWorkers.clear();
STATUSSYSTEM3.appHeartbeatTime.clear();
STATUSSYSTEM3.excludedWorkers.clear();
STATUSSYSTEM3.workerLostEvents.clear();
@@ -1222,6 +1279,9 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
@Test
@@ -1292,6 +1352,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM2.lostWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.lostWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
+
statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers,
getNewReqeustId());
Thread.sleep(3000L);
@@ -1302,6 +1366,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(0, STATUSSYSTEM1.lostWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM2.lostWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM3.lostWorkers.size());
+
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
@Test
@@ -1397,6 +1465,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(2, STATUSSYSTEM2.workerEventInfos.size());
Assert.assertEquals(2, STATUSSYSTEM3.workerEventInfos.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
+
Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo1));
Assert.assertTrue(STATUSSYSTEM1.workerEventInfos.containsKey(workerInfo2));
@@ -1416,6 +1488,10 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(
WorkerEventType.Decommission,
STATUSSYSTEM1.workerEventInfos.get(workerInfo2).getEventType());
+
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
@Test
@@ -1504,6 +1580,9 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
}
@Test
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
index 3c6547c1c..4a9f87d5b 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/ChangePartitionManagerUpdateWorkersSuite.scala
@@ -18,6 +18,7 @@
package org.apache.celeborn.tests.client
import java.util
+import java.util.Collections
import scala.collection.JavaConverters.{asScalaSetConverter,
mapAsScalaMapConverter}
@@ -175,7 +176,9 @@ class ChangePartitionManagerUpdateWorkersSuite extends
WithShuffleClientSuite
val (worker, _) = workerInfoList(index)
// Workers in miniClusterFeature wont update status with master through
heartbeat.
// So update status manually.
- masterInfo._1.statusSystem.excludedWorkers.add(worker.workerInfo)
+ masterInfo._1.statusSystem.updateExcludedWorkersMeta(
+ Collections.singletonList(worker.workerInfo),
+ Collections.emptyList())
val failedWorker = new ShuffleFailedWorkers()
failedWorker.put(