This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new c1c46398d [CELEBORN-682] Master and client should handle blacklist worker and shutting down worker separately
c1c46398d is described below

commit c1c46398d586ab8eec6256ea0878207a5afb2c1a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 16 18:29:03 2023 +0800

    [CELEBORN-682] Master and client should handle blacklist worker and shutting down worker separately
    
    ### What changes were proposed in this pull request?
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1594 from AngersZhuuuu/CELEBORN-682.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../apache/celeborn/client/ChangePartitionManager.scala   |  2 +-
 .../org/apache/celeborn/client/LifecycleManager.scala     |  1 +
 .../org/apache/celeborn/client/WorkerStatusTracker.scala  | 15 +++++++--------
 .../apache/celeborn/client/WorkerStatusTrackerSuite.scala | 10 ++++++----
 .../deploy/master/clustermeta/AbstractMetaManager.java    |  2 --
 .../apache/celeborn/service/deploy/master/Master.scala    | 12 +++++++-----
 .../master/clustermeta/DefaultMetaSystemSuiteJ.java       |  3 ++-
 .../clustermeta/ha/RatisMasterStatusSystemSuiteJ.java     |  9 ++++++---
 8 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index bd55e0b24..fd42f51d6 100644
--- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -252,7 +252,7 @@ class ChangePartitionManager(
         .workerSnapshots(shuffleId)
         .keySet()
         .asScala
-        .filter(w => 
!lifecycleManager.workerStatusTracker.blacklist.keySet().contains(w))
+        .filter(lifecycleManager.workerStatusTracker.isWorkerAvailable)
         .toList
     if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2)) {
       logError("[Update partition] failed for not enough candidates for 
revive.")
diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index aebf84fa0..6100c75b0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -864,6 +864,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
           retryCandidates.addAll(candidates)
           // remove blacklist from retryCandidates
           
retryCandidates.removeAll(workerStatusTracker.blacklist.keys().asScala.toList.asJava)
+          
retryCandidates.removeAll(workerStatusTracker.shuttingWorkers.asScala.toList.asJava)
           if (retryCandidates.size < 1 || (pushReplicateEnabled && 
retryCandidates.size < 2)) {
             logError(s"Retry reserve slots for $shuffleId failed caused by not 
enough slots.")
             noAvailableSlots = true
diff --git a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 05443b322..602a2b5f0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -39,7 +39,7 @@ class WorkerStatusTracker(
 
   // blacklist
   val blacklist = new ShuffleFailedWorkers()
-  private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+  val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
 
   def registerWorkerStatusListener(workerStatusListener: 
WorkerStatusListener): Unit = {
     workerStatusListeners.add(workerStatusListener)
@@ -49,10 +49,14 @@ class WorkerStatusTracker(
     if (conf.clientCheckedUseAllocatedWorkers) {
       lifecycleManager.getAllocatedWorkers()
     } else {
-      blacklist.asScala.keys.toSet
+      blacklist.asScala.keys.toSet ++ shuttingWorkers.asScala.toSet
     }
   }
 
+  def isWorkerAvailable(worker: WorkerInfo) = {
+    !blacklist.containsKey(worker) && !shuttingWorkers.contains(worker)
+  }
+
   def blacklistWorkerFromPartition(
       shuffleId: Int,
       oldPartition: PartitionLocation,
@@ -172,11 +176,6 @@ class WorkerStatusTracker(
           .map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
       }
 
-      if (!res.shuttingWorkers.isEmpty) {
-        
blacklist.putAll(res.shuttingWorkers.asScala.filterNot(blacklist.containsKey)
-          .map(_ -> (StatusCode.WORKER_SHUTDOWN -> current)).toMap.asJava)
-      }
-
       val newShutdownWorkers = resolveShutdownWorkers(res.shuttingWorkers)
       if (!res.unknownWorkers.isEmpty || !newShutdownWorkers.isEmpty) {
         
blacklist.putAll(res.unknownWorkers.asScala.filterNot(blacklist.containsKey)
@@ -196,7 +195,7 @@ class WorkerStatusTracker(
     }
   }
 
-  private def resolveShutdownWorkers(newShutdownWorkers: JList[WorkerInfo]): 
JList[WorkerInfo] = {
+  def resolveShutdownWorkers(newShutdownWorkers: JList[WorkerInfo]): 
JList[WorkerInfo] = {
     // shutdownWorkers only retain workers appeared in response.
     shuttingWorkers.retainAll(newShutdownWorkers)
     val shutdownList = 
newShutdownWorkers.asScala.filterNot(shuttingWorkers.asScala.contains).asJava
diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index c9553d512..2be8bbacb 100644
--- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -61,18 +61,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     // test new added workers
     Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host0")))
     Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host3")))
-    Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host4")))
+    Assert.assertTrue(!statusTracker.blacklist.contains(mock("host4")))
+    Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
     // test re heartbeat with shutdown workers
     val response3 = buildResponse(Array.empty, Array.empty, Array("host4"))
     statusTracker.handleHeartbeatResponse(response3)
-    Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host4")))
+    Assert.assertTrue(!statusTracker.blacklist.contains(mock("host4")))
+    Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
     // test remove
     val workers = new util.HashSet[WorkerInfo]
-    workers.add(mock("host4"))
+    workers.add(mock("host3"))
     statusTracker.removeFromBlacklist(workers)
-    Assert.assertFalse(statusTracker.blacklist.containsKey(mock("host4")))
+    Assert.assertFalse(statusTracker.blacklist.containsKey(mock("host3")))
 
     // test register time elapsed
     Thread.sleep(3000)
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index d3c797d62..20ccd764b 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -359,8 +359,6 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
   public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> 
failedWorkers) {
     synchronized (this.workers) {
       shutdownWorkers.addAll(failedWorkers);
-      failedWorkers.retainAll(this.workers);
-      this.blacklist.addAll(failedWorkers);
     }
   }
 
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 351ff56f4..386038019 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -539,19 +539,20 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, 
requestSlots.shuffleId)
 
+    val availableWorkers = workersAvailable()
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OfferSlotsTime, 
s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.ROUNDROBIN) {
             SlotsAllocator.offerSlotsRoundRobin(
-              workersNotBlacklisted(),
+              availableWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware)
           } else {
             SlotsAllocator.offerSlotsLoadAware(
-              workersNotBlacklisted(),
+              availableWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
@@ -588,7 +589,7 @@ private[celeborn] class Master(
     logInfo(s"Offer slots successfully for $numReducers reducers of 
$shuffleKey" +
       s" on ${slots.size()} workers.")
 
-    val workersNotSelected = 
workersNotBlacklisted().asScala.filter(!slots.containsKey(_))
+    val workersNotSelected = 
availableWorkers.asScala.filter(!slots.containsKey(_))
     val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots, 
workersNotSelected.size)
     if (offerSlotsExtraSize > 0) {
       var index = Random.nextInt(workersNotSelected.size)
@@ -734,10 +735,11 @@ private[celeborn] class Master(
     context.reply(CheckQuotaResponse(isAvailable, reason))
   }
 
-  private def workersNotBlacklisted(
+  private def workersAvailable(
       tmpBlacklist: Set[WorkerInfo] = Set.empty): util.List[WorkerInfo] = {
     workersSnapShot.asScala.filter { w =>
-      !statusSystem.blacklist.contains(w) && !tmpBlacklist.contains(w)
+      !statusSystem.blacklist.contains(w) && 
!statusSystem.shutdownWorkers.contains(
+        w) && !tmpBlacklist.contains(w)
     }.asJava
   }
 
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 2913d50ad..f58ebba2a 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -604,7 +604,8 @@ public class DefaultMetaSystemSuiteJ {
     failedWorkers.add(workerInfo1);
 
     statusSystem.handleReportWorkerUnavailable(failedWorkers, 
getNewReqeustId());
-    assert 1 == statusSystem.blacklist.size();
+    assert 1 == statusSystem.shutdownWorkers.size();
+    assert 0 == statusSystem.blacklist.size();
   }
 
   @Test
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index c16462b4e..6fad572af 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -887,9 +887,12 @@ public class RatisMasterStatusSystemSuiteJ {
 
     statusSystem.handleReportWorkerUnavailable(failedWorkers, 
getNewReqeustId());
     Thread.sleep(3000L);
-    Assert.assertEquals(1, STATUSSYSTEM1.blacklist.size());
-    Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size());
-    Assert.assertEquals(1, STATUSSYSTEM3.blacklist.size());
+    Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.shutdownWorkers.size());
+    Assert.assertEquals(0, STATUSSYSTEM1.blacklist.size());
+    Assert.assertEquals(0, STATUSSYSTEM2.blacklist.size());
+    Assert.assertEquals(0, STATUSSYSTEM3.blacklist.size());
   }
 
   @Test

Reply via email to