This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 615479c44 [CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers
meta
615479c44 is described below
commit 615479c44289bd79c83774eabf99760db5d53e97
Author: Shuang <[email protected]>
AuthorDate: Mon Sep 18 18:39:43 2023 +0800
[CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers meta
### What changes were proposed in this pull request?
As the title says.
### Why are the changes needed?
If a Worker is lost, or is lost after a graceful shutdown, the Master retains
its lostWorkers/shutdownWorkers meta permanently.
This stale meta causes noisy messages in lifecycleManager, so it is better
to delete it after a while.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT & E2E test
Closes #1916 from RexXiong/CELEBORN-468.
Authored-by: Shuang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 6 ++
.../org/apache/celeborn/common/CelebornConf.scala | 10 +++
.../common/protocol/message/ControlMessages.scala | 20 ++++++
docs/configuration/master.md | 1 +
.../master/clustermeta/AbstractMetaManager.java | 11 ++++
.../master/clustermeta/IMetadataHandler.java | 2 +
.../clustermeta/SingleMasterMetaManager.java | 6 ++
.../master/clustermeta/ha/HAMasterMetaManager.java | 21 +++++++
.../deploy/master/clustermeta/ha/MetaHandler.java | 8 +++
master/src/main/proto/Resource.proto | 6 ++
.../celeborn/service/deploy/master/Master.scala | 49 +++++++++++++++
.../ha/RatisMasterStatusSystemSuiteJ.java | 73 ++++++++++++++++++++++
12 files changed, 213 insertions(+)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index a8dc7f251..2e55c73b1 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -75,6 +75,7 @@ enum MessageType {
STREAM_HANDLER = 52;
CHECK_WORKERS_AVAILABLE = 53;
CHECK_WORKERS_AVAILABLE_RESPONSE = 54;
+ REMOVE_WORKERS_UNAVAILABLE_INFO = 55;
}
message PbStorageInfo {
@@ -394,6 +395,11 @@ message PbDestroyWorkerSlotsResponse {
message PbCheckForWorkerTimeout {
}
+message PbRemoveWorkersUnavailableInfo {
+ repeated PbWorkerInfo workerInfo = 1;
+ string requestId = 2;
+}
+
message PbWorkerLost {
string host = 1;
int32 rpcPort = 2;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ecf35f67a..16a113b9b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -640,6 +640,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerCheckFileCleanMaxRetries: Int =
get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
def workerCheckFileCleanTimeout: Long = get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
def workerHeartbeatTimeout: Long = get(WORKER_HEARTBEAT_TIMEOUT)
+ def workerUnavailableInfoExpireTimeout: Long =
get(WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT)
+
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
def workerCommitThreads: Int =
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
@@ -1578,6 +1580,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("120s")
+ val WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
+ .categories("master")
+ .version("0.3.2")
+ .doc("Worker unavailable info would be cleared when the retention period
is expired")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1800s")
+
val MASTER_HOST: ConfigEntry[String] =
buildConf("celeborn.master.host")
.categories("master")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 323f36399..749170a26 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -66,6 +66,8 @@ object ControlMessages extends Logging {
case object CheckForApplicationTimeOut extends Message
+ case object CheckForWorkerUnavailableInfoTimeout extends Message
+
case object CheckForHDFSExpiredDirsTimeout extends Message
case object RemoveExpiredShuffle extends Message
@@ -360,6 +362,18 @@ object ControlMessages extends Logging {
.build()
}
+ object RemoveWorkersUnavailableInfo {
+ def apply(
+ unavailable: util.List[WorkerInfo],
+ requestId: String): PbRemoveWorkersUnavailableInfo =
+ PbRemoveWorkersUnavailableInfo.newBuilder()
+ .setRequestId(requestId)
+ .addAllWorkerInfo(unavailable.asScala.map { workerInfo =>
+ PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+ }.toList.asJava)
+ .build()
+ }
+
/**
* ==========================================
* handled by worker
@@ -682,6 +696,9 @@ object ControlMessages extends Logging {
.setRequestId(requestId).build().toByteArray
new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload)
+ case pb: PbRemoveWorkersUnavailableInfo =>
+ new TransportMessage(MessageType.REMOVE_WORKERS_UNAVAILABLE_INFO,
pb.toByteArray)
+
case pb: PbRegisterWorkerResponse =>
new TransportMessage(MessageType.REGISTER_WORKER_RESPONSE,
pb.toByteArray)
@@ -987,6 +1004,9 @@ object ControlMessages extends Logging {
.asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
pbReportWorkerUnavailable.getRequestId)
+ case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE =>
+ PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload)
+
case REGISTER_WORKER_RESPONSE_VALUE =>
PbRegisterWorkerResponse.parseFrom(message.getPayload)
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 59baad7c6..3b5d7697a 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -35,6 +35,7 @@ license: |
| celeborn.master.slot.assign.maxWorkers | 10000 | Max workers that slots of
one shuffle can be allocated on. Will choose the smaller positive one from
Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. |
0.3.1 |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to
assign slots, Celeborn supports two types of policy: roundrobin and loadaware.
Loadaware policy will be ignored when `HDFS` is enabled in
`celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length
for a window about compute user resource consumption. | 0.3.0 |
+| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker
unavailable info would be cleared when the retention period is expired | 0.3.2
|
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available
options: HDD,SSD,HDFS. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 36e60d89a..5e8f52b6f 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -135,6 +135,17 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
excludedWorkers.remove(worker);
}
+ public void removeWorkersUnavailableInfoMeta(List<WorkerInfo>
unavailableWorkers) {
+ synchronized (workers) {
+ for (WorkerInfo workerInfo : unavailableWorkers) {
+ if (lostWorkers.containsKey(workerInfo)) {
+ lostWorkers.remove(workerInfo);
+ shutdownWorkers.remove(workerInfo);
+ }
+ }
+ }
+ }
+
public void updateWorkerHeartbeatMeta(
String host,
int rpcPort,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index a34cb445d..008b35707 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -45,6 +45,8 @@ public interface IMetadataHandler {
void handleWorkerRemove(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId);
+ void handleRemoveWorkersUnavailableInfo(List<WorkerInfo> unavailableWorkers,
String requestId);
+
void handleWorkerHeartbeat(
String host,
int rpcPort,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 15c0c6d6d..7d01b4961 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -81,6 +81,12 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
}
+ @Override
+ public void handleRemoveWorkersUnavailableInfo(
+ List<WorkerInfo> unavailableWorkers, String requestId) {
+ removeWorkersUnavailableInfoMeta(unavailableWorkers);
+ }
+
@Override
public void handleWorkerHeartbeat(
String host,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 181c6e487..8fc641795 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -186,6 +186,27 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
}
}
+ @Override
+ public void handleRemoveWorkersUnavailableInfo(
+ List<WorkerInfo> unavailableWorkers, String requestId) {
+ try {
+ List<ResourceProtos.WorkerAddress> addrs =
+
unavailableWorkers.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList());
+ ratisServer.submitRequest(
+ ResourceRequest.newBuilder()
+ .setCmdType(Type.RemoveWorkersUnavailableInfo)
+ .setRequestId(requestId)
+ .setRemoveWorkersUnavailableInfoRequest(
+
ResourceProtos.RemoveWorkersUnavailableInfoRequest.newBuilder()
+ .addAllUnavailable(addrs)
+ .build())
+ .build());
+ } catch (CelebornRuntimeException e) {
+ LOG.error("Handle remove workers unavailable info failed!", e);
+ throw e;
+ }
+ }
+
@Override
public void handleWorkerHeartbeat(
String host,
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 27ba6d882..ab9d39dbc 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -228,6 +228,14 @@ public class MetaHandler {
metaSystem.updatePartitionSize();
break;
+ case RemoveWorkersUnavailableInfo:
+ List<ResourceProtos.WorkerAddress> unavailableList =
+
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+ List<WorkerInfo> unavailableWorkers =
+
unavailableList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+ metaSystem.removeWorkersUnavailableInfoMeta(unavailableWorkers);
+ break;
+
default:
throw new IOException("Can not parse this command!" + request);
}
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index a6fb5d173..9120105a5 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -35,6 +35,7 @@ enum Type {
ReportWorkerUnavailable = 20;
UpdatePartitionSize = 21;
WorkerRemove = 22;
+ RemoveWorkersUnavailableInfo = 23;
}
message ResourceRequest {
@@ -53,6 +54,7 @@ message ResourceRequest {
optional RegisterWorkerRequest registerWorkerRequest = 17;
optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
optional WorkerRemoveRequest workerRemoveRequest = 19;
+ optional RemoveWorkersUnavailableInfoRequest
removeWorkersUnavailableInfoRequest = 20;
}
message DiskInfo {
@@ -138,6 +140,10 @@ message ReportWorkerUnavailableRequest {
repeated WorkerAddress unavailable = 1;
}
+message RemoveWorkersUnavailableInfoRequest {
+ repeated WorkerAddress unavailable = 1;
+}
+
message WorkerAddress {
required string host = 1;
required int32 rpcPort = 2;
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index b06d4684c..7cb4a12d7 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -94,12 +94,15 @@ private[celeborn] class Master(
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
+ private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
private val nonEagerHandler =
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
// Config constants
private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout
private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
+ private val workerUnavailableInfoExpireTimeoutMs =
conf.workerUnavailableInfoExpireTimeout
+
private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS
private val hasHDFSStorage = conf.hasHDFSStorage
@@ -191,6 +194,16 @@ private[celeborn] class Master(
appHeartbeatTimeoutMs / 2,
TimeUnit.MILLISECONDS)
+ checkForUnavailableWorkerTimeOutTask =
forwardMessageThread.scheduleAtFixedRate(
+ new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ self.send(CheckForWorkerUnavailableInfoTimeout)
+ }
+ },
+ 0,
+ workerUnavailableInfoExpireTimeoutMs / 2,
+ TimeUnit.MILLISECONDS)
+
if (hasHDFSStorage) {
checkForHDFSRemnantDirsTimeOutTask =
forwardMessageThread.scheduleAtFixedRate(
new Runnable {
@@ -210,6 +223,9 @@ private[celeborn] class Master(
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
+ if (checkForUnavailableWorkerTimeOutTask != null) {
+ checkForUnavailableWorkerTimeOutTask.cancel(true)
+ }
if (checkForApplicationTimeOutTask != null) {
checkForApplicationTimeOutTask.cancel(true)
}
@@ -238,6 +254,8 @@ private[celeborn] class Master(
override def receive: PartialFunction[Any, Unit] = {
case _: PbCheckForWorkerTimeout =>
executeWithLeaderChecker(null, timeoutDeadWorkers())
+ case CheckForWorkerUnavailableInfoTimeout =>
+ executeWithLeaderChecker(null, timeoutWorkerUnavailableInfos())
case CheckForApplicationTimeOut =>
executeWithLeaderChecker(null, timeoutDeadApplications())
case CheckForHDFSExpiredDirsTimeout =>
@@ -253,6 +271,12 @@ private[celeborn] class Master(
executeWithLeaderChecker(
null,
handleWorkerLost(null, host, rpcPort, pushPort, fetchPort,
replicatePort, requestId))
+ case pb: PbRemoveWorkersUnavailableInfo =>
+ val unavailableWorkers = new
util.ArrayList[WorkerInfo](pb.getWorkerInfoList
+ .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+ executeWithLeaderChecker(
+ null,
+ handleRemoveWorkersUnavailableInfos(unavailableWorkers,
pb.getRequestId))
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
@@ -403,6 +427,25 @@ private[celeborn] class Master(
}
}
+ private def timeoutWorkerUnavailableInfos(): Unit = {
+ val currentTime = System.currentTimeMillis()
+ // Need increase timeout deadline to avoid long time leader election period
+ if (HAHelper.getWorkerTimeoutDeadline(statusSystem) > currentTime) {
+ return
+ }
+
+ val unavailableInfoTimeoutWorkers = lostWorkersSnapshot.asScala.filter {
+ case (_, lostTime) => currentTime - lostTime >
workerUnavailableInfoExpireTimeoutMs
+ }.keySet.toList.asJava
+
+ if (!unavailableInfoTimeoutWorkers.isEmpty) {
+ logDebug(s"Remove unavailable info for workers:
$unavailableInfoTimeoutWorkers")
+ self.send(RemoveWorkersUnavailableInfo(
+ unavailableInfoTimeoutWorkers,
+ MasterClient.genRequestId()));
+ }
+ }
+
private def timeoutDeadApplications(): Unit = {
val currentTime = System.currentTimeMillis()
// Need increase timeout deadline to avoid long time leader election period
@@ -726,6 +769,12 @@ private[celeborn] class Master(
}
}
+ private def handleRemoveWorkersUnavailableInfos(
+ unavailableWorkers: util.List[WorkerInfo],
+ requestId: String): Unit = {
+ statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers,
requestId);
+ }
+
private def computeUserResourceConsumption(userIdentifier: UserIdentifier)
: ResourceConsumption = {
val current = System.currentTimeMillis()
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 6ae55d162..b8cdae002 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -907,6 +907,79 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
}
+ @Test
+ public void testHandleRemoveWorkersUnavailableInfo() throws
InterruptedException {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+
+ statusSystem.handleRegisterWorker(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ disks1,
+ userResourceConsumption1,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ disks2,
+ userResourceConsumption2,
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ HOSTNAME3,
+ RPCPORT3,
+ PUSHPORT3,
+ FETCHPORT3,
+ REPLICATEPORT3,
+ disks3,
+ userResourceConsumption3,
+ getNewReqeustId());
+
+ WorkerInfo workerInfo1 =
+ new WorkerInfo(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ disks1,
+ userResourceConsumption1);
+
+ List<WorkerInfo> unavailableWorkers = new ArrayList<>();
+ unavailableWorkers.add(workerInfo1);
+
+ statusSystem.handleWorkerLost(
+ HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
getNewReqeustId());
+ statusSystem.handleReportWorkerUnavailable(unavailableWorkers,
getNewReqeustId());
+
+ Thread.sleep(3000L);
+ Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
+
+ Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.shutdownWorkers.size());
+
+ Assert.assertEquals(1, STATUSSYSTEM1.lostWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.lostWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.lostWorkers.size());
+
+ statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers,
getNewReqeustId());
+ Thread.sleep(3000L);
+
+ Assert.assertEquals(0, STATUSSYSTEM1.shutdownWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM2.shutdownWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM3.shutdownWorkers.size());
+
+ Assert.assertEquals(0, STATUSSYSTEM1.lostWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM2.lostWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM3.lostWorkers.size());
+ }
+
@Test
public void testHandleUpdatePartitionSize() throws InterruptedException {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();