This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 52eddc59f [CELEBORN-448] Support exclude worker manually
52eddc59f is described below
commit 52eddc59f3c52d360dba2457a2b4caefb33b4454
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 7 16:25:24 2023 +0800
[CELEBORN-448] Support exclude worker manually
### What changes were proposed in this pull request?
Support excluding workers manually by worker ID. The given workers are added to
(or removed from) the master's set of manually excluded workers.
### Why are the changes needed?
Currently, Celeborn excludes workers when shuffle clients fail to push data to
or fetch data from them. Operators also need a way to exclude workers manually
when maintaining a Celeborn cluster.
### Does this PR introduce _any_ user-facing change?
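Yes. A new master HTTP endpoint, `/exclude?add=${ADD_WORKERS}&remove=${REMOVE_WORKERS}`,
is added for adding workers to, or removing them from, the manually excluded list.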
### How was this patch tested?
- `HttpUtilsSuite`
- `DefaultMetaSystemSuiteJ#testHandleWorkerExclude`
- `RatisMasterStatusSystemSuiteJ#testHandleWorkerExclude`
- `MasterStateMachineSuiteJ#testObjSerde`
Closes #1997 from SteNicholas/CELEBORN-448.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
common/src/main/proto/TransportMessages.proto | 13 ++
.../common/protocol/message/ControlMessages.scala | 28 ++++
.../apache/celeborn/common/util/PbSerDeUtils.scala | 3 +
docs/monitoring.md | 33 ++--
.../master/clustermeta/AbstractMetaManager.java | 21 ++-
.../master/clustermeta/IMetadataHandler.java | 3 +
.../clustermeta/SingleMasterMetaManager.java | 6 +
.../master/clustermeta/ha/HAMasterMetaManager.java | 30 ++++
.../deploy/master/clustermeta/ha/MetaHandler.java | 12 ++
master/src/main/proto/Resource.proto | 7 +
.../celeborn/service/deploy/master/Master.scala | 76 +++++++--
.../clustermeta/DefaultMetaSystemSuiteJ.java | 139 ++++++++++-------
.../clustermeta/ha/MasterStateMachineSuiteJ.java | 7 +-
.../ha/RatisMasterStatusSystemSuiteJ.java | 169 +++++++++++++--------
.../celeborn/server/common/HttpService.scala | 25 +--
.../celeborn/server/common/http/HttpEndpoint.scala | 12 +-
.../celeborn/server/common/http/HttpUtils.scala | 5 +-
.../server/common/http/HttpUtilsSuite.scala | 25 ++-
.../celeborn/service/deploy/worker/Worker.scala | 4 +-
19 files changed, 442 insertions(+), 176 deletions(-)
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index e05615e5b..56f0e7561 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -85,6 +85,8 @@ enum MessageType {
STREAM_CHUNK_SLICE = 62;
CHUNK_FETCH_REQUEST = 63;
TRANSPORTABLE_ERROR = 64;
+ WORKER_EXCLUDE = 65;
+ WORKER_EXCLUDE_RESPONSE = 66;
}
enum StreamType {
@@ -417,6 +419,16 @@ message PbRemoveWorkersUnavailableInfo {
string requestId = 2;
}
+message PbWorkerExclude {
+ repeated PbWorkerInfo workersToAdd = 1;
+ repeated PbWorkerInfo workersToRemove = 2;
+ string requestId = 3;
+}
+
+message PbWorkerExcludeResponse {
+ bool success = 1;
+}
+
message PbWorkerLost {
string host = 1;
int32 rpcPort = 2;
@@ -500,6 +512,7 @@ message PbSnapshotMetaInfo {
PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
map<string, int64> lostWorkers = 12;
repeated PbWorkerInfo shutdownWorkers = 13;
+ repeated PbWorkerInfo manuallyExcludedWorkers = 14;
}
message PbOpenStream {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 578824cc1..63b4f7b1f 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -274,6 +274,28 @@ object ControlMessages extends Logging {
partitionIds: util.Set[Integer] = new util.HashSet[Integer]())
extends MasterMessage
+ object WorkerExclude {
+ def apply(
+ workersToAdd: util.List[WorkerInfo],
+ workersToRemove: util.List[WorkerInfo],
+ requestId: String): PbWorkerExclude = PbWorkerExclude.newBuilder()
+ .addAllWorkersToAdd(workersToAdd.asScala.map { workerInfo =>
+ PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+ }.toList.asJava)
+ .addAllWorkersToRemove(workersToRemove.asScala.map { workerInfo =>
+ PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+ }.toList.asJava)
+ .setRequestId(requestId)
+ .build()
+ }
+
+ object WorkerExcludeResponse {
+ def apply(success: Boolean): PbWorkerExcludeResponse =
+ PbWorkerExcludeResponse.newBuilder()
+ .setSuccess(success)
+ .build()
+ }
+
object WorkerLost {
def apply(
host: String,
@@ -607,6 +629,12 @@ object ControlMessages extends Logging {
val payload = builder.build().toByteArray
new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE,
payload)
+ case pb: PbWorkerExclude =>
+ new TransportMessage(MessageType.WORKER_EXCLUDE, pb.toByteArray)
+
+ case pb: PbWorkerExcludeResponse =>
+ new TransportMessage(MessageType.WORKER_EXCLUDE_RESPONSE, pb.toByteArray)
+
case pb: PbWorkerLost =>
new TransportMessage(MessageType.WORKER_LOST, pb.toByteArray)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index c198cca11..2748d6cf3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -369,6 +369,7 @@ object PbSerDeUtils {
registeredShuffle: java.util.Set[String],
hostnameSet: java.util.Set[String],
excludedWorkers: java.util.Set[WorkerInfo],
+ manuallyExcludedWorkers: java.util.Set[WorkerInfo],
workerLostEvent: java.util.Set[WorkerInfo],
appHeartbeatTime: java.util.Map[String, java.lang.Long],
workers: java.util.List[WorkerInfo],
@@ -383,6 +384,8 @@ object PbSerDeUtils {
.addAllRegisteredShuffle(registeredShuffle)
.addAllHostnameSet(hostnameSet)
.addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_,
true)).asJava)
+ .addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
+ .map(toPbWorkerInfo(_, true)).asJava)
.addAllWorkerLostEvents(workerLostEvent.asScala.map(toPbWorkerInfo(_,
true)).asJava)
.putAllAppHeartbeatTime(appHeartbeatTime)
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true)).asJava)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 1c82c1c1e..ae858f9ba 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -304,21 +304,22 @@ API path listed as below:
#### Master
-| Path | Meaning
|
-|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------|
-| /metrics/prometheus | List the metrics data in prometheus format of the
master.(The url path is defined by configure
`celeborn.metrics.prometheus.path`.) |
-| /conf | List the conf setting of the master.
|
-| /masterGroupInfo | List master group information of the service. It
will list all master's LEADER, FOLLOWER information.
|
-| /workerInfo | List worker information of the service. It will list
all registered workers 's information. |
-| /lostWorkers | List all lost workers of the master.
|
-| /excludedWorkers | List all excluded workers of the master.
|
-| /shutdownWorkers | List all shutdown workers of the master.
|
-| /threadDump | List the current thread dump of the master.
|
-| /hostnames | List all running application's LifecycleManager's
hostnames of the cluster.
|
-| /applications | List all running application's ids of the cluster.
|
-| /shuffles | List all running shuffle keys of the service. It
will return all running shuffle's key of the cluster.
|
-| /listTopDiskUsedApps | List the top disk usage application ids. It will
return the top disk usage application ids for the cluster.
|
-| /help | List the available API providers of the master.
|
+| Path | Meaning
|
+|------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| /metrics/prometheus | List the metrics data
in prometheus format of the master. (The URL path is defined by the configuration
`celeborn.metrics.prometheus.path`.)
|
+| /conf | List the conf setting
of the master.
|
+| /masterGroupInfo | List master group
information of the service. It will list all master's LEADER, FOLLOWER
information.
|
+| /workerInfo | List worker
information of the service. It will list all registered workers' information.
|
+| /lostWorkers | List all lost workers
of the master.
|
+| /excludedWorkers | List all excluded
workers of the master.
|
+| /shutdownWorkers | List all shutdown
workers of the master.
|
+| /threadDump | List the current
thread dump of the master.
|
+| /hostnames | List all running
application's LifecycleManager's hostnames of the cluster.
|
+| /applications | List all running
application's ids of the cluster.
|
+| /shuffles | List all running
shuffle keys of the service. It will return all running shuffle's key of the
cluster.
|
+| /listTopDiskUsedApps | List the top disk
usage application ids. It will return the top disk usage application ids for
the cluster.
|
+| /exclude?add=${ADD_WORKERS}&remove=${REMOVE_WORKERS} | Add workers to or
remove workers from the master's manually excluded list, given their worker IDs.
The `add` and `remove` parameters each take a comma-separated list of worker
IDs. |
+| /help | List the available
API providers of the master.
|
#### Worker
@@ -334,5 +335,5 @@ API path listed as below:
| /unavailablePeers | List the unavailable peers of the worker, this
always means the worker connect to the peer failed.
|
| /isShutdown | Show if the worker is during the process of
shutdown.
|
| /isRegistered | Show if the worker is registered to the master
success.
|
-| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are
'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
|
+| /exit?type=${TYPE} | Trigger this worker to exit. Legal `type`s are
'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
|
| /help | List the available API providers of the worker.
|
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 5e8f52b6f..a6526d218 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -63,6 +63,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<String, Long> appHeartbeatTime =
JavaUtils.newConcurrentHashMap();
public final Set<WorkerInfo> excludedWorkers = ConcurrentHashMap.newKeySet();
+ public final Set<WorkerInfo> manuallyExcludedWorkers =
ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> shutdownWorkers = ConcurrentHashMap.newKeySet();
public final Set<WorkerInfo> workerLostEvents =
ConcurrentHashMap.newKeySet();
@@ -111,6 +112,12 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
appHeartbeatTime.remove(appId);
}
+ public void updateWorkerExcludeMeta(
+ List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
+ manuallyExcludedWorkers.addAll(workersToAdd);
+ workersToRemove.forEach(manuallyExcludedWorkers::remove);
+ }
+
public void updateWorkerLostMeta(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort) {
WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort,
replicatePort);
@@ -223,6 +230,7 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
registeredShuffle,
hostnameSet,
excludedWorkers,
+ manuallyExcludedWorkers,
workerLostEvents,
appHeartbeatTime,
workers,
@@ -254,6 +262,10 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
snapshotMetaInfo.getExcludedWorkersList().stream()
.map(PbSerDeUtils::fromPbWorkerInfo)
.collect(Collectors.toSet()));
+ manuallyExcludedWorkers.addAll(
+ snapshotMetaInfo.getManuallyExcludedWorkersList().stream()
+ .map(PbSerDeUtils::fromPbWorkerInfo)
+ .collect(Collectors.toSet()));
workerLostEvents.addAll(
snapshotMetaInfo.getWorkerLostEventsList().stream()
.map(PbSerDeUtils::fromPbWorkerInfo)
@@ -310,10 +322,11 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
LOG.info("Successfully restore meta info from snapshot {}",
file.getAbsolutePath());
LOG.info(
- "Worker size: {}, Registered shuffle size: {}. Worker excluded list
size: {}.",
+ "Worker size: {}, Registered shuffle size: {}. Worker excluded list
size: {}. Manually Excluded list size: {}",
workers.size(),
registeredShuffle.size(),
- excludedWorkers.size());
+ excludedWorkers.size(),
+ manuallyExcludedWorkers.size());
workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}",
shuffle));
}
@@ -342,7 +355,9 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
Utils.bytesToString(oldEstimatedPartitionSize),
Utils.bytesToString(estimatedPartitionSize));
workers.stream()
- .filter(worker -> !excludedWorkers.contains(worker))
+ .filter(
+ worker ->
+ !excludedWorkers.contains(worker) &&
!manuallyExcludedWorkers.contains(worker))
.forEach(workerInfo ->
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
}
}
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 008b35707..a51135738 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -39,6 +39,9 @@ public interface IMetadataHandler {
void handleAppLost(String appId, String requestId);
+ void handleWorkerExclude(
+ List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String
requestId);
+
void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId);
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 7d01b4961..c901341c5 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -69,6 +69,12 @@ public class SingleMasterMetaManager extends
AbstractMetaManager {
updateAppLostMeta(appId);
}
+ @Override
+ public void handleWorkerExclude(
+ List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String
requestId) {
+ updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+ }
+
@Override
public void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 8fc641795..087ce0374 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -140,6 +140,36 @@ public class HAMasterMetaManager extends
AbstractMetaManager {
}
}
+ @Override
+ public void handleWorkerExclude(
+ List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String
requestId) {
+ try {
+ ratisServer.submitRequest(
+ ResourceRequest.newBuilder()
+ .setCmdType(Type.WorkerExclude)
+ .setRequestId(requestId)
+ .setWorkerExcludeRequest(
+ ResourceProtos.WorkerExcludeRequest.newBuilder()
+ .addAllWorkersToAdd(
+ workersToAdd.stream()
+ .map(MetaUtil::infoToAddr)
+ .collect(Collectors.toList()))
+ .addAllWorkersToRemove(
+ workersToRemove.stream()
+ .map(MetaUtil::infoToAddr)
+ .collect(Collectors.toList()))
+ .build())
+ .build());
+ } catch (CelebornRuntimeException e) {
+ LOG.error(
+ "Handle worker exclude for workersToAdd {} and workersToRemove {}
failed!",
+
workersToAdd.stream().map(WorkerInfo::toString).collect(Collectors.joining(",")),
+
workersToRemove.stream().map(WorkerInfo::toString).collect(Collectors.joining(",")),
+ e);
+ throw e;
+ }
+ }
+
@Override
public void handleWorkerLost(
String host, int rpcPort, int pushPort, int fetchPort, int
replicatePort, String requestId) {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 533d10016..13990bccd 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -131,6 +131,18 @@ public class MetaHandler {
metaSystem.updateAppLostMeta(appId);
break;
+ case WorkerExclude:
+ List<ResourceProtos.WorkerAddress> addAddresses =
+ request.getWorkerExcludeRequest().getWorkersToAddList();
+ List<ResourceProtos.WorkerAddress> removeAddresses =
+ request.getWorkerExcludeRequest().getWorkersToRemoveList();
+ List<WorkerInfo> workersToAdd =
+
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+ List<WorkerInfo> workersToRemove =
+
removeAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+ metaSystem.updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+ break;
+
case WorkerLost:
host = request.getWorkerLostRequest().getHost();
rpcPort = request.getWorkerLostRequest().getRpcPort();
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index 9120105a5..c7cde94dc 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -36,6 +36,7 @@ enum Type {
UpdatePartitionSize = 21;
WorkerRemove = 22;
RemoveWorkersUnavailableInfo = 23;
+ WorkerExclude = 24;
}
message ResourceRequest {
@@ -55,6 +56,7 @@ message ResourceRequest {
optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
optional WorkerRemoveRequest workerRemoveRequest = 19;
optional RemoveWorkersUnavailableInfoRequest
removeWorkersUnavailableInfoRequest = 20;
+ optional WorkerExcludeRequest workerExcludeRequest = 21;
}
message DiskInfo {
@@ -97,6 +99,11 @@ message AppLostRequest {
required string appId = 1;
}
+message WorkerExcludeRequest {
+ repeated WorkerAddress workersToAdd = 1;
+ repeated WorkerAddress workersToRemove = 2;
+}
+
message WorkerLostRequest {
required string host = 1;
required int32 rpcPort = 2;
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index c1102418a..9d4361679 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -160,7 +160,7 @@ private[celeborn] class Master(
statusSystem.registeredShuffle.size
}
masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
- statusSystem.excludedWorkers.size
+ statusSystem.excludedWorkers.size +
statusSystem.manuallyExcludedWorkers.size
}
masterSource.addGauge(MasterSource.WORKER_COUNT) { () =>
statusSystem.workers.size }
masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () =>
statusSystem.lostWorkers.size }
@@ -409,6 +409,19 @@ private[celeborn] class Master(
context,
handleReportNodeUnavailable(context, failedWorkers, requestId))
+ case pb: PbWorkerExclude =>
+ val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList
+ .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+ val workersToRemove = new
util.ArrayList[WorkerInfo](pb.getWorkersToRemoveList
+ .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+ executeWithLeaderChecker(
+ context,
+ handleWorkerExclude(
+ context,
+ workersToAdd,
+ workersToRemove,
+ pb.getRequestId))
+
case pb: PbWorkerLost =>
val host = pb.getHost
val rpcPort = pb.getRpcPort
@@ -537,6 +550,20 @@ private[celeborn] class Master(
context.reply(HeartbeatFromWorkerResponse(expiredShuffleKeys, registered))
}
+ private def handleWorkerExclude(
+ context: RpcCallContext,
+ workersToAdd: util.List[WorkerInfo],
+ workersToRemove: util.List[WorkerInfo],
+ requestId: String): Unit = {
+ statusSystem.handleWorkerExclude(
+ workersToAdd.asScala.filter(workersSnapShot.contains(_)).asJava,
+ workersToRemove.asScala.filter(workersSnapShot.contains(_)).asJava,
+ requestId)
+ if (context != null) {
+ context.reply(WorkerExcludeResponse(true))
+ }
+ }
+
private def handleWorkerLost(
context: RpcCallContext,
host: String,
@@ -878,8 +905,8 @@ private[celeborn] class Master(
private def workersAvailable(
tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty):
util.List[WorkerInfo] = {
workersSnapShot.asScala.filter { w =>
- !statusSystem.excludedWorkers.contains(w) &&
!statusSystem.shutdownWorkers.contains(
- w) && !tmpExcludedWorkerList.contains(w)
+ !statusSystem.excludedWorkers.contains(w) &&
!statusSystem.manuallyExcludedWorkers.contains(
+ w) && !statusSystem.shutdownWorkers.contains(w) &&
!tmpExcludedWorkerList.contains(w)
}.asJava
}
@@ -890,12 +917,14 @@ private[celeborn] class Master(
sb.toString()
}
+ private def getWorkers: String = {
+ workersSnapShot.asScala.mkString("\n")
+ }
+
override def getWorkerInfo: String = {
val sb = new StringBuilder
sb.append("====================== Workers Info in Master
=========================\n")
- workersSnapShot.asScala.foreach { w =>
- sb.append(w).append("\n")
- }
+ sb.append(getWorkers)
sb.toString()
}
@@ -920,8 +949,8 @@ private[celeborn] class Master(
override def getExcludedWorkers: String = {
val sb = new StringBuilder
sb.append("===================== Excluded Workers in Master
======================\n")
- statusSystem.excludedWorkers.asScala.foreach { worker =>
- sb.append(s"${worker.toUniqueId()}\n")
+ (statusSystem.excludedWorkers.asScala ++
statusSystem.manuallyExcludedWorkers.asScala).foreach {
+ worker => sb.append(s"${worker.toUniqueId()}\n")
}
sb.toString()
}
@@ -967,13 +996,30 @@ private[celeborn] class Master(
sb.toString()
}
- override def listPartitionLocationInfo: String = throw new
UnsupportedOperationException()
-
- override def getUnavailablePeers: String = throw new
UnsupportedOperationException()
-
- override def isShutdown: String = throw new UnsupportedOperationException()
-
- override def isRegistered: String = throw new UnsupportedOperationException()
+ override def exclude(addWorkers: String, removeWorkers: String): String = {
+ val sb = new StringBuilder
+ sb.append("============================ Add/Remove Excluded Workers
Manually =============================\n")
+ val workersToAdd = addWorkers.split(",").filter(_.nonEmpty)
+ val workersToRemove = removeWorkers.split(",").filter(_.nonEmpty)
+ val workerExcludeResponse =
self.askSync[PbWorkerExcludeResponse](WorkerExclude(
+ workersToAdd.map(WorkerInfo.fromUniqueId).toList.asJava,
+ workersToRemove.map(WorkerInfo.fromUniqueId).toList.asJava,
+ MasterClient.genRequestId()))
+ if (workerExcludeResponse.getSuccess) {
+ sb.append(
+ s"Excluded workers add ${workersToAdd.mkString(",")} and remove
${workersToRemove.mkString(",")} successfully.\n")
+ } else {
+ sb.append(
+ s"Failed to Exclude workers add ${workersToAdd.mkString(",")} and
remove ${workersToRemove.mkString(",")}.\n")
+ }
+ val unknownExcludedWorkers =
+ (workersToAdd ++ workersToRemove).filter(!workersSnapShot.contains(_))
+ if (unknownExcludedWorkers.nonEmpty) {
+ sb.append(
+ s"Unknown worker ${unknownExcludedWorkers.mkString(",")}. Workers in
Master:\n$getWorkers.")
+ }
+ sb.toString()
+ }
private def isMasterActive: Int = {
// use int rather than bool for better monitoring on dashboard
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index bb845b192..c8276f4d8 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -43,43 +42,43 @@ import
org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef;
public class DefaultMetaSystemSuiteJ {
- private RpcEnv mockRpcEnv = mock(RpcEnv.class);
- private CelebornConf conf = new CelebornConf();
+ private final RpcEnv mockRpcEnv = mock(RpcEnv.class);
+ private final CelebornConf conf = new CelebornConf();
private AbstractMetaManager statusSystem;
- private RpcEndpointRef dummyRef =
+ private final RpcEndpointRef dummyRef =
new NettyRpcEndpointRef(
new CelebornConf(), RpcEndpointAddress.apply("localhost", 111,
"dummy"), null);
- private AtomicLong callerId = new AtomicLong();
-
- private static String HOSTNAME1 = "host1";
- private static int RPCPORT1 = 1111;
- private static int PUSHPORT1 = 1112;
- private static int FETCHPORT1 = 1113;
- private static int REPLICATEPORT1 = 1114;
- private static Map<String, DiskInfo> disks1 = new HashMap<>();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
+ private final AtomicLong callerId = new AtomicLong();
+
+ private static final String HOSTNAME1 = "host1";
+ private static final int RPCPORT1 = 1111;
+ private static final int PUSHPORT1 = 1112;
+ private static final int FETCHPORT1 = 1113;
+ private static final int REPLICATEPORT1 = 1114;
+ private static final Map<String, DiskInfo> disks1 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
new HashMap<>();
- private static String HOSTNAME2 = "host2";
- private static int RPCPORT2 = 2111;
- private static int PUSHPORT2 = 2112;
- private static int FETCHPORT2 = 2113;
- private static int REPLICATEPORT2 = 2114;
- private static Map<String, DiskInfo> disks2 = new HashMap<>();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
+ private static final String HOSTNAME2 = "host2";
+ private static final int RPCPORT2 = 2111;
+ private static final int PUSHPORT2 = 2112;
+ private static final int FETCHPORT2 = 2113;
+ private static final int REPLICATEPORT2 = 2114;
+ private static final Map<String, DiskInfo> disks2 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
new HashMap<>();
- private static String HOSTNAME3 = "host3";
- private static int RPCPORT3 = 3111;
- private static int PUSHPORT3 = 3112;
- private static int FETCHPORT3 = 3113;
- private static int REPLICATEPORT3 = 3114;
- private static Map<String, DiskInfo> disks3 = new HashMap<>();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
+ private static final String HOSTNAME3 = "host3";
+ private static final int RPCPORT3 = 3111;
+ private static final int PUSHPORT3 = 3112;
+ private static final int FETCHPORT3 = 3113;
+ private static final int REPLICATEPORT3 = 3114;
+ private static final Map<String, DiskInfo> disks3 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
new HashMap<>();
@Before
- public void setUp() throws Exception {
+ public void setUp() {
when(mockRpcEnv.setupEndpointRef(any(), any())).thenReturn(dummyRef);
statusSystem = new SingleMasterMetaManager(mockRpcEnv, conf);
@@ -103,7 +102,7 @@ public class DefaultMetaSystemSuiteJ {
}
@After
- public void tearDown() throws Exception {}
+ public void tearDown() {}
private String getNewReqeustId() {
return MasterClient.encodeRequestId(UUID.randomUUID().toString(),
callerId.incrementAndGet());
@@ -143,6 +142,55 @@ public class DefaultMetaSystemSuiteJ {
assert (statusSystem.workers.size() == 3);
}
+ @Test
+ public void testHandleWorkerExclude() {
+ WorkerInfo workerInfo1 =
+ new WorkerInfo(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ disks1,
+ userResourceConsumption1);
+ WorkerInfo workerInfo2 =
+ new WorkerInfo(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ disks2,
+ userResourceConsumption2);
+
+ statusSystem.handleRegisterWorker(
+ workerInfo1.host(),
+ workerInfo1.rpcPort(),
+ workerInfo1.pushPort(),
+ workerInfo1.fetchPort(),
+ workerInfo1.replicatePort(),
+ workerInfo1.diskInfos(),
+ workerInfo1.userResourceConsumption(),
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ workerInfo2.host(),
+ workerInfo2.rpcPort(),
+ workerInfo2.pushPort(),
+ workerInfo2.fetchPort(),
+ workerInfo2.replicatePort(),
+ workerInfo2.diskInfos(),
+ workerInfo2.userResourceConsumption(),
+ getNewReqeustId());
+
+ statusSystem.handleWorkerExclude(
+ Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(),
getNewReqeustId());
+ assert (statusSystem.manuallyExcludedWorkers.size() == 2);
+
+ statusSystem.handleWorkerExclude(
+ Collections.emptyList(), Collections.singletonList(workerInfo1),
getNewReqeustId());
+ assert (statusSystem.manuallyExcludedWorkers.size() == 1);
+ }
+
@Test
public void testHandleWorkerLost() {
statusSystem.handleRegisterWorker(
@@ -178,9 +226,9 @@ public class DefaultMetaSystemSuiteJ {
assert (statusSystem.workers.size() == 2);
}
- private static String APPID1 = "appId1";
- private static int SHUFFLEID1 = 1;
- private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+ private static final String APPID1 = "appId1";
+ private static final int SHUFFLEID1 = 1;
+ private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
@Test
public void testHandleRequestSlots() {
@@ -304,14 +352,6 @@ public class DefaultMetaSystemSuiteJ {
allocation);
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
-
- List<String> workerIds = new ArrayList<>();
- workerIds.add(
- HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + ":"
+ REPLICATEPORT1);
-
- List<Map<String, Integer>> workerSlots = new ArrayList<>();
- workerSlots.add(ImmutableMap.of("disk1", 3));
-
Assert.assertEquals(
0,
statusSystem.workers.stream()
@@ -583,7 +623,8 @@ public class DefaultMetaSystemSuiteJ {
userResourceConsumption3,
getNewReqeustId());
- WorkerInfo workerInfo1 =
+ List<WorkerInfo> failedWorkers = new ArrayList<>();
+ failedWorkers.add(
new WorkerInfo(
HOSTNAME1,
RPCPORT1,
@@ -591,23 +632,11 @@ public class DefaultMetaSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1);
- WorkerInfo workerInfo2 =
- new WorkerInfo(
- HOSTNAME2,
- RPCPORT2,
- PUSHPORT2,
- FETCHPORT2,
- REPLICATEPORT2,
- disks2,
- userResourceConsumption2);
-
- List<WorkerInfo> failedWorkers = new ArrayList<>();
- failedWorkers.add(workerInfo1);
+ userResourceConsumption1));
statusSystem.handleReportWorkerUnavailable(failedWorkers,
getNewReqeustId());
assert 1 == statusSystem.shutdownWorkers.size();
- assert 0 == statusSystem.excludedWorkers.size();
+ assert statusSystem.excludedWorkers.isEmpty();
}
@Test
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 51f669a53..a7fe7eb09 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -76,7 +76,7 @@ public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ
{
.build();
ResourceResponse response = stateMachine.runCommand(request, -1);
- Assert.assertEquals(response.getSuccess(), true);
+ Assert.assertTrue(response.getSuccess());
}
@Test
@@ -160,6 +160,9 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
masterStatusSystem.excludedWorkers.add(info2);
masterStatusSystem.excludedWorkers.add(info3);
+ masterStatusSystem.manuallyExcludedWorkers.add(info1);
+ masterStatusSystem.manuallyExcludedWorkers.add(info2);
+
masterStatusSystem.hostnameSet.add(host1);
masterStatusSystem.hostnameSet.add(host2);
masterStatusSystem.hostnameSet.add(host3);
@@ -184,10 +187,12 @@ public class MasterStateMachineSuiteJ extends
RatisBaseSuiteJ {
masterStatusSystem.hostnameSet.clear();
masterStatusSystem.excludedWorkers.clear();
+ masterStatusSystem.manuallyExcludedWorkers.clear();
masterStatusSystem.restoreMetaFromFile(tmpFile);
Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
+ Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
Assert.assertEquals(
conf.metricsAppTopDiskUsageWindowSize(),
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 84d383cf7..b66099e1d 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.collect.ImmutableMap;
import org.junit.*;
import org.mockito.Mockito;
@@ -51,7 +50,7 @@ public class RatisMasterStatusSystemSuiteJ {
protected static HAMasterMetaManager STATUSSYSTEM2 = null;
protected static HAMasterMetaManager STATUSSYSTEM3 = null;
- private static RpcEndpointRef dummyRef =
+ private static final RpcEndpointRef dummyRef =
new NettyRpcEndpointRef(
new CelebornConf(), RpcEndpointAddress.apply("localhost", 111,
"dummy"), null);
@@ -155,45 +154,43 @@ public class RatisMasterStatusSystemSuiteJ {
}
@Test
- public void testLeaderAvaiable() throws InterruptedException {
- boolean hasLeader = false;
- if (RATISSERVER1.isLeader() || RATISSERVER2.isLeader() ||
RATISSERVER3.isLeader()) {
- hasLeader = true;
- }
- Assert.assertEquals(hasLeader, true);
+ public void testLeaderAvaiable() {
+ boolean hasLeader =
+ RATISSERVER1.isLeader() || RATISSERVER2.isLeader() ||
RATISSERVER3.isLeader();
+ Assert.assertTrue(hasLeader);
}
- private static String HOSTNAME1 = "host1";
- private static int RPCPORT1 = 1111;
- private static int PUSHPORT1 = 1112;
- private static int FETCHPORT1 = 1113;
- private static int REPLICATEPORT1 = 1114;
- private static Map<String, DiskInfo> disks1 = new HashMap();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
+ private static final String HOSTNAME1 = "host1";
+ private static final int RPCPORT1 = 1111;
+ private static final int PUSHPORT1 = 1112;
+ private static final int FETCHPORT1 = 1113;
+ private static final int REPLICATEPORT1 = 1114;
+ private static final Map<String, DiskInfo> disks1 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption1 =
new HashMap<>();
- private static String HOSTNAME2 = "host2";
- private static int RPCPORT2 = 2111;
- private static int PUSHPORT2 = 2112;
- private static int FETCHPORT2 = 2113;
- private static int REPLICATEPORT2 = 2114;
- private static Map<String, DiskInfo> disks2 = new HashMap<>();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
+ private static final String HOSTNAME2 = "host2";
+ private static final int RPCPORT2 = 2111;
+ private static final int PUSHPORT2 = 2112;
+ private static final int FETCHPORT2 = 2113;
+ private static final int REPLICATEPORT2 = 2114;
+ private static final Map<String, DiskInfo> disks2 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption2 =
new HashMap<>();
- private static String HOSTNAME3 = "host3";
- private static int RPCPORT3 = 3111;
- private static int PUSHPORT3 = 3112;
- private static int FETCHPORT3 = 3113;
- private static int REPLICATEPORT3 = 3114;
- private static Map<String, DiskInfo> disks3 = new HashMap<>();
- private static Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
+ private static final String HOSTNAME3 = "host3";
+ private static final int RPCPORT3 = 3111;
+ private static final int PUSHPORT3 = 3112;
+ private static final int FETCHPORT3 = 3113;
+ private static final int REPLICATEPORT3 = 3114;
+ private static final Map<String, DiskInfo> disks3 = new HashMap<>();
+ private static final Map<UserIdentifier, ResourceConsumption>
userResourceConsumption3 =
new HashMap<>();
- private AtomicLong callerId = new AtomicLong();
- private static String APPID1 = "appId1";
- private static int SHUFFLEID1 = 1;
- private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+ private final AtomicLong callerId = new AtomicLong();
+ private static final String APPID1 = "appId1";
+ private static final int SHUFFLEID1 = 1;
+ private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
private String getNewReqeustId() {
return MasterClient.encodeRequestId(UUID.randomUUID().toString(),
callerId.incrementAndGet());
@@ -235,7 +232,7 @@ public class RatisMasterStatusSystemSuiteJ {
disks1,
userResourceConsumption1,
getNewReqeustId());
- Assert.assertTrue(false);
+ Assert.fail();
} catch (CelebornRuntimeException e) {
Assert.assertTrue(true);
} finally {
@@ -283,6 +280,66 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
}
+ @Test
+ public void testHandleWorkerExclude() throws InterruptedException {
+ AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+ Assert.assertNotNull(statusSystem);
+
+ WorkerInfo workerInfo1 =
+ new WorkerInfo(
+ HOSTNAME1,
+ RPCPORT1,
+ PUSHPORT1,
+ FETCHPORT1,
+ REPLICATEPORT1,
+ disks1,
+ userResourceConsumption1);
+ WorkerInfo workerInfo2 =
+ new WorkerInfo(
+ HOSTNAME2,
+ RPCPORT2,
+ PUSHPORT2,
+ FETCHPORT2,
+ REPLICATEPORT2,
+ disks2,
+ userResourceConsumption2);
+
+ statusSystem.handleRegisterWorker(
+ workerInfo1.host(),
+ workerInfo1.rpcPort(),
+ workerInfo1.pushPort(),
+ workerInfo1.fetchPort(),
+ workerInfo1.replicatePort(),
+ workerInfo1.diskInfos(),
+ workerInfo1.userResourceConsumption(),
+ getNewReqeustId());
+ statusSystem.handleRegisterWorker(
+ workerInfo2.host(),
+ workerInfo2.rpcPort(),
+ workerInfo2.pushPort(),
+ workerInfo2.fetchPort(),
+ workerInfo2.replicatePort(),
+ workerInfo2.diskInfos(),
+ workerInfo2.userResourceConsumption(),
+ getNewReqeustId());
+
+ statusSystem.handleWorkerExclude(
+ Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(),
getNewReqeustId());
+ Thread.sleep(3000L);
+
+ Assert.assertEquals(2, STATUSSYSTEM1.manuallyExcludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM2.manuallyExcludedWorkers.size());
+ Assert.assertEquals(2, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+
+ statusSystem.handleWorkerExclude(
+ Collections.emptyList(), Collections.singletonList(workerInfo1),
getNewReqeustId());
+ Thread.sleep(3000L);
+
+ Assert.assertEquals(1, STATUSSYSTEM1.manuallyExcludedWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM2.manuallyExcludedWorkers.size());
+ Assert.assertEquals(1, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+ }
+
@Test
public void testHandleWorkerLost() throws InterruptedException {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
@@ -483,15 +540,6 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate,
getNewReqeustId());
Thread.sleep(3000L);
- List<String> workerIds = new ArrayList<>();
- workerIds.add(
- HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + ":"
+ REPLICATEPORT1);
-
- List<Map<String, Integer>> workerSlots = new ArrayList<>();
- workerSlots.add(ImmutableMap.of("disk1", 3));
-
- Thread.sleep(3000L);
-
// Do not update diskinfo's activeslots
Assert.assertEquals(
@@ -658,9 +706,9 @@ public class RatisMasterStatusSystemSuiteJ {
statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(true, STATUSSYSTEM1.registeredShuffle.isEmpty());
- Assert.assertEquals(true, STATUSSYSTEM2.registeredShuffle.isEmpty());
- Assert.assertEquals(true, STATUSSYSTEM3.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
+ Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
}
@Test
@@ -671,17 +719,17 @@ public class RatisMasterStatusSystemSuiteJ {
long dummy = 1235L;
statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
String appId2 = "app02";
statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
Thread.sleep(3000L);
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(appId2));
- Assert.assertEquals(new Long(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(appId2));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM2.appHeartbeatTime.get(appId2));
+ Assert.assertEquals(Long.valueOf(dummy),
STATUSSYSTEM3.appHeartbeatTime.get(appId2));
Assert.assertEquals(2, STATUSSYSTEM1.appHeartbeatTime.size());
Assert.assertEquals(2, STATUSSYSTEM2.appHeartbeatTime.size());
@@ -871,7 +919,8 @@ public class RatisMasterStatusSystemSuiteJ {
userResourceConsumption3,
getNewReqeustId());
- WorkerInfo workerInfo1 =
+ List<WorkerInfo> failedWorkers = new ArrayList<>();
+ failedWorkers.add(
new WorkerInfo(
HOSTNAME1,
RPCPORT1,
@@ -879,19 +928,7 @@ public class RatisMasterStatusSystemSuiteJ {
FETCHPORT1,
REPLICATEPORT1,
disks1,
- userResourceConsumption1);
- WorkerInfo workerInfo2 =
- new WorkerInfo(
- HOSTNAME2,
- RPCPORT2,
- PUSHPORT2,
- FETCHPORT2,
- REPLICATEPORT2,
- disks2,
- userResourceConsumption2);
-
- List<WorkerInfo> failedWorkers = new ArrayList<>();
- failedWorkers.add(workerInfo1);
+ userResourceConsumption1));
statusSystem.handleReportWorkerUnavailable(failedWorkers,
getNewReqeustId());
Thread.sleep(3000L);
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 4390c369d..5f9ff1f5a 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -43,33 +43,36 @@ abstract class HttpService extends Service with Logging {
sb.toString()
}
- def getMasterGroupInfo: String = throw new UnsupportedOperationException()
-
def getWorkerInfo: String
+ def getThreadDump: String
+
+ def getShuffleList: String
+
+ def listTopDiskUseApps: String
+
+ def getMasterGroupInfo: String = throw new UnsupportedOperationException()
+
def getLostWorkers: String = throw new UnsupportedOperationException()
def getShutdownWorkers: String = throw new UnsupportedOperationException()
def getExcludedWorkers: String = throw new UnsupportedOperationException()
- def getThreadDump: String
-
def getHostnameList: String = throw new UnsupportedOperationException()
def getApplicationList: String = throw new UnsupportedOperationException()
- def getShuffleList: String
-
- def listTopDiskUseApps: String
+ def exclude(addWorkers: String, removeWorkers: String): String =
+ throw new UnsupportedOperationException()
- def listPartitionLocationInfo: String
+ def listPartitionLocationInfo: String = throw new
UnsupportedOperationException()
- def getUnavailablePeers: String
+ def getUnavailablePeers: String = throw new UnsupportedOperationException()
- def isShutdown: String
+ def isShutdown: String = throw new UnsupportedOperationException()
- def isRegistered: String
+ def isRegistered: String = throw new UnsupportedOperationException()
def exit(exitType: String): String = throw new
UnsupportedOperationException()
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
index 0cbc3b975..544505cd6 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -168,6 +168,16 @@ case object Applications extends HttpEndpoint {
service.getApplicationList
}
+case object Exclude extends HttpEndpoint {
+ override def path: String = "/exclude"
+
+ override def description(service: String): String =
+ "Excluded workers of the master add or remove the worker manually given
worker id. The parameter add or remove specifies the excluded workers to add or
remove, which value is separated by commas."
+
+ override def handle(service: HttpService, parameters: Map[String, String]):
String =
+ service.exclude(parameters.getOrElse("ADD", "").trim,
parameters.getOrElse("REMOVE", "").trim)
+}
+
case object ListPartitionLocationInfo extends HttpEndpoint {
override def path: String = "/listPartitionLocationInfo"
@@ -212,7 +222,7 @@ case object Exit extends HttpEndpoint {
override def path: String = "/exit"
override def description(service: String): String =
- "Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL'
and 'IMMEDIATELY'"
+ "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL'
and 'IMMEDIATELY'."
override def handle(service: HttpService, parameters: Map[String, String]):
String =
service.exit(parameters.getOrElse("TYPE", ""))
diff --git
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index 3fbcca5e5..c208c0f06 100644
---
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -32,7 +32,8 @@ object HttpUtils {
ExcludedWorkers,
ShutdownWorkers,
Hostnames,
- Applications) ++ baseEndpoints
+ Applications,
+ Exclude) ++ baseEndpoints
private val workerEndpoints: List[HttpEndpoint] =
List(
ListPartitionLocationInfo,
@@ -50,7 +51,7 @@ object HttpUtils {
url.getQuery
.split("&")
.map(_.split("="))
- .map(arr => arr(0).toUpperCase(Locale.ROOT) ->
arr(1).toUpperCase(Locale.ROOT)).toMap
+ .map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1)).toMap
}
(url.getPath, parameter)
}
diff --git
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 4133e67ea..8b877a8c7 100644
---
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -33,13 +33,29 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
assert(parameters == expectParameters)
}
+ test("CELEBORN-448: Support exclude worker manually") {
+ checkParseUri("/exclude", "/exclude", Map.empty)
+ checkParseUri(
+ "/exclude?add=localhost:1001:1002:1003:1004",
+ "/exclude",
+ Map("ADD" -> "localhost:1001:1002:1003:1004"))
+ checkParseUri(
+ "/exclude?remove=localhost:1001:1002:1003:1004",
+ "/exclude",
+ Map("REMOVE" -> "localhost:1001:1002:1003:1004"))
+ checkParseUri(
+
"/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004",
+ "/exclude",
+ Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" ->
"localhost:2001:2002:2003:2004"))
+ }
+
test("CELEBORN-847: Support parse HTTP Restful API parameters") {
checkParseUri("/exit", "/exit", Map.empty)
- checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" ->
"DECOMMISSION"))
+ checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" ->
"decommission"))
checkParseUri(
- "/exit?type=decommission&foo=a",
+ "/exit?type=decommission&foo=A",
"/exit",
- Map("TYPE" -> "DECOMMISSION", "FOO" -> "A"))
+ Map("TYPE" -> "decommission", "FOO" -> "A"))
}
test("CELEBORN-829: Improve response message of invalid HTTP request") {
@@ -47,6 +63,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
s"""Available API providers include:
|/applications List all running application's ids of the
cluster.
|/conf List the conf setting of the master.
+ |/exclude Excluded workers of the master add or remove
the worker manually given worker id. The parameter add or remove specifies the
excluded workers to add or remove, which value is separated by commas.
|/excludedWorkers List all excluded workers of the master.
|/help List the available API providers of the master.
|/hostnames List all running application's
LifecycleManager's hostnames of the cluster.
@@ -61,7 +78,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
assert(HttpUtils.help(Service.WORKER) ==
s"""Available API providers include:
|/conf List the conf setting of the worker.
- |/exit Trigger this worker to exit. Legal types
are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
+ |/exit Trigger this worker to exit. Legal types
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
|/help List the available API providers of the
worker.
|/isRegistered Show if the worker is registered to the
master success.
|/isShutdown Show if the worker is during the process
of shutdown.
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 2b03f8fb7..d3850f8f1 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker
import java.io.File
import java.lang.{Long => JLong}
import java.util
-import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
+import java.util.{HashMap => JHashMap, HashSet => JHashSet, Locale, Map =>
JMap}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
@@ -647,7 +647,7 @@ private[celeborn] class Worker(
}
override def exit(exitType: String): String = {
- exitType match {
+ exitType.toUpperCase(Locale.ROOT) match {
case "DECOMMISSION" =>
exitKind = CelebornExitKind.WORKER_DECOMMISSION
ShutdownHookManager.get().updateTimeout(