This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e2f640ce3 [CELEBORN-1660] Using map for workers to find worker fast
e2f640ce3 is described below
commit e2f640ce3ba52691707d18450257ebdfbe4f93a6
Author: Wang, Fei <[email protected]>
AuthorDate: Fri Nov 1 15:58:53 2024 +0800
[CELEBORN-1660] Using map for workers to find worker fast
### What changes were proposed in this pull request?
Store workers in a map keyed by uniqueId instead of a set, so that a worker can be looked up quickly by its uniqueId.
### Why are the changes needed?
In a large Celeborn cluster, finding a worker by scanning the whole worker set might be slow, for example in:
- updateWorkerHeartbeatMeta
https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java#L222
- handleWorkerLost
https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala#L762-L765
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
Closes #2870 from turboFei/worksMap.
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: Fei Wang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../master/clustermeta/AbstractMetaManager.java | 58 +++++++++++-----------
.../celeborn/service/deploy/master/Master.scala | 58 +++++++++++-----------
.../deploy/master/http/api/v1/WorkerResource.scala | 5 +-
.../clustermeta/DefaultMetaSystemSuiteJ.java | 12 ++---
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 15 +++---
.../ha/RatisMasterStatusSystemSuiteJ.java | 50 +++++++++----------
6 files changed, 99 insertions(+), 99 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 4080edae4..f1d8a37e8 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -64,7 +64,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public final Map<String, Set<Integer>> registeredAppAndShuffles =
JavaUtils.newConcurrentHashMap();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
- public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
+ public final Map<String, WorkerInfo> workersMap =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos
=
@@ -170,8 +170,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
workerLostEvents.add(worker);
// remove worker from workers
- synchronized (workers) {
- workers.remove(worker);
+ synchronized (workersMap) {
+ workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
}
excludedWorkers.remove(worker);
@@ -182,15 +182,15 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
// remove worker from workers
- synchronized (workers) {
- workers.remove(worker);
+ synchronized (workersMap) {
+ workersMap.remove(worker.toUniqueId());
lostWorkers.put(worker, System.currentTimeMillis());
}
excludedWorkers.remove(worker);
}
public void removeWorkersUnavailableInfoMeta(List<WorkerInfo>
unavailableWorkers) {
- synchronized (workers) {
+ synchronized (workersMap) {
for (WorkerInfo workerInfo : unavailableWorkers) {
if (lostWorkers.containsKey(workerInfo)) {
lostWorkers.remove(workerInfo);
@@ -219,8 +219,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks,
userResourceConsumption);
AtomicLong availableSlots = new AtomicLong();
LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
- synchronized (workers) {
- Optional<WorkerInfo> workerInfo = workers.stream().filter(w ->
w.equals(worker)).findFirst();
+ synchronized (workersMap) {
+ Optional<WorkerInfo> workerInfo =
Optional.ofNullable(workersMap.get(worker.toUniqueId()));
workerInfo.ifPresent(
info -> {
info.updateThenGetDiskInfos(disks,
Option.apply(estimatedPartitionSize));
@@ -287,10 +287,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation());
}
workerInfo.updateDiskMaxSlots(estimatedPartitionSize);
- synchronized (workers) {
- if (!workers.contains(workerInfo)) {
- workers.add(workerInfo);
- }
+ synchronized (workersMap) {
+ workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo);
shutdownWorkers.remove(workerInfo);
lostWorkers.remove(workerInfo);
excludedWorkers.remove(workerInfo);
@@ -315,7 +313,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
manuallyExcludedWorkers,
workerLostEvents,
appHeartbeatTime,
- workers,
+ new HashSet(workersMap.values()),
partitionTotalWritten.sum(),
partitionTotalFileCount.sum(),
appDiskUsageMetric.snapShots(),
@@ -381,7 +379,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
.collect(Collectors.toList());
scala.collection.immutable.Map<String, Node> resolveMap =
rackResolver.resolveToMap(workerHostList);
- workers.addAll(
+ workersMap.putAll(
workerInfoSet.stream()
.peek(
workerInfo -> {
@@ -391,7 +389,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
}
})
- .collect(Collectors.toSet()));
+ .collect(Collectors.toMap(WorkerInfo::toUniqueId, w -> w)));
snapshotMetaInfo
.getLostWorkersMap()
@@ -437,11 +435,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
LOG.info("Successfully restore meta info from snapshot {}",
file.getAbsolutePath());
LOG.info(
"Worker size: {}, Registered shuffle size: {}. Worker excluded list
size: {}. Manually Excluded list size: {}",
- workers.size(),
+ workersMap.size(),
registeredAppAndShuffles.size(),
excludedWorkers.size(),
manuallyExcludedWorkers.size());
- workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
+ workersMap.values().forEach(workerInfo -> LOG.info(workerInfo.toString()));
registeredAppAndShuffles.forEach(
(appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId,
shuffleId));
}
@@ -449,7 +447,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
private void cleanUpState() {
registeredAppAndShuffles.clear();
hostnameSet.clear();
- workers.clear();
+ workersMap.clear();
lostWorkers.clear();
appHeartbeatTime.clear();
excludedWorkers.clear();
@@ -464,7 +462,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
- synchronized (this.workers) {
+ synchronized (this.workersMap) {
shutdownWorkers.addAll(failedWorkers);
}
}
@@ -473,7 +471,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
long eventTime = System.currentTimeMillis();
ResourceProtos.WorkerEventType eventType =
ResourceProtos.WorkerEventType.forNumber(workerEventTypeValue);
- synchronized (this.workers) {
+ synchronized (this.workersMap) {
for (WorkerInfo workerInfo : workerInfoList) {
WorkerEventInfo workerEventInfo = workerEventInfos.get(workerInfo);
LOG.info("Received worker event: {} for worker: {}", eventType,
workerInfo.toUniqueId());
@@ -489,7 +487,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
public void updateMetaByReportWorkerDecommission(List<WorkerInfo> workers) {
- synchronized (this.workers) {
+ synchronized (this.workersMap) {
decommissionWorkers.addAll(workers);
}
}
@@ -520,19 +518,19 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
"Celeborn cluster estimated partition size changed from {} to {}",
Utils.bytesToString(oldEstimatedPartitionSize),
Utils.bytesToString(estimatedPartitionSize));
- workers.stream()
- .filter(
- worker ->
- !excludedWorkers.contains(worker) &&
!manuallyExcludedWorkers.contains(worker))
- .forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
+
+ HashSet<WorkerInfo> workers = new HashSet(workersMap.values());
+ excludedWorkers.forEach(workers::remove);
+ manuallyExcludedWorkers.forEach(workers::remove);
+ workers.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
}
public boolean isWorkerAvailable(WorkerInfo workerInfo) {
- return !excludedWorkers.contains(workerInfo)
+ return (workerInfo.getWorkerStatus().getState() ==
PbWorkerStatus.State.Normal
+ && !workerEventInfos.containsKey(workerInfo))
+ && !excludedWorkers.contains(workerInfo)
&& !shutdownWorkers.contains(workerInfo)
- && !manuallyExcludedWorkers.contains(workerInfo)
- && (!workerEventInfos.containsKey(workerInfo)
- && workerInfo.getWorkerStatus().getState() ==
PbWorkerStatus.State.Normal);
+ && !manuallyExcludedWorkers.contains(workerInfo);
}
public void updateApplicationMeta(ApplicationMeta applicationMeta) {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index bef74b1c5..1f065b3d8 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -224,13 +224,13 @@ private[celeborn] class Master(
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
statusSystem.registeredShuffleCount
}
- masterSource.addGauge(MasterSource.WORKER_COUNT) { () =>
statusSystem.workers.size }
+ masterSource.addGauge(MasterSource.WORKER_COUNT) { () =>
statusSystem.workersMap.size }
masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () =>
statusSystem.lostWorkers.size }
masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
statusSystem.excludedWorkers.size +
statusSystem.manuallyExcludedWorkers.size
}
masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () =>
- statusSystem.workers.asScala.count { w =>
+ statusSystem.workersMap.values().asScala.count { w =>
statusSystem.isWorkerAvailable(w)
}
}
@@ -242,7 +242,7 @@ private[celeborn] class Master(
}
masterSource.addGauge(MasterSource.PARTITION_SIZE) { () =>
statusSystem.estimatedPartitionSize }
masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_SIZE) { () =>
- statusSystem.workers.parallelStream()
+ statusSystem.workersMap.values().parallelStream()
.mapToLong(new ToLongFunction[WorkerInfo]() {
override def applyAsLong(value: WorkerInfo): Long =
value.userResourceConsumption.values().parallelStream()
@@ -252,7 +252,7 @@ private[celeborn] class Master(
}).sum()
}
masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_FILE_COUNT) { () =>
- statusSystem.workers.parallelStream()
+ statusSystem.workersMap.values().parallelStream()
.mapToLong(new ToLongFunction[WorkerInfo]() {
override def applyAsLong(value: WorkerInfo): Long =
value.userResourceConsumption.values().parallelStream()
@@ -263,11 +263,11 @@ private[celeborn] class Master(
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () =>
- statusSystem.workers.asScala.toList.map(_.totalSpace()).sum
+ statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum
}
masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () =>
- statusSystem.workers.asScala.toList.map(_.totalActualUsableSpace()).sum
+
statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum
}
masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive }
@@ -596,7 +596,7 @@ private[celeborn] class Master(
return
}
- statusSystem.workers.asScala.foreach { worker =>
+ statusSystem.workersMap.values().asScala.foreach { worker =>
if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
&& !statusSystem.workerLostEvents.contains(worker)) {
logWarning(s"Worker ${worker.readableAddress()} timeout! Trigger
WorkerLost event.")
@@ -635,18 +635,18 @@ private[celeborn] class Master(
if (HAHelper.getAppTimeoutDeadline(statusSystem) > currentTime) {
return
}
- statusSystem.appHeartbeatTime.keySet().asScala.foreach { key =>
- if (statusSystem.appHeartbeatTime.get(key) < currentTime -
appHeartbeatTimeoutMs) {
- logWarning(s"Application $key timeout, trigger applicationLost event.")
+ statusSystem.appHeartbeatTime.asScala.foreach { case (appId,
heartbeatTime) =>
+ if (heartbeatTime < currentTime - appHeartbeatTimeoutMs) {
+ logWarning(s"Application $appId timeout, trigger applicationLost
event.")
val requestId = MasterClient.genRequestId()
- var res = self.askSync[ApplicationLostResponse](ApplicationLost(key,
requestId))
+ var res = self.askSync[ApplicationLostResponse](ApplicationLost(appId,
requestId))
var retry = 1
while (res.status != StatusCode.SUCCESS && retry <= 3) {
- res = self.askSync[ApplicationLostResponse](ApplicationLost(key,
requestId))
+ res = self.askSync[ApplicationLostResponse](ApplicationLost(appId,
requestId))
retry += 1
}
if (retry > 3) {
- logWarning(s"Handle ApplicationLost event for $key failed more than
3 times!")
+ logWarning(s"Handle ApplicationLost event for $appId failed more
than 3 times!")
}
}
}
@@ -667,7 +667,7 @@ private[celeborn] class Master(
workerStatus: WorkerStatus,
requestId: String): Unit = {
val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
- val registered = statusSystem.workers.asScala.contains(targetWorker)
+ val registered =
statusSystem.workersMap.containsKey(targetWorker.toUniqueId())
if (!registered) {
logWarning(s"Received heartbeat from unknown worker " +
s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
@@ -758,10 +758,7 @@ private[celeborn] class Master(
-1,
new util.HashMap[String, DiskInfo](),
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
- val worker: WorkerInfo = statusSystem.workers
- .asScala
- .find(_ == targetWorker)
- .orNull
+ val worker: WorkerInfo =
statusSystem.workersMap.get(targetWorker.toUniqueId())
if (worker == null) {
logWarning(s"Unknown worker
$host:$rpcPort:$pushPort:$fetchPort:$replicatePort" +
s" for WorkerLost handler!")
@@ -806,7 +803,7 @@ private[celeborn] class Master(
return
}
- if (statusSystem.workers.contains(workerToRegister)) {
+ if (statusSystem.workersMap.containsKey(workerToRegister.toUniqueId())) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
statusSystem.handleRegisterWorker(
@@ -908,7 +905,7 @@ private[celeborn] class Master(
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME,
s"offerSlots-${Random.nextInt()}") {
- statusSystem.workers.synchronized {
+ statusSystem.workersMap.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
SlotsAllocator.offerSlotsLoadAware(
selectedWorkers,
@@ -1121,24 +1118,24 @@ private[celeborn] class Master(
fileCount,
System.currentTimeMillis(),
requestId)
- // unknown workers will retain in needCheckedWorkerList
- needCheckedWorkerList.removeAll(statusSystem.workers)
+ val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w =>
+ statusSystem.workersMap.containsKey(w.toUniqueId())).asJava
if (shouldResponse) {
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplicationResponse
var availableWorksSentToClient = new util.ArrayList[WorkerInfo]()
if (needAvailableWorkers) {
availableWorksSentToClient = new util.ArrayList[WorkerInfo](
- statusSystem.workers.asScala.filter(worker =>
- statusSystem.isWorkerAvailable(worker)).asJava)
+ statusSystem.workersMap.values().asScala.filter(worker =>
+ statusSystem.isWorkerAvailable(worker)).toList.asJava)
}
- var appRelatedShuffles =
+ val appRelatedShuffles =
statusSystem.registeredAppAndShuffles.getOrDefault(appId,
Collections.emptySet())
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
new util.ArrayList(
(statusSystem.excludedWorkers.asScala ++
statusSystem.manuallyExcludedWorkers.asScala).asJava),
- needCheckedWorkerList,
+ unknownWorkers,
new util.ArrayList[WorkerInfo](
(statusSystem.shutdownWorkers.asScala ++
statusSystem.decommissionWorkers.asScala).asJava),
availableWorksSentToClient,
@@ -1215,7 +1212,7 @@ private[celeborn] class Master(
// TODO: Support calculate topN app resource consumption.
private def computeUserResourceConsumption(
userIdentifier: UserIdentifier): ResourceConsumption = {
- val resourceConsumption = statusSystem.workers.asScala.flatMap {
+ val resourceConsumption = statusSystem.workersMap.values().asScala.flatMap
{
workerInfo =>
workerInfo.userResourceConsumption.asScala.get(userIdentifier)
}.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _)
resourceConsumption
@@ -1249,7 +1246,7 @@ private[celeborn] class Master(
private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty):
util.List[WorkerInfo] = {
- statusSystem.workers.asScala.filter { w =>
+ statusSystem.workersMap.values().asScala.filter { w =>
statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
}.toList.asJava
}
@@ -1282,7 +1279,7 @@ private[celeborn] class Master(
}
private def getWorkers: String = {
- statusSystem.workers.asScala.mkString("\n")
+ statusSystem.workersMap.values().asScala.mkString("\n")
}
override def handleWorkerEvent(
@@ -1411,7 +1408,8 @@ private[celeborn] class Master(
",")} and remove
${removeWorkers.map(_.readableAddress).mkString(",")}.\n")
}
val unknownExcludedWorkers =
- (addWorkers ++ removeWorkers).filter(!statusSystem.workers.contains(_))
+ (addWorkers ++ removeWorkers).filterNot(w =>
+ statusSystem.workersMap.containsKey(w.toUniqueId()))
if (unknownExcludedWorkers.nonEmpty) {
sb.append(
s"Unknown workers
${unknownExcludedWorkers.map(_.readableAddress).mkString(",")}." +
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
index 8d1b20bc3..34e6c734e 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala
@@ -51,7 +51,7 @@ class WorkerResource extends ApiRequestContext {
@GET
def workers: WorkersResponse = {
new WorkersResponse()
-
.workers(statusSystem.workers.asScala.map(ApiUtils.workerData).toSeq.asJava)
+
.workers(statusSystem.workersMap.values().asScala.map(ApiUtils.workerData).toSeq.asJava)
.lostWorkers(statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2)
.map(kv =>
new
WorkerTimestampData().worker(ApiUtils.workerData(kv._1)).timestamp(kv._2)).asJava)
@@ -134,7 +134,8 @@ class WorkerResource extends ApiRequestContext {
s"eventType(${request.getEventType}) and
workers(${request.getWorkers}) are required")
}
val workers = request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq
- val (filteredWorkers, unknownWorkers) =
workers.partition(statusSystem.workers.contains)
+ val (filteredWorkers, unknownWorkers) =
+ workers.partition(w =>
statusSystem.workersMap.containsKey(w.toUniqueId()))
if (filteredWorkers.isEmpty) {
throw new BadRequestException(
s"None of the workers are known:
${unknownWorkers.map(_.readableAddress).mkString(", ")}")
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 65660158e..cde62c4cf 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -157,7 +157,7 @@ public class DefaultMetaSystemSuiteJ {
userResourceConsumption3,
getNewReqeustId());
- assertEquals(3, statusSystem.workers.size());
+ assertEquals(3, statusSystem.workersMap.size());
}
@Test
@@ -253,7 +253,7 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleWorkerLost(
HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
getNewReqeustId());
- assertEquals(2, statusSystem.workers.size());
+ assertEquals(2, statusSystem.workersMap.size());
}
private static final String APPID1 = "appId1";
@@ -376,20 +376,20 @@ public class DefaultMetaSystemSuiteJ {
userResourceConsumption3,
getNewReqeustId());
- assertEquals(3, statusSystem.workers.size());
+ assertEquals(3, statusSystem.workersMap.size());
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocation = new HashMap<>();
allocation.put("disk1", 5);
workersToAllocate.put(
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
.toUniqueId(),
allocation);
workersToAllocate.put(
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME2))
.findFirst()
.get()
@@ -399,7 +399,7 @@ public class DefaultMetaSystemSuiteJ {
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
assertEquals(
0,
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 9696dba2d..d2539ba5d 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -232,20 +232,23 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
AppDiskUsageSnapShot originCurrentSnapshot =
masterStatusSystem.appDiskUsageMetric.currentSnapShot().get();
- masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093,
9092, 9091));
- masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093,
9092, 9091));
- masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093,
9092, 9091));
+ WorkerInfo workerInfo1 = new WorkerInfo(host1, 9095, 9094, 9093, 9092,
9091);
+ WorkerInfo workerInfo2 = new WorkerInfo(host2, 9095, 9094, 9093, 9092,
9091);
+ WorkerInfo workerInfo3 = new WorkerInfo(host3, 9095, 9094, 9093, 9092,
9091);
+ masterStatusSystem.workersMap.put(workerInfo1.toUniqueId(), workerInfo1);
+ masterStatusSystem.workersMap.put(workerInfo2.toUniqueId(), workerInfo2);
+ masterStatusSystem.workersMap.put(workerInfo3.toUniqueId(), workerInfo3);
masterStatusSystem.writeMetaInfoToFile(tmpFile);
masterStatusSystem.hostnameSet.clear();
masterStatusSystem.excludedWorkers.clear();
masterStatusSystem.manuallyExcludedWorkers.clear();
- masterStatusSystem.workers.clear();
+ masterStatusSystem.workersMap.clear();
masterStatusSystem.restoreMetaFromFile(tmpFile);
- Assert.assertEquals(3, masterStatusSystem.workers.size());
+ Assert.assertEquals(3, masterStatusSystem.workersMap.size());
Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
@@ -260,7 +263,7 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
Assert.assertArrayEquals(originSnapshots,
masterStatusSystem.appDiskUsageMetric.snapShots());
masterStatusSystem.restoreMetaFromFile(tmpFile);
- Assert.assertEquals(3, masterStatusSystem.workers.size());
+ Assert.assertEquals(3, masterStatusSystem.workersMap.size());
}
private String getNewReqeustId() {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 4f2fae78a..e4170a02c 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -347,16 +347,16 @@ public class RatisMasterStatusSystemSuiteJ {
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
- Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
- Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
- assertWorkers(STATUSSYSTEM1.workers);
- assertWorkers(STATUSSYSTEM2.workers);
- assertWorkers(STATUSSYSTEM3.workers);
+ assertWorkers(STATUSSYSTEM1.workersMap.values());
+ assertWorkers(STATUSSYSTEM2.workersMap.values());
+ assertWorkers(STATUSSYSTEM3.workersMap.values());
}
- private void assertWorkers(Set<WorkerInfo> workerInfos) {
+ private void assertWorkers(Collection<WorkerInfo> workerInfos) {
for (WorkerInfo workerInfo : workerInfos) {
assertWorker(workerInfo);
}
@@ -479,9 +479,9 @@ public class RatisMasterStatusSystemSuiteJ {
HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
- Assert.assertEquals(2, STATUSSYSTEM2.workers.size());
- Assert.assertEquals(2, STATUSSYSTEM3.workers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.workersMap.size());
}
@Test
@@ -571,21 +571,21 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(
0,
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
.usedSlots());
Assert.assertEquals(
0,
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME2))
.findFirst()
.get()
.usedSlots());
Assert.assertEquals(
0,
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME3))
.findFirst()
.get()
@@ -632,22 +632,22 @@ public class RatisMasterStatusSystemSuiteJ {
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(3, STATUSSYSTEM1.workers.size());
- Assert.assertEquals(3, STATUSSYSTEM2.workers.size());
- Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
+ Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size());
+ Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size());
Map<String, Map<String, Integer>> workersToAllocate = new HashMap<>();
Map<String, Integer> allocations = new HashMap<>();
allocations.put("disk1", 5);
workersToAllocate.put(
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
.toUniqueId(),
allocations);
workersToAllocate.put(
- statusSystem.workers.stream()
+ statusSystem.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME2))
.findFirst()
.get()
@@ -661,21 +661,21 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(
0,
- STATUSSYSTEM1.workers.stream()
+ STATUSSYSTEM1.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
.usedSlots());
Assert.assertEquals(
0,
- STATUSSYSTEM2.workers.stream()
+ STATUSSYSTEM2.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
.usedSlots());
Assert.assertEquals(
0,
- STATUSSYSTEM3.workers.stream()
+ STATUSSYSTEM3.workersMap.values().stream()
.filter(w -> w.host().equals(HOSTNAME1))
.findFirst()
.get()
@@ -1087,21 +1087,21 @@ public class RatisMasterStatusSystemSuiteJ {
public void resetStatus() {
STATUSSYSTEM1.registeredAppAndShuffles.clear();
STATUSSYSTEM1.hostnameSet.clear();
- STATUSSYSTEM1.workers.clear();
+ STATUSSYSTEM1.workersMap.clear();
STATUSSYSTEM1.appHeartbeatTime.clear();
STATUSSYSTEM1.excludedWorkers.clear();
STATUSSYSTEM1.workerLostEvents.clear();
STATUSSYSTEM2.registeredAppAndShuffles.clear();
STATUSSYSTEM2.hostnameSet.clear();
- STATUSSYSTEM2.workers.clear();
+ STATUSSYSTEM2.workersMap.clear();
STATUSSYSTEM2.appHeartbeatTime.clear();
STATUSSYSTEM2.excludedWorkers.clear();
STATUSSYSTEM2.workerLostEvents.clear();
STATUSSYSTEM3.registeredAppAndShuffles.clear();
STATUSSYSTEM3.hostnameSet.clear();
- STATUSSYSTEM3.workers.clear();
+ STATUSSYSTEM3.workersMap.clear();
STATUSSYSTEM3.appHeartbeatTime.clear();
STATUSSYSTEM3.excludedWorkers.clear();
STATUSSYSTEM3.workerLostEvents.clear();
@@ -1280,7 +1280,7 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleReportWorkerUnavailable(unavailableWorkers,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
+ Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size());
Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());