This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new c1c46398d [CELEBORN-682] Master and client should handle blacklist
worker and shutting down worker separately
c1c46398d is described below
commit c1c46398d586ab8eec6256ea0878207a5afb2c1a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jun 16 18:29:03 2023 +0800
[CELEBORN-682] Master and client should handle blacklist worker and
shutting down worker separately
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1594 from AngersZhuuuu/CELEBORN-682.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../apache/celeborn/client/ChangePartitionManager.scala | 2 +-
.../org/apache/celeborn/client/LifecycleManager.scala | 1 +
.../org/apache/celeborn/client/WorkerStatusTracker.scala | 15 +++++++--------
.../apache/celeborn/client/WorkerStatusTrackerSuite.scala | 10 ++++++----
.../deploy/master/clustermeta/AbstractMetaManager.java | 2 --
.../apache/celeborn/service/deploy/master/Master.scala | 12 +++++++-----
.../master/clustermeta/DefaultMetaSystemSuiteJ.java | 3 ++-
.../clustermeta/ha/RatisMasterStatusSystemSuiteJ.java | 9 ++++++---
8 files changed, 30 insertions(+), 24 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
index bd55e0b24..fd42f51d6 100644
---
a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala
@@ -252,7 +252,7 @@ class ChangePartitionManager(
.workerSnapshots(shuffleId)
.keySet()
.asScala
-        .filter(w => !lifecycleManager.workerStatusTracker.blacklist.keySet().contains(w))
+        .filter(lifecycleManager.workerStatusTracker.isWorkerAvailable)
.toList
if (candidates.size < 1 || (pushReplicateEnabled && candidates.size < 2)) {
logError("[Update partition] failed for not enough candidates for
revive.")
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index aebf84fa0..6100c75b0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -864,6 +864,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
retryCandidates.addAll(candidates)
// remove blacklist from retryCandidates
retryCandidates.removeAll(workerStatusTracker.blacklist.keys().asScala.toList.asJava)
+    retryCandidates.removeAll(workerStatusTracker.shuttingWorkers.asScala.toList.asJava)
if (retryCandidates.size < 1 || (pushReplicateEnabled &&
retryCandidates.size < 2)) {
logError(s"Retry reserve slots for $shuffleId failed caused by not
enough slots.")
noAvailableSlots = true
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 05443b322..602a2b5f0 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -39,7 +39,7 @@ class WorkerStatusTracker(
// blacklist
val blacklist = new ShuffleFailedWorkers()
- private val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
+ val shuttingWorkers: JSet[WorkerInfo] = new JHashSet[WorkerInfo]()
def registerWorkerStatusListener(workerStatusListener:
WorkerStatusListener): Unit = {
workerStatusListeners.add(workerStatusListener)
@@ -49,10 +49,14 @@ class WorkerStatusTracker(
if (conf.clientCheckedUseAllocatedWorkers) {
lifecycleManager.getAllocatedWorkers()
} else {
- blacklist.asScala.keys.toSet
+ blacklist.asScala.keys.toSet ++ shuttingWorkers.asScala.toSet
}
}
+ def isWorkerAvailable(worker: WorkerInfo) = {
+ !blacklist.containsKey(worker) && !shuttingWorkers.contains(worker)
+ }
+
def blacklistWorkerFromPartition(
shuffleId: Int,
oldPartition: PartitionLocation,
@@ -172,11 +176,6 @@ class WorkerStatusTracker(
.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
}
-    if (!res.shuttingWorkers.isEmpty) {
-      blacklist.putAll(res.shuttingWorkers.asScala.filterNot(blacklist.containsKey)
-        .map(_ -> (StatusCode.WORKER_SHUTDOWN -> current)).toMap.asJava)
-    }
-
val newShutdownWorkers = resolveShutdownWorkers(res.shuttingWorkers)
if (!res.unknownWorkers.isEmpty || !newShutdownWorkers.isEmpty) {
blacklist.putAll(res.unknownWorkers.asScala.filterNot(blacklist.containsKey)
@@ -196,7 +195,7 @@ class WorkerStatusTracker(
}
}
-  private def resolveShutdownWorkers(newShutdownWorkers: JList[WorkerInfo]): JList[WorkerInfo] = {
+  def resolveShutdownWorkers(newShutdownWorkers: JList[WorkerInfo]): JList[WorkerInfo] = {
// shutdownWorkers only retain workers appeared in response.
shuttingWorkers.retainAll(newShutdownWorkers)
val shutdownList =
newShutdownWorkers.asScala.filterNot(shuttingWorkers.asScala.contains).asJava
diff --git
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index c9553d512..2be8bbacb 100644
---
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -61,18 +61,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
// test new added workers
Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host0")))
Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host3")))
- Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host4")))
+ Assert.assertTrue(!statusTracker.blacklist.contains(mock("host4")))
+ Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
// test re heartbeat with shutdown workers
val response3 = buildResponse(Array.empty, Array.empty, Array("host4"))
statusTracker.handleHeartbeatResponse(response3)
- Assert.assertTrue(statusTracker.blacklist.containsKey(mock("host4")))
+ Assert.assertTrue(!statusTracker.blacklist.contains(mock("host4")))
+ Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
// test remove
val workers = new util.HashSet[WorkerInfo]
- workers.add(mock("host4"))
+ workers.add(mock("host3"))
statusTracker.removeFromBlacklist(workers)
- Assert.assertFalse(statusTracker.blacklist.containsKey(mock("host4")))
+ Assert.assertFalse(statusTracker.blacklist.containsKey(mock("host3")))
// test register time elapsed
Thread.sleep(3000)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index d3c797d62..20ccd764b 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -359,8 +359,6 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo>
failedWorkers) {
synchronized (this.workers) {
shutdownWorkers.addAll(failedWorkers);
- failedWorkers.retainAll(this.workers);
- this.blacklist.addAll(failedWorkers);
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 351ff56f4..386038019 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -539,19 +539,20 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId,
requestSlots.shuffleId)
+ val availableWorkers = workersAvailable()
// offer slots
val slots =
masterSource.sample(MasterSource.OfferSlotsTime,
s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
if (slotsAssignPolicy == SlotsAssignPolicy.ROUNDROBIN) {
SlotsAllocator.offerSlotsRoundRobin(
- workersNotBlacklisted(),
+ availableWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware)
} else {
SlotsAllocator.offerSlotsLoadAware(
- workersNotBlacklisted(),
+ availableWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
requestSlots.shouldRackAware,
@@ -588,7 +589,7 @@ private[celeborn] class Master(
logInfo(s"Offer slots successfully for $numReducers reducers of
$shuffleKey" +
s" on ${slots.size()} workers.")
-      val workersNotSelected = workersNotBlacklisted().asScala.filter(!slots.containsKey(_))
+      val workersNotSelected = availableWorkers.asScala.filter(!slots.containsKey(_))
val offerSlotsExtraSize = Math.min(conf.masterSlotAssignExtraSlots,
workersNotSelected.size)
if (offerSlotsExtraSize > 0) {
var index = Random.nextInt(workersNotSelected.size)
@@ -734,10 +735,11 @@ private[celeborn] class Master(
context.reply(CheckQuotaResponse(isAvailable, reason))
}
- private def workersNotBlacklisted(
+ private def workersAvailable(
tmpBlacklist: Set[WorkerInfo] = Set.empty): util.List[WorkerInfo] = {
workersSnapShot.asScala.filter { w =>
-      !statusSystem.blacklist.contains(w) && !tmpBlacklist.contains(w)
+      !statusSystem.blacklist.contains(w) && !statusSystem.shutdownWorkers.contains(
+        w) && !tmpBlacklist.contains(w)
}.asJava
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 2913d50ad..f58ebba2a 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -604,7 +604,8 @@ public class DefaultMetaSystemSuiteJ {
failedWorkers.add(workerInfo1);
statusSystem.handleReportWorkerUnavailable(failedWorkers,
getNewReqeustId());
- assert 1 == statusSystem.blacklist.size();
+ assert 1 == statusSystem.shutdownWorkers.size();
+ assert 0 == statusSystem.blacklist.size();
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index c16462b4e..6fad572af 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -887,9 +887,12 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleReportWorkerUnavailable(failedWorkers,
getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(1, STATUSSYSTEM1.blacklist.size());
- Assert.assertEquals(1, STATUSSYSTEM2.blacklist.size());
- Assert.assertEquals(1, STATUSSYSTEM3.blacklist.size());
+ Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.shutdownWorkers.size());
+ Assert.assertEquals(0, STATUSSYSTEM1.blacklist.size());
+ Assert.assertEquals(0, STATUSSYSTEM2.blacklist.size());
+ Assert.assertEquals(0, STATUSSYSTEM3.blacklist.size());
}
@Test