This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 615479c44 [CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers 
meta
615479c44 is described below

commit 615479c44289bd79c83774eabf99760db5d53e97
Author: Shuang <[email protected]>
AuthorDate: Mon Sep 18 18:39:43 2023 +0800

    [CELEBORN-468] Timeout useless lostWorkers/shutdownWorkers meta
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    If Worker lost or lost after graceful shutdown, Master would retain these 
lostWorker/shutdownWorkers meta permanently,
    These meta would cause some noisy message in lifecycleManager. For these 
meta better to delete them after a while
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT & E2E test
    
    Closes #1916 from RexXiong/CELEBORN-468.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      |  6 ++
 .../org/apache/celeborn/common/CelebornConf.scala  | 10 +++
 .../common/protocol/message/ControlMessages.scala  | 20 ++++++
 docs/configuration/master.md                       |  1 +
 .../master/clustermeta/AbstractMetaManager.java    | 11 ++++
 .../master/clustermeta/IMetadataHandler.java       |  2 +
 .../clustermeta/SingleMasterMetaManager.java       |  6 ++
 .../master/clustermeta/ha/HAMasterMetaManager.java | 21 +++++++
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  8 +++
 master/src/main/proto/Resource.proto               |  6 ++
 .../celeborn/service/deploy/master/Master.scala    | 49 +++++++++++++++
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 73 ++++++++++++++++++++++
 12 files changed, 213 insertions(+)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index a8dc7f251..2e55c73b1 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -75,6 +75,7 @@ enum MessageType {
   STREAM_HANDLER = 52;
   CHECK_WORKERS_AVAILABLE = 53;
   CHECK_WORKERS_AVAILABLE_RESPONSE = 54;
+  REMOVE_WORKERS_UNAVAILABLE_INFO = 55;
 }
 
 message PbStorageInfo {
@@ -394,6 +395,11 @@ message PbDestroyWorkerSlotsResponse {
 message PbCheckForWorkerTimeout {
 }
 
+message PbRemoveWorkersUnavailableInfo {
+  repeated PbWorkerInfo workerInfo = 1;
+  string requestId = 2;
+}
+
 message PbWorkerLost {
   string host = 1;
   int32 rpcPort = 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ecf35f67a..16a113b9b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -640,6 +640,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def workerCheckFileCleanMaxRetries: Int = 
get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
   def workerCheckFileCleanTimeout: Long = get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
   def workerHeartbeatTimeout: Long = get(WORKER_HEARTBEAT_TIMEOUT)
+  def workerUnavailableInfoExpireTimeout: Long = 
get(WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT)
+
   def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
   def workerCommitThreads: Int =
     if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else 
get(WORKER_COMMIT_THREADS)
@@ -1578,6 +1580,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("120s")
 
+  val WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.master.workerUnavailableInfo.expireTimeout")
+      .categories("master")
+      .version("0.3.2")
+      .doc("Worker unavailable info would be cleared when the retention period 
is expired")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1800s")
+
   val MASTER_HOST: ConfigEntry[String] =
     buildConf("celeborn.master.host")
       .categories("master")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 323f36399..749170a26 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -66,6 +66,8 @@ object ControlMessages extends Logging {
 
   case object CheckForApplicationTimeOut extends Message
 
+  case object CheckForWorkerUnavailableInfoTimeout extends Message
+
   case object CheckForHDFSExpiredDirsTimeout extends Message
 
   case object RemoveExpiredShuffle extends Message
@@ -360,6 +362,18 @@ object ControlMessages extends Logging {
         .build()
   }
 
+  object RemoveWorkersUnavailableInfo {
+    def apply(
+        unavailable: util.List[WorkerInfo],
+        requestId: String): PbRemoveWorkersUnavailableInfo =
+      PbRemoveWorkersUnavailableInfo.newBuilder()
+        .setRequestId(requestId)
+        .addAllWorkerInfo(unavailable.asScala.map { workerInfo =>
+          PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+        }.toList.asJava)
+        .build()
+  }
+
   /**
    * ==========================================
    *         handled by worker
@@ -682,6 +696,9 @@ object ControlMessages extends Logging {
         .setRequestId(requestId).build().toByteArray
       new TransportMessage(MessageType.REPORT_WORKER_FAILURE, payload)
 
+    case pb: PbRemoveWorkersUnavailableInfo =>
+      new TransportMessage(MessageType.REMOVE_WORKERS_UNAVAILABLE_INFO, 
pb.toByteArray)
+
     case pb: PbRegisterWorkerResponse =>
       new TransportMessage(MessageType.REGISTER_WORKER_RESPONSE, 
pb.toByteArray)
 
@@ -987,6 +1004,9 @@ object ControlMessages extends Logging {
             .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
           pbReportWorkerUnavailable.getRequestId)
 
+      case REMOVE_WORKERS_UNAVAILABLE_INFO_VALUE =>
+        PbRemoveWorkersUnavailableInfo.parseFrom(message.getPayload)
+
       case REGISTER_WORKER_RESPONSE_VALUE =>
         PbRegisterWorkerResponse.parseFrom(message.getPayload)
 
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 59baad7c6..3b5d7697a 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -35,6 +35,7 @@ license: |
 | celeborn.master.slot.assign.maxWorkers | 10000 | Max workers that slots of 
one shuffle can be allocated on. Will choose the smaller positive one from 
Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 
0.3.1 | 
 | celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to 
assign slots, Celeborn supports two types of policy: roundrobin and loadaware. 
Loadaware policy will be ignored when `HDFS` is enabled in 
`celeborn.storage.activeTypes` | 0.3.0 | 
 | celeborn.master.userResourceConsumption.update.interval | 30s | Time length 
for a window about compute user resource consumption. | 0.3.0 | 
+| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker 
unavailable info would be cleared when the retention period is expired | 0.3.2 
| 
 | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available 
options: HDD,SSD,HDFS.  | 0.3.0 | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for 
Celeborn to store shuffle data. | 0.2.0 | 
 <!--end-include-->
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 36e60d89a..5e8f52b6f 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -135,6 +135,17 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     excludedWorkers.remove(worker);
   }
 
+  public void removeWorkersUnavailableInfoMeta(List<WorkerInfo> 
unavailableWorkers) {
+    synchronized (workers) {
+      for (WorkerInfo workerInfo : unavailableWorkers) {
+        if (lostWorkers.containsKey(workerInfo)) {
+          lostWorkers.remove(workerInfo);
+          shutdownWorkers.remove(workerInfo);
+        }
+      }
+    }
+  }
+
   public void updateWorkerHeartbeatMeta(
       String host,
       int rpcPort,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index a34cb445d..008b35707 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -45,6 +45,8 @@ public interface IMetadataHandler {
   void handleWorkerRemove(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId);
 
+  void handleRemoveWorkersUnavailableInfo(List<WorkerInfo> unavailableWorkers, 
String requestId);
+
   void handleWorkerHeartbeat(
       String host,
       int rpcPort,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 15c0c6d6d..7d01b4961 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -81,6 +81,12 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     updateWorkerRemoveMeta(host, rpcPort, pushPort, fetchPort, replicatePort);
   }
 
+  @Override
+  public void handleRemoveWorkersUnavailableInfo(
+      List<WorkerInfo> unavailableWorkers, String requestId) {
+    removeWorkersUnavailableInfoMeta(unavailableWorkers);
+  }
+
   @Override
   public void handleWorkerHeartbeat(
       String host,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 181c6e487..8fc641795 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -186,6 +186,27 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
     }
   }
 
+  @Override
+  public void handleRemoveWorkersUnavailableInfo(
+      List<WorkerInfo> unavailableWorkers, String requestId) {
+    try {
+      List<ResourceProtos.WorkerAddress> addrs =
+          
unavailableWorkers.stream().map(MetaUtil::infoToAddr).collect(Collectors.toList());
+      ratisServer.submitRequest(
+          ResourceRequest.newBuilder()
+              .setCmdType(Type.RemoveWorkersUnavailableInfo)
+              .setRequestId(requestId)
+              .setRemoveWorkersUnavailableInfoRequest(
+                  
ResourceProtos.RemoveWorkersUnavailableInfoRequest.newBuilder()
+                      .addAllUnavailable(addrs)
+                      .build())
+              .build());
+    } catch (CelebornRuntimeException e) {
+      LOG.error("Handle remove workers unavailable info failed!", e);
+      throw e;
+    }
+  }
+
   @Override
   public void handleWorkerHeartbeat(
       String host,
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 27ba6d882..ab9d39dbc 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -228,6 +228,14 @@ public class MetaHandler {
           metaSystem.updatePartitionSize();
           break;
 
+        case RemoveWorkersUnavailableInfo:
+          List<ResourceProtos.WorkerAddress> unavailableList =
+              
request.getRemoveWorkersUnavailableInfoRequest().getUnavailableList();
+          List<WorkerInfo> unavailableWorkers =
+              
unavailableList.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+          metaSystem.removeWorkersUnavailableInfoMeta(unavailableWorkers);
+          break;
+
         default:
           throw new IOException("Can not parse this command!" + request);
       }
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index a6fb5d173..9120105a5 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -35,6 +35,7 @@ enum Type {
   ReportWorkerUnavailable = 20;
   UpdatePartitionSize = 21;
   WorkerRemove = 22;
+  RemoveWorkersUnavailableInfo = 23;
 }
 
 message ResourceRequest {
@@ -53,6 +54,7 @@ message ResourceRequest {
   optional RegisterWorkerRequest registerWorkerRequest = 17;
   optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
   optional WorkerRemoveRequest workerRemoveRequest = 19;
+  optional RemoveWorkersUnavailableInfoRequest 
removeWorkersUnavailableInfoRequest = 20;
 }
 
 message DiskInfo {
@@ -138,6 +140,10 @@ message ReportWorkerUnavailableRequest {
   repeated WorkerAddress unavailable = 1;
 }
 
+message RemoveWorkersUnavailableInfoRequest {
+  repeated WorkerAddress unavailable = 1;
+}
+
 message WorkerAddress {
   required string host = 1;
   required int32 rpcPort = 2;
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index b06d4684c..7cb4a12d7 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -94,12 +94,15 @@ private[celeborn] class Master(
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
+  private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = 
ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
 
   // Config constants
   private val workerHeartbeatTimeoutMs = conf.workerHeartbeatTimeout
   private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
+  private val workerUnavailableInfoExpireTimeoutMs = 
conf.workerUnavailableInfoExpireTimeout
+
   private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS
   private val hasHDFSStorage = conf.hasHDFSStorage
 
@@ -191,6 +194,16 @@ private[celeborn] class Master(
       appHeartbeatTimeoutMs / 2,
       TimeUnit.MILLISECONDS)
 
+    checkForUnavailableWorkerTimeOutTask = 
forwardMessageThread.scheduleAtFixedRate(
+      new Runnable {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          self.send(CheckForWorkerUnavailableInfoTimeout)
+        }
+      },
+      0,
+      workerUnavailableInfoExpireTimeoutMs / 2,
+      TimeUnit.MILLISECONDS)
+
     if (hasHDFSStorage) {
       checkForHDFSRemnantDirsTimeOutTask = 
forwardMessageThread.scheduleAtFixedRate(
         new Runnable {
@@ -210,6 +223,9 @@ private[celeborn] class Master(
     if (checkForWorkerTimeOutTask != null) {
       checkForWorkerTimeOutTask.cancel(true)
     }
+    if (checkForUnavailableWorkerTimeOutTask != null) {
+      checkForUnavailableWorkerTimeOutTask.cancel(true)
+    }
     if (checkForApplicationTimeOutTask != null) {
       checkForApplicationTimeOutTask.cancel(true)
     }
@@ -238,6 +254,8 @@ private[celeborn] class Master(
   override def receive: PartialFunction[Any, Unit] = {
     case _: PbCheckForWorkerTimeout =>
       executeWithLeaderChecker(null, timeoutDeadWorkers())
+    case CheckForWorkerUnavailableInfoTimeout =>
+      executeWithLeaderChecker(null, timeoutWorkerUnavailableInfos())
     case CheckForApplicationTimeOut =>
       executeWithLeaderChecker(null, timeoutDeadApplications())
     case CheckForHDFSExpiredDirsTimeout =>
@@ -253,6 +271,12 @@ private[celeborn] class Master(
       executeWithLeaderChecker(
         null,
         handleWorkerLost(null, host, rpcPort, pushPort, fetchPort, 
replicatePort, requestId))
+    case pb: PbRemoveWorkersUnavailableInfo =>
+      val unavailableWorkers = new 
util.ArrayList[WorkerInfo](pb.getWorkerInfoList
+        .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+      executeWithLeaderChecker(
+        null,
+        handleRemoveWorkersUnavailableInfos(unavailableWorkers, 
pb.getRequestId))
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
@@ -403,6 +427,25 @@ private[celeborn] class Master(
     }
   }
 
+  private def timeoutWorkerUnavailableInfos(): Unit = {
+    val currentTime = System.currentTimeMillis()
+    // Need increase timeout deadline to avoid long time leader election period
+    if (HAHelper.getWorkerTimeoutDeadline(statusSystem) > currentTime) {
+      return
+    }
+
+    val unavailableInfoTimeoutWorkers = lostWorkersSnapshot.asScala.filter {
+      case (_, lostTime) => currentTime - lostTime > 
workerUnavailableInfoExpireTimeoutMs
+    }.keySet.toList.asJava
+
+    if (!unavailableInfoTimeoutWorkers.isEmpty) {
+      logDebug(s"Remove unavailable info for workers: 
$unavailableInfoTimeoutWorkers")
+      self.send(RemoveWorkersUnavailableInfo(
+        unavailableInfoTimeoutWorkers,
+        MasterClient.genRequestId()));
+    }
+  }
+
   private def timeoutDeadApplications(): Unit = {
     val currentTime = System.currentTimeMillis()
     // Need increase timeout deadline to avoid long time leader election period
@@ -726,6 +769,12 @@ private[celeborn] class Master(
     }
   }
 
+  private def handleRemoveWorkersUnavailableInfos(
+      unavailableWorkers: util.List[WorkerInfo],
+      requestId: String): Unit = {
+    statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
requestId);
+  }
+
   private def computeUserResourceConsumption(userIdentifier: UserIdentifier)
       : ResourceConsumption = {
     val current = System.currentTimeMillis()
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 6ae55d162..b8cdae002 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -907,6 +907,79 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
   }
 
+  @Test
+  public void testHandleRemoveWorkersUnavailableInfo() throws 
InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    statusSystem.handleRegisterWorker(
+        HOSTNAME1,
+        RPCPORT1,
+        PUSHPORT1,
+        FETCHPORT1,
+        REPLICATEPORT1,
+        disks1,
+        userResourceConsumption1,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME2,
+        RPCPORT2,
+        PUSHPORT2,
+        FETCHPORT2,
+        REPLICATEPORT2,
+        disks2,
+        userResourceConsumption2,
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        HOSTNAME3,
+        RPCPORT3,
+        PUSHPORT3,
+        FETCHPORT3,
+        REPLICATEPORT3,
+        disks3,
+        userResourceConsumption3,
+        getNewReqeustId());
+
+    WorkerInfo workerInfo1 =
+        new WorkerInfo(
+            HOSTNAME1,
+            RPCPORT1,
+            PUSHPORT1,
+            FETCHPORT1,
+            REPLICATEPORT1,
+            disks1,
+            userResourceConsumption1);
+
+    List<WorkerInfo> unavailableWorkers = new ArrayList<>();
+    unavailableWorkers.add(workerInfo1);
+
+    statusSystem.handleWorkerLost(
+        HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, 
getNewReqeustId());
+    statusSystem.handleReportWorkerUnavailable(unavailableWorkers, 
getNewReqeustId());
+
+    Thread.sleep(3000L);
+    Assert.assertEquals(2, STATUSSYSTEM1.workers.size());
+
+    Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.shutdownWorkers.size());
+
+    Assert.assertEquals(1, STATUSSYSTEM1.lostWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.lostWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.lostWorkers.size());
+
+    statusSystem.handleRemoveWorkersUnavailableInfo(unavailableWorkers, 
getNewReqeustId());
+    Thread.sleep(3000L);
+
+    Assert.assertEquals(0, STATUSSYSTEM1.shutdownWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM2.shutdownWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM3.shutdownWorkers.size());
+
+    Assert.assertEquals(0, STATUSSYSTEM1.lostWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM2.lostWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM3.lostWorkers.size());
+  }
+
   @Test
   public void testHandleUpdatePartitionSize() throws InterruptedException {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();

Reply via email to