This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8a10a2d46 [CELEBORN-1421] Refine code in master to reduce unnecessary
sync to get workers/lostworkers/shutdownWorkers
8a10a2d46 is described below
commit 8a10a2d465c5ec9d947128d9828bcd5bb1f6bfa8
Author: Shuang <[email protected]>
AuthorDate: Fri May 17 14:06:37 2024 +0800
[CELEBORN-1421] Refine code in master to reduce unnecessary sync to get
workers/lostworkers/shutdownWorkers
### What changes were proposed in this pull request?
1. Replace the `ArrayList` backing `workers` with a concurrent set
(`ConcurrentHashMap.newKeySet()`).
2. Remove unnecessary synchronization and snapshot copies when reading
workers/lostWorkers/shutdownWorkers.
### Why are the changes needed?
1. Reduce unnecessary synchronization when reading
workers/lostWorkers/shutdownWorkers.
2. Some places in the Master use statusSystem.workers (an ArrayList) directly
without synchronization. This is unsafe and can lead to concurrent
modification issues.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GitHub Actions (GA) CI.
Closes #2507 from RexXiong/CELEBORN-1421.
Authored-by: Shuang <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/common/util/PbSerDeUtils.scala | 2 +-
.../master/clustermeta/AbstractMetaManager.java | 4 +--
.../celeborn/service/deploy/master/Master.scala | 34 +++++++++-------------
3 files changed, 16 insertions(+), 24 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index e82988433..f2ef85c48 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -411,7 +411,7 @@ object PbSerDeUtils {
manuallyExcludedWorkers: java.util.Set[WorkerInfo],
workerLostEvent: java.util.Set[WorkerInfo],
appHeartbeatTime: java.util.Map[String, java.lang.Long],
- workers: java.util.List[WorkerInfo],
+ workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index c1cd37b28..b9c1c2c67 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -22,7 +22,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -66,7 +65,8 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
// Metadata for master service
public final Set<String> registeredShuffle = ConcurrentHashMap.newKeySet();
public final Set<String> hostnameSet = ConcurrentHashMap.newKeySet();
- public final ArrayList<WorkerInfo> workers = new ArrayList<>();
+ public final Set<WorkerInfo> workers = ConcurrentHashMap.newKeySet();
+
public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<WorkerInfo, WorkerEventInfo> workerEventInfos
=
JavaUtils.newConcurrentHashMap();
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 6bc675ecb..b7f51b00b 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -180,14 +180,6 @@ private[celeborn] class Master(
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption,
Long)]()
- // States
- private def workersSnapShot: util.List[WorkerInfo] =
- statusSystem.workers.synchronized(new
util.ArrayList[WorkerInfo](statusSystem.workers))
- private def lostWorkersSnapshot: ConcurrentHashMap[WorkerInfo,
java.lang.Long] =
-
statusSystem.workers.synchronized(JavaUtils.newConcurrentHashMap(statusSystem.lostWorkers))
- private def shutdownWorkerSnapshot: util.List[WorkerInfo] =
- statusSystem.workers.synchronized(new
util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers))
-
private def diskReserveSize = conf.workerDiskReserveSize
private def diskReserveRatio = conf.workerDiskReserveRatio
@@ -565,7 +557,7 @@ private[celeborn] class Master(
return
}
- workersSnapShot.asScala.foreach { worker =>
+ statusSystem.workers.asScala.foreach { worker =>
if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs
&& !statusSystem.workerLostEvents.contains(worker)) {
logWarning(s"Worker ${worker.readableAddress()} timeout! Trigger
WorkerLost event.")
@@ -588,7 +580,7 @@ private[celeborn] class Master(
return
}
- val unavailableInfoTimeoutWorkers = lostWorkersSnapshot.asScala.filter {
+ val unavailableInfoTimeoutWorkers =
statusSystem.lostWorkers.asScala.filter {
case (_, lostTime) => currentTime - lostTime >
workerUnavailableInfoExpireTimeoutMs
}.keySet.toList.asJava
@@ -638,7 +630,7 @@ private[celeborn] class Master(
workerStatus: WorkerStatus,
requestId: String): Unit = {
val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort)
- val registered = workersSnapShot.asScala.contains(targetWorker)
+ val registered = statusSystem.workers.asScala.contains(targetWorker)
if (!registered) {
logWarning(s"Received heartbeat from unknown worker " +
s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
@@ -708,7 +700,7 @@ private[celeborn] class Master(
-1,
new util.HashMap[String, DiskInfo](),
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]())
- val worker: WorkerInfo = workersSnapShot
+ val worker: WorkerInfo = statusSystem.workers
.asScala
.find(_ == targetWorker)
.orNull
@@ -745,7 +737,7 @@ private[celeborn] class Master(
internalPort,
disks,
userResourceConsumption)
- if (workersSnapShot.contains(workerToRegister)) {
+ if (statusSystem.workers.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
// TODO: remove `WorkerRemove` because we have improve register logic to
cover `WorkerRemove`
@@ -1015,14 +1007,14 @@ private[celeborn] class Master(
System.currentTimeMillis(),
requestId)
// unknown workers will retain in needCheckedWorkerList
- needCheckedWorkerList.removeAll(workersSnapShot)
+ needCheckedWorkerList.removeAll(statusSystem.workers)
if (shouldResponse) {
context.reply(HeartbeatFromApplicationResponse(
StatusCode.SUCCESS,
new util.ArrayList(
(statusSystem.excludedWorkers.asScala ++
statusSystem.manuallyExcludedWorkers.asScala).asJava),
needCheckedWorkerList,
- shutdownWorkerSnapshot))
+ new util.ArrayList[WorkerInfo](statusSystem.shutdownWorkers)))
} else {
context.reply(OneWayMessageResponse)
}
@@ -1124,9 +1116,9 @@ private[celeborn] class Master(
private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty):
util.List[WorkerInfo] = {
- workersSnapShot.asScala.filter { w =>
+ statusSystem.workers.asScala.filter { w =>
statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w)
- }.asJava
+ }.toList.asJava
}
private def handleRequestForApplicationMeta(
@@ -1157,7 +1149,7 @@ private[celeborn] class Master(
}
private def getWorkers: String = {
- workersSnapShot.asScala.mkString("\n")
+ statusSystem.workers.asScala.mkString("\n")
}
override def handleWorkerEvent(workerEventType: String, workers: String):
String = {
@@ -1199,7 +1191,7 @@ private[celeborn] class Master(
override def getLostWorkers: String = {
val sb = new StringBuilder
sb.append("======================= Lost Workers in Master
========================\n")
- lostWorkersSnapshot.asScala.toSeq.sortBy(_._2).foreach { case (worker,
time) =>
+ statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2).foreach { case
(worker, time) =>
sb.append(s"${worker.toUniqueId().padTo(50, "
").mkString}${Utils.formatTimestamp(time)}\n")
}
sb.toString()
@@ -1208,7 +1200,7 @@ private[celeborn] class Master(
override def getShutdownWorkers: String = {
val sb = new StringBuilder
sb.append("===================== Shutdown Workers in Master
======================\n")
- shutdownWorkerSnapshot.asScala.foreach { worker =>
+ statusSystem.shutdownWorkers.asScala.foreach { worker =>
sb.append(s"${worker.toUniqueId()}\n")
}
sb.toString()
@@ -1281,7 +1273,7 @@ private[celeborn] class Master(
s"Failed to Exclude workers add ${workersToAdd.mkString(",")} and
remove ${workersToRemove.mkString(",")}.\n")
}
val unknownExcludedWorkers =
- (workersToAdd ++ workersToRemove).filter(!workersSnapShot.contains(_))
+ (workersToAdd ++
workersToRemove).filter(!statusSystem.workers.contains(_))
if (unknownExcludedWorkers.nonEmpty) {
sb.append(
s"Unknown worker ${unknownExcludedWorkers.mkString(",")}. Workers in
Master:\n$getWorkers.")