This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 52eddc59f [CELEBORN-448] Support exclude worker manually
52eddc59f is described below

commit 52eddc59f3c52d360dba2457a2b4caefb33b4454
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 7 16:25:24 2023 +0800

    [CELEBORN-448] Support exclude worker manually
    
    ### What changes were proposed in this pull request?
    
    Support manually excluding workers by worker id. The specified workers are
    added to the master's list of manually excluded workers.
    
    ### Why are the changes needed?
    
    At present, Celeborn only excludes workers automatically, when shuffle
    clients fail to push data to or fetch data from them. Operators also need to
    exclude workers manually to maintain the Celeborn cluster.
    
    ### Does this PR introduce _any_ user-facing change?
    
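    Yes. The master exposes a new HTTP API, `/exclude?add=...&remove=...`, for
    manually adding workers to or removing workers from the excluded list.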
    
    ### How was this patch tested?
    
    - `HttpUtilsSuite`
    - `DefaultMetaSystemSuiteJ#testHandleWorkerExclude`
    - `RatisMasterStatusSystemSuiteJ#testHandleWorkerExclude`
    - `MasterStateMachineSuiteJ#testObjSerde`
    
    Closes #1997 from SteNicholas/CELEBORN-448.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 common/src/main/proto/TransportMessages.proto      |  13 ++
 .../common/protocol/message/ControlMessages.scala  |  28 ++++
 .../apache/celeborn/common/util/PbSerDeUtils.scala |   3 +
 docs/monitoring.md                                 |  33 ++--
 .../master/clustermeta/AbstractMetaManager.java    |  21 ++-
 .../master/clustermeta/IMetadataHandler.java       |   3 +
 .../clustermeta/SingleMasterMetaManager.java       |   6 +
 .../master/clustermeta/ha/HAMasterMetaManager.java |  30 ++++
 .../deploy/master/clustermeta/ha/MetaHandler.java  |  12 ++
 master/src/main/proto/Resource.proto               |   7 +
 .../celeborn/service/deploy/master/Master.scala    |  76 +++++++--
 .../clustermeta/DefaultMetaSystemSuiteJ.java       | 139 ++++++++++-------
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   |   7 +-
 .../ha/RatisMasterStatusSystemSuiteJ.java          | 169 +++++++++++++--------
 .../celeborn/server/common/HttpService.scala       |  25 +--
 .../celeborn/server/common/http/HttpEndpoint.scala |  12 +-
 .../celeborn/server/common/http/HttpUtils.scala    |   5 +-
 .../server/common/http/HttpUtilsSuite.scala        |  25 ++-
 .../celeborn/service/deploy/worker/Worker.scala    |   4 +-
 19 files changed, 442 insertions(+), 176 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto 
b/common/src/main/proto/TransportMessages.proto
index e05615e5b..56f0e7561 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -85,6 +85,8 @@ enum MessageType {
   STREAM_CHUNK_SLICE = 62;
   CHUNK_FETCH_REQUEST = 63;
   TRANSPORTABLE_ERROR = 64;
+  WORKER_EXCLUDE = 65;
+  WORKER_EXCLUDE_RESPONSE = 66;
 }
 
 enum StreamType {
@@ -417,6 +419,16 @@ message PbRemoveWorkersUnavailableInfo {
   string requestId = 2;
 }
 
+message PbWorkerExclude {
+  repeated PbWorkerInfo workersToAdd = 1;
+  repeated PbWorkerInfo workersToRemove = 2;
+  string requestId = 3;
+}
+
+message PbWorkerExcludeResponse {
+  bool success = 1;
+}
+
 message PbWorkerLost {
   string host = 1;
   int32 rpcPort = 2;
@@ -500,6 +512,7 @@ message PbSnapshotMetaInfo {
   PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
   map<string, int64> lostWorkers = 12;
   repeated PbWorkerInfo shutdownWorkers = 13;
+  repeated PbWorkerInfo manuallyExcludedWorkers = 14;
 }
 
 message PbOpenStream {
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 578824cc1..63b4f7b1f 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -274,6 +274,28 @@ object ControlMessages extends Logging {
       partitionIds: util.Set[Integer] = new util.HashSet[Integer]())
     extends MasterMessage
 
+  object WorkerExclude {
+    def apply(
+        workersToAdd: util.List[WorkerInfo],
+        workersToRemove: util.List[WorkerInfo],
+        requestId: String): PbWorkerExclude = PbWorkerExclude.newBuilder()
+      .addAllWorkersToAdd(workersToAdd.asScala.map { workerInfo =>
+        PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+      }.toList.asJava)
+      .addAllWorkersToRemove(workersToRemove.asScala.map { workerInfo =>
+        PbSerDeUtils.toPbWorkerInfo(workerInfo, true)
+      }.toList.asJava)
+      .setRequestId(requestId)
+      .build()
+  }
+
+  object WorkerExcludeResponse {
+    def apply(success: Boolean): PbWorkerExcludeResponse =
+      PbWorkerExcludeResponse.newBuilder()
+        .setSuccess(success)
+        .build()
+  }
+
   object WorkerLost {
     def apply(
         host: String,
@@ -607,6 +629,12 @@ object ControlMessages extends Logging {
       val payload = builder.build().toByteArray
       new TransportMessage(MessageType.GET_REDUCER_FILE_GROUP_RESPONSE, 
payload)
 
+    case pb: PbWorkerExclude =>
+      new TransportMessage(MessageType.WORKER_EXCLUDE, pb.toByteArray)
+
+    case pb: PbWorkerExcludeResponse =>
+      new TransportMessage(MessageType.WORKER_EXCLUDE_RESPONSE, pb.toByteArray)
+
     case pb: PbWorkerLost =>
       new TransportMessage(MessageType.WORKER_LOST, pb.toByteArray)
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index c198cca11..2748d6cf3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -369,6 +369,7 @@ object PbSerDeUtils {
       registeredShuffle: java.util.Set[String],
       hostnameSet: java.util.Set[String],
       excludedWorkers: java.util.Set[WorkerInfo],
+      manuallyExcludedWorkers: java.util.Set[WorkerInfo],
       workerLostEvent: java.util.Set[WorkerInfo],
       appHeartbeatTime: java.util.Map[String, java.lang.Long],
       workers: java.util.List[WorkerInfo],
@@ -383,6 +384,8 @@ object PbSerDeUtils {
       .addAllRegisteredShuffle(registeredShuffle)
       .addAllHostnameSet(hostnameSet)
       .addAllExcludedWorkers(excludedWorkers.asScala.map(toPbWorkerInfo(_, 
true)).asJava)
+      .addAllManuallyExcludedWorkers(manuallyExcludedWorkers.asScala
+        .map(toPbWorkerInfo(_, true)).asJava)
       .addAllWorkerLostEvents(workerLostEvent.asScala.map(toPbWorkerInfo(_, 
true)).asJava)
       .putAllAppHeartbeatTime(appHeartbeatTime)
       .addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true)).asJava)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 1c82c1c1e..ae858f9ba 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -304,21 +304,22 @@ API path listed as below:
 
 #### Master
 
-| Path                  | Meaning                                              
                                                                               |
-|-----------------------|-------------------------------------------------------------------------------------------------------------------------------------|
-| /metrics/prometheus   | List the metrics data in prometheus format of the 
master.(The url path is defined by configure 
`celeborn.metrics.prometheus.path`.) |
-| /conf                 | List the conf setting of the master.                 
                                                                               |
-| /masterGroupInfo      | List master group information of the service. It 
will list all master's LEADER, FOLLOWER information.                            
   |
-| /workerInfo           | List worker information of the service. It will list 
all registered workers 's information.                                         |
-| /lostWorkers          | List all lost workers of the master.                 
                                                                               |
-| /excludedWorkers      | List all excluded workers of the master.             
                                                                               |
-| /shutdownWorkers      | List all shutdown workers of the master.             
                                                                               |
-| /threadDump           | List the current thread dump of the master.          
                                                                               |
-| /hostnames            | List all running application's LifecycleManager's 
hostnames of the cluster.                                                       
  |
-| /applications         | List all running application's ids of the cluster.   
                                                                               |
-| /shuffles             | List all running shuffle keys of the service. It 
will return all running shuffle's key of the cluster.                           
   |
-| /listTopDiskUsedApps  | List the top disk usage application ids. It will 
return the top disk usage application ids for the cluster.                      
   |
-| /help                 | List the available API providers of the master.      
                                                                               |
+| Path                                                 | Meaning               
                                                                                
                                                                                
             |
+|------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| /metrics/prometheus                                  | List the metrics data 
in prometheus format of the master.(The url path is defined by configure 
`celeborn.metrics.prometheus.path`.)                                            
                    |
+| /conf                                                | List the conf setting 
of the master.                                                                  
                                                                                
             |
+| /masterGroupInfo                                     | List master group 
information of the service. It will list all master's LEADER, FOLLOWER 
information.                                                                    
                          |
+| /workerInfo                                          | List worker 
information of the service. It will list all registered workers 's information. 
                                                                                
                       |
+| /lostWorkers                                         | List all lost workers 
of the master.                                                                  
                                                                                
             |
+| /excludedWorkers                                     | List all excluded 
workers of the master.                                                          
                                                                                
                 |
+| /shutdownWorkers                                     | List all shutdown 
workers of the master.                                                          
                                                                                
                 |
+| /threadDump                                          | List the current 
thread dump of the master.                                                      
                                                                                
                  |
+| /hostnames                                           | List all running 
application's LifecycleManager's hostnames of the cluster.                      
                                                                                
                  |
+| /applications                                        | List all running 
application's ids of the cluster.                                               
                                                                                
                  |
+| /shuffles                                            | List all running 
shuffle keys of the service. It will return all running shuffle's key of the 
cluster.                                                                        
                     |
+| /listTopDiskUsedApps                                 | List the top disk 
usage application ids. It will return the top disk usage application ids for 
the cluster.                                                                    
                    |
+| /exclude?add=${ADD_WORKERS}&remove=${REMOVE_WORKERS} | Manually add workers to, or remove workers from, the master's excluded worker list. The `add` and `remove` parameters each take a comma-separated list of worker ids. |
+| /help                                                | List the available 
API providers of the master.                                                    
                                                                                
                |
 
 #### Worker
 
@@ -334,5 +335,5 @@ API path listed as below:
 | /unavailablePeers          | List the unavailable peers of the worker, this 
always means the worker connect to the peer failed.                             
     |
 | /isShutdown                | Show if the worker is during the process of 
shutdown.                                                                       
        |
 | /isRegistered              | Show if the worker is registered to the master 
success.                                                                        
     |
-| /exit?type=${TYPE}         | Trigger this worker to exit. Legal `type`s are 
'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'                                    
     |
+| /exit?type=${TYPE}         | Trigger this worker to exit. Legal `type`s are 
'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.                                   
     |
 | /help                      | List the available API providers of the worker. 
                                                                                
    |
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 5e8f52b6f..a6526d218 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -63,6 +63,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
   public final ConcurrentHashMap<WorkerInfo, Long> lostWorkers = 
JavaUtils.newConcurrentHashMap();
   public final ConcurrentHashMap<String, Long> appHeartbeatTime = 
JavaUtils.newConcurrentHashMap();
   public final Set<WorkerInfo> excludedWorkers = ConcurrentHashMap.newKeySet();
+  public final Set<WorkerInfo> manuallyExcludedWorkers = 
ConcurrentHashMap.newKeySet();
   public final Set<WorkerInfo> shutdownWorkers = ConcurrentHashMap.newKeySet();
   public final Set<WorkerInfo> workerLostEvents = 
ConcurrentHashMap.newKeySet();
 
@@ -111,6 +112,12 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     appHeartbeatTime.remove(appId);
   }
 
+  public void updateWorkerExcludeMeta(
+      List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove) {
+    manuallyExcludedWorkers.addAll(workersToAdd);
+    workersToRemove.forEach(manuallyExcludedWorkers::remove);
+  }
+
   public void updateWorkerLostMeta(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort) {
     WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, 
replicatePort);
@@ -223,6 +230,7 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
                 registeredShuffle,
                 hostnameSet,
                 excludedWorkers,
+                manuallyExcludedWorkers,
                 workerLostEvents,
                 appHeartbeatTime,
                 workers,
@@ -254,6 +262,10 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
           snapshotMetaInfo.getExcludedWorkersList().stream()
               .map(PbSerDeUtils::fromPbWorkerInfo)
               .collect(Collectors.toSet()));
+      manuallyExcludedWorkers.addAll(
+          snapshotMetaInfo.getManuallyExcludedWorkersList().stream()
+              .map(PbSerDeUtils::fromPbWorkerInfo)
+              .collect(Collectors.toSet()));
       workerLostEvents.addAll(
           snapshotMetaInfo.getWorkerLostEventsList().stream()
               .map(PbSerDeUtils::fromPbWorkerInfo)
@@ -310,10 +322,11 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
     }
     LOG.info("Successfully restore meta info from snapshot {}", 
file.getAbsolutePath());
     LOG.info(
-        "Worker size: {}, Registered shuffle size: {}. Worker excluded list 
size: {}.",
+        "Worker size: {}, Registered shuffle size: {}. Worker excluded list 
size: {}. Manually Excluded list size: {}",
         workers.size(),
         registeredShuffle.size(),
-        excludedWorkers.size());
+        excludedWorkers.size(),
+        manuallyExcludedWorkers.size());
     workers.forEach(workerInfo -> LOG.info(workerInfo.toString()));
     registeredShuffle.forEach(shuffle -> LOG.info("RegisteredShuffle {}", 
shuffle));
   }
@@ -342,7 +355,9 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         Utils.bytesToString(oldEstimatedPartitionSize),
         Utils.bytesToString(estimatedPartitionSize));
     workers.stream()
-        .filter(worker -> !excludedWorkers.contains(worker))
+        .filter(
+            worker ->
+                !excludedWorkers.contains(worker) && 
!manuallyExcludedWorkers.contains(worker))
         .forEach(workerInfo -> 
workerInfo.updateDiskMaxSlots(estimatedPartitionSize));
   }
 }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
index 008b35707..a51135738 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -39,6 +39,9 @@ public interface IMetadataHandler {
 
   void handleAppLost(String appId, String requestId);
 
+  void handleWorkerExclude(
+      List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String 
requestId);
+
   void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId);
 
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index 7d01b4961..c901341c5 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -69,6 +69,12 @@ public class SingleMasterMetaManager extends 
AbstractMetaManager {
     updateAppLostMeta(appId);
   }
 
+  @Override
+  public void handleWorkerExclude(
+      List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String 
requestId) {
+    updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+  }
+
   @Override
   public void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 8fc641795..087ce0374 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -140,6 +140,36 @@ public class HAMasterMetaManager extends 
AbstractMetaManager {
     }
   }
 
+  @Override
+  public void handleWorkerExclude(
+      List<WorkerInfo> workersToAdd, List<WorkerInfo> workersToRemove, String 
requestId) {
+    try {
+      ratisServer.submitRequest(
+          ResourceRequest.newBuilder()
+              .setCmdType(Type.WorkerExclude)
+              .setRequestId(requestId)
+              .setWorkerExcludeRequest(
+                  ResourceProtos.WorkerExcludeRequest.newBuilder()
+                      .addAllWorkersToAdd(
+                          workersToAdd.stream()
+                              .map(MetaUtil::infoToAddr)
+                              .collect(Collectors.toList()))
+                      .addAllWorkersToRemove(
+                          workersToRemove.stream()
+                              .map(MetaUtil::infoToAddr)
+                              .collect(Collectors.toList()))
+                      .build())
+              .build());
+    } catch (CelebornRuntimeException e) {
+      LOG.error(
+          "Handle worker exclude for workersToAdd {} and workersToRemove {} 
failed!",
+          
workersToAdd.stream().map(WorkerInfo::toString).collect(Collectors.joining(",")),
+          
workersToRemove.stream().map(WorkerInfo::toString).collect(Collectors.joining(",")),
+          e);
+      throw e;
+    }
+  }
+
   @Override
   public void handleWorkerLost(
       String host, int rpcPort, int pushPort, int fetchPort, int 
replicatePort, String requestId) {
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
index 533d10016..13990bccd 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -131,6 +131,18 @@ public class MetaHandler {
           metaSystem.updateAppLostMeta(appId);
           break;
 
+        case WorkerExclude:
+          List<ResourceProtos.WorkerAddress> addAddresses =
+              request.getWorkerExcludeRequest().getWorkersToAddList();
+          List<ResourceProtos.WorkerAddress> removeAddresses =
+              request.getWorkerExcludeRequest().getWorkersToRemoveList();
+          List<WorkerInfo> workersToAdd =
+              
addAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+          List<WorkerInfo> workersToRemove =
+              
removeAddresses.stream().map(MetaUtil::addrToInfo).collect(Collectors.toList());
+          metaSystem.updateWorkerExcludeMeta(workersToAdd, workersToRemove);
+          break;
+
         case WorkerLost:
           host = request.getWorkerLostRequest().getHost();
           rpcPort = request.getWorkerLostRequest().getRpcPort();
diff --git a/master/src/main/proto/Resource.proto 
b/master/src/main/proto/Resource.proto
index 9120105a5..c7cde94dc 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -36,6 +36,7 @@ enum Type {
   UpdatePartitionSize = 21;
   WorkerRemove = 22;
   RemoveWorkersUnavailableInfo = 23;
+  WorkerExclude = 24;
 }
 
 message ResourceRequest {
@@ -55,6 +56,7 @@ message ResourceRequest {
   optional ReportWorkerUnavailableRequest reportWorkerUnavailableRequest = 18;
   optional WorkerRemoveRequest workerRemoveRequest = 19;
   optional RemoveWorkersUnavailableInfoRequest 
removeWorkersUnavailableInfoRequest = 20;
+  optional WorkerExcludeRequest workerExcludeRequest = 21;
 }
 
 message DiskInfo {
@@ -97,6 +99,11 @@ message AppLostRequest {
   required string appId = 1;
 }
 
+message WorkerExcludeRequest {
+  repeated WorkerAddress workersToAdd = 1;
+  repeated WorkerAddress workersToRemove = 2;
+}
+
 message WorkerLostRequest {
   required string host = 1;
   required int32 rpcPort = 2;
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index c1102418a..9d4361679 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -160,7 +160,7 @@ private[celeborn] class Master(
     statusSystem.registeredShuffle.size
   }
   masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () =>
-    statusSystem.excludedWorkers.size
+    statusSystem.excludedWorkers.size + 
statusSystem.manuallyExcludedWorkers.size
   }
   masterSource.addGauge(MasterSource.WORKER_COUNT) { () => 
statusSystem.workers.size }
   masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => 
statusSystem.lostWorkers.size }
@@ -409,6 +409,19 @@ private[celeborn] class Master(
         context,
         handleReportNodeUnavailable(context, failedWorkers, requestId))
 
+    case pb: PbWorkerExclude =>
+      val workersToAdd = new util.ArrayList[WorkerInfo](pb.getWorkersToAddList
+        .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+      val workersToRemove = new 
util.ArrayList[WorkerInfo](pb.getWorkersToRemoveList
+        .asScala.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava)
+      executeWithLeaderChecker(
+        context,
+        handleWorkerExclude(
+          context,
+          workersToAdd,
+          workersToRemove,
+          pb.getRequestId))
+
     case pb: PbWorkerLost =>
       val host = pb.getHost
       val rpcPort = pb.getRpcPort
@@ -537,6 +550,20 @@ private[celeborn] class Master(
     context.reply(HeartbeatFromWorkerResponse(expiredShuffleKeys, registered))
   }
 
+  private def handleWorkerExclude(
+      context: RpcCallContext,
+      workersToAdd: util.List[WorkerInfo],
+      workersToRemove: util.List[WorkerInfo],
+      requestId: String): Unit = {
+    statusSystem.handleWorkerExclude(
+      workersToAdd.asScala.filter(workersSnapShot.contains(_)).asJava,
+      workersToRemove,
+      requestId)
+    if (context != null) {
+      context.reply(WorkerExcludeResponse(true))
+    }
+  }
+
   private def handleWorkerLost(
       context: RpcCallContext,
       host: String,
@@ -878,8 +905,8 @@ private[celeborn] class Master(
   private def workersAvailable(
       tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): 
util.List[WorkerInfo] = {
     workersSnapShot.asScala.filter { w =>
-      !statusSystem.excludedWorkers.contains(w) && 
!statusSystem.shutdownWorkers.contains(
-        w) && !tmpExcludedWorkerList.contains(w)
+      !statusSystem.excludedWorkers.contains(w) && 
!statusSystem.manuallyExcludedWorkers.contains(
+        w) && !statusSystem.shutdownWorkers.contains(w) && 
!tmpExcludedWorkerList.contains(w)
     }.asJava
   }
 
@@ -890,12 +917,14 @@ private[celeborn] class Master(
     sb.toString()
   }
 
+  private def getWorkers: String = {
+    workersSnapShot.asScala.mkString("\n")
+  }
+
   override def getWorkerInfo: String = {
     val sb = new StringBuilder
     sb.append("====================== Workers Info in Master 
=========================\n")
-    workersSnapShot.asScala.foreach { w =>
-      sb.append(w).append("\n")
-    }
+    sb.append(getWorkers)
     sb.toString()
   }
 
@@ -920,8 +949,8 @@ private[celeborn] class Master(
   override def getExcludedWorkers: String = {
     val sb = new StringBuilder
     sb.append("===================== Excluded Workers in Master 
======================\n")
-    statusSystem.excludedWorkers.asScala.foreach { worker =>
-      sb.append(s"${worker.toUniqueId()}\n")
+    (statusSystem.excludedWorkers.asScala ++ 
statusSystem.manuallyExcludedWorkers.asScala).foreach {
+      worker => sb.append(s"${worker.toUniqueId()}\n")
     }
     sb.toString()
   }
@@ -967,13 +996,30 @@ private[celeborn] class Master(
     sb.toString()
   }
 
-  override def listPartitionLocationInfo: String = throw new 
UnsupportedOperationException()
-
-  override def getUnavailablePeers: String = throw new 
UnsupportedOperationException()
-
-  override def isShutdown: String = throw new UnsupportedOperationException()
-
-  override def isRegistered: String = throw new UnsupportedOperationException()
+  override def exclude(addWorkers: String, removeWorkers: String): String = {
+    val sb = new StringBuilder
+    sb.append("============================ Add/Remove Excluded Workers  Manually =============================\n")
+    val workersToAdd = addWorkers.split(",").filter(_.nonEmpty)
+    val workersToRemove = removeWorkers.split(",").filter(_.nonEmpty)
+    val workerExcludeResponse = self.askSync[PbWorkerExcludeResponse](WorkerExclude(
+      workersToAdd.map(WorkerInfo.fromUniqueId).toList.asJava,
+      workersToRemove.map(WorkerInfo.fromUniqueId).toList.asJava,
+      MasterClient.genRequestId()))
+    if (workerExcludeResponse.getSuccess) {
+      sb.append(
+        s"Excluded workers add ${workersToAdd.mkString(",")} and remove ${workersToRemove.mkString(",")} successfully.\n")
+    } else {
+      sb.append(
+        s"Failed to Exclude workers add ${workersToAdd.mkString(",")} and remove ${workersToRemove.mkString(",")}.\n")
+    }
+    val unknownExcludedWorkers = (workersToAdd ++ workersToRemove)
+      .filterNot(id => workersSnapShot.contains(WorkerInfo.fromUniqueId(id)))
+    if (unknownExcludedWorkers.nonEmpty) {
+      sb.append(
+        s"Unknown worker ${unknownExcludedWorkers.mkString(",")}. Workers in Master:\n$getWorkers.")
+    }
+    sb.toString()
+  }
 
   private def isMasterActive: Int = {
     // use int rather than bool for better monitoring on dashboard
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index bb845b192..c8276f4d8 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.when;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -43,43 +42,43 @@ import 
org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef;
 
 public class DefaultMetaSystemSuiteJ {
 
-  private RpcEnv mockRpcEnv = mock(RpcEnv.class);
-  private CelebornConf conf = new CelebornConf();
+  private final RpcEnv mockRpcEnv = mock(RpcEnv.class);
+  private final CelebornConf conf = new CelebornConf();
   private AbstractMetaManager statusSystem;
-  private RpcEndpointRef dummyRef =
+  private final RpcEndpointRef dummyRef =
       new NettyRpcEndpointRef(
           new CelebornConf(), RpcEndpointAddress.apply("localhost", 111, 
"dummy"), null);
-  private AtomicLong callerId = new AtomicLong();
-
-  private static String HOSTNAME1 = "host1";
-  private static int RPCPORT1 = 1111;
-  private static int PUSHPORT1 = 1112;
-  private static int FETCHPORT1 = 1113;
-  private static int REPLICATEPORT1 = 1114;
-  private static Map<String, DiskInfo> disks1 = new HashMap<>();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption1 =
+  private final AtomicLong callerId = new AtomicLong();
+
+  private static final String HOSTNAME1 = "host1";
+  private static final int RPCPORT1 = 1111;
+  private static final int PUSHPORT1 = 1112;
+  private static final int FETCHPORT1 = 1113;
+  private static final int REPLICATEPORT1 = 1114;
+  private static final Map<String, DiskInfo> disks1 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption1 =
       new HashMap<>();
 
-  private static String HOSTNAME2 = "host2";
-  private static int RPCPORT2 = 2111;
-  private static int PUSHPORT2 = 2112;
-  private static int FETCHPORT2 = 2113;
-  private static int REPLICATEPORT2 = 2114;
-  private static Map<String, DiskInfo> disks2 = new HashMap<>();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption2 =
+  private static final String HOSTNAME2 = "host2";
+  private static final int RPCPORT2 = 2111;
+  private static final int PUSHPORT2 = 2112;
+  private static final int FETCHPORT2 = 2113;
+  private static final int REPLICATEPORT2 = 2114;
+  private static final Map<String, DiskInfo> disks2 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption2 =
       new HashMap<>();
 
-  private static String HOSTNAME3 = "host3";
-  private static int RPCPORT3 = 3111;
-  private static int PUSHPORT3 = 3112;
-  private static int FETCHPORT3 = 3113;
-  private static int REPLICATEPORT3 = 3114;
-  private static Map<String, DiskInfo> disks3 = new HashMap<>();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption3 =
+  private static final String HOSTNAME3 = "host3";
+  private static final int RPCPORT3 = 3111;
+  private static final int PUSHPORT3 = 3112;
+  private static final int FETCHPORT3 = 3113;
+  private static final int REPLICATEPORT3 = 3114;
+  private static final Map<String, DiskInfo> disks3 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption3 =
       new HashMap<>();
 
   @Before
-  public void setUp() throws Exception {
+  public void setUp() {
     when(mockRpcEnv.setupEndpointRef(any(), any())).thenReturn(dummyRef);
     statusSystem = new SingleMasterMetaManager(mockRpcEnv, conf);
 
@@ -103,7 +102,7 @@ public class DefaultMetaSystemSuiteJ {
   }
 
   @After
-  public void tearDown() throws Exception {}
+  public void tearDown() {}
 
   private String getNewReqeustId() {
     return MasterClient.encodeRequestId(UUID.randomUUID().toString(), 
callerId.incrementAndGet());
@@ -143,6 +142,55 @@ public class DefaultMetaSystemSuiteJ {
     assert (statusSystem.workers.size() == 3);
   }
 
+  @Test
+  public void testHandleWorkerExclude() {
+    WorkerInfo workerInfo1 =
+        new WorkerInfo(
+            HOSTNAME1,
+            RPCPORT1,
+            PUSHPORT1,
+            FETCHPORT1,
+            REPLICATEPORT1,
+            disks1,
+            userResourceConsumption1);
+    WorkerInfo workerInfo2 =
+        new WorkerInfo(
+            HOSTNAME2,
+            RPCPORT2,
+            PUSHPORT2,
+            FETCHPORT2,
+            REPLICATEPORT2,
+            disks2,
+            userResourceConsumption2);
+
+    statusSystem.handleRegisterWorker(
+        workerInfo1.host(),
+        workerInfo1.rpcPort(),
+        workerInfo1.pushPort(),
+        workerInfo1.fetchPort(),
+        workerInfo1.replicatePort(),
+        workerInfo1.diskInfos(),
+        workerInfo1.userResourceConsumption(),
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        workerInfo2.host(),
+        workerInfo2.rpcPort(),
+        workerInfo2.pushPort(),
+        workerInfo2.fetchPort(),
+        workerInfo2.replicatePort(),
+        workerInfo2.diskInfos(),
+        workerInfo2.userResourceConsumption(),
+        getNewReqeustId());
+
+    statusSystem.handleWorkerExclude(
+        Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(), 
getNewReqeustId());
+    assert (statusSystem.manuallyExcludedWorkers.size() == 2);
+
+    statusSystem.handleWorkerExclude(
+        Collections.emptyList(), Collections.singletonList(workerInfo1), 
getNewReqeustId());
+    assert (statusSystem.manuallyExcludedWorkers.size() == 1);
+  }
+
   @Test
   public void testHandleWorkerLost() {
     statusSystem.handleRegisterWorker(
@@ -178,9 +226,9 @@ public class DefaultMetaSystemSuiteJ {
     assert (statusSystem.workers.size() == 2);
   }
 
-  private static String APPID1 = "appId1";
-  private static int SHUFFLEID1 = 1;
-  private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+  private static final String APPID1 = "appId1";
+  private static final int SHUFFLEID1 = 1;
+  private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
 
   @Test
   public void testHandleRequestSlots() {
@@ -304,14 +352,6 @@ public class DefaultMetaSystemSuiteJ {
         allocation);
 
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
-
-    List<String> workerIds = new ArrayList<>();
-    workerIds.add(
-        HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + ":" 
+ REPLICATEPORT1);
-
-    List<Map<String, Integer>> workerSlots = new ArrayList<>();
-    workerSlots.add(ImmutableMap.of("disk1", 3));
-
     Assert.assertEquals(
         0,
         statusSystem.workers.stream()
@@ -583,7 +623,8 @@ public class DefaultMetaSystemSuiteJ {
         userResourceConsumption3,
         getNewReqeustId());
 
-    WorkerInfo workerInfo1 =
+    List<WorkerInfo> failedWorkers = new ArrayList<>();
+    failedWorkers.add(
         new WorkerInfo(
             HOSTNAME1,
             RPCPORT1,
@@ -591,23 +632,11 @@ public class DefaultMetaSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1);
-    WorkerInfo workerInfo2 =
-        new WorkerInfo(
-            HOSTNAME2,
-            RPCPORT2,
-            PUSHPORT2,
-            FETCHPORT2,
-            REPLICATEPORT2,
-            disks2,
-            userResourceConsumption2);
-
-    List<WorkerInfo> failedWorkers = new ArrayList<>();
-    failedWorkers.add(workerInfo1);
+            userResourceConsumption1));
 
     statusSystem.handleReportWorkerUnavailable(failedWorkers, 
getNewReqeustId());
     assert 1 == statusSystem.shutdownWorkers.size();
-    assert 0 == statusSystem.excludedWorkers.size();
+    assert statusSystem.excludedWorkers.isEmpty();
   }
 
   @Test
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index 51f669a53..a7fe7eb09 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -76,7 +76,7 @@ public class MasterStateMachineSuiteJ extends RatisBaseSuiteJ 
{
             .build();
 
     ResourceResponse response = stateMachine.runCommand(request, -1);
-    Assert.assertEquals(response.getSuccess(), true);
+    Assert.assertTrue(response.getSuccess());
   }
 
   @Test
@@ -160,6 +160,9 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     masterStatusSystem.excludedWorkers.add(info2);
     masterStatusSystem.excludedWorkers.add(info3);
 
+    masterStatusSystem.manuallyExcludedWorkers.add(info1);
+    masterStatusSystem.manuallyExcludedWorkers.add(info2);
+
     masterStatusSystem.hostnameSet.add(host1);
     masterStatusSystem.hostnameSet.add(host2);
     masterStatusSystem.hostnameSet.add(host3);
@@ -184,10 +187,12 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
 
     masterStatusSystem.hostnameSet.clear();
     masterStatusSystem.excludedWorkers.clear();
+    masterStatusSystem.manuallyExcludedWorkers.clear();
 
     masterStatusSystem.restoreMetaFromFile(tmpFile);
 
     Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size());
+    Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size());
     Assert.assertEquals(3, masterStatusSystem.hostnameSet.size());
     Assert.assertEquals(
         conf.metricsAppTopDiskUsageWindowSize(),
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 84d383cf7..b66099e1d 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.*;
 import org.mockito.Mockito;
 
@@ -51,7 +50,7 @@ public class RatisMasterStatusSystemSuiteJ {
   protected static HAMasterMetaManager STATUSSYSTEM2 = null;
   protected static HAMasterMetaManager STATUSSYSTEM3 = null;
 
-  private static RpcEndpointRef dummyRef =
+  private static final RpcEndpointRef dummyRef =
       new NettyRpcEndpointRef(
           new CelebornConf(), RpcEndpointAddress.apply("localhost", 111, 
"dummy"), null);
 
@@ -155,45 +154,43 @@ public class RatisMasterStatusSystemSuiteJ {
   }
 
   @Test
-  public void testLeaderAvaiable() throws InterruptedException {
-    boolean hasLeader = false;
-    if (RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || 
RATISSERVER3.isLeader()) {
-      hasLeader = true;
-    }
-    Assert.assertEquals(hasLeader, true);
+  public void testLeaderAvaiable() {
+    boolean hasLeader =
+        RATISSERVER1.isLeader() || RATISSERVER2.isLeader() || 
RATISSERVER3.isLeader();
+    Assert.assertTrue(hasLeader);
   }
 
-  private static String HOSTNAME1 = "host1";
-  private static int RPCPORT1 = 1111;
-  private static int PUSHPORT1 = 1112;
-  private static int FETCHPORT1 = 1113;
-  private static int REPLICATEPORT1 = 1114;
-  private static Map<String, DiskInfo> disks1 = new HashMap();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption1 =
+  private static final String HOSTNAME1 = "host1";
+  private static final int RPCPORT1 = 1111;
+  private static final int PUSHPORT1 = 1112;
+  private static final int FETCHPORT1 = 1113;
+  private static final int REPLICATEPORT1 = 1114;
+  private static final Map<String, DiskInfo> disks1 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption1 =
       new HashMap<>();
 
-  private static String HOSTNAME2 = "host2";
-  private static int RPCPORT2 = 2111;
-  private static int PUSHPORT2 = 2112;
-  private static int FETCHPORT2 = 2113;
-  private static int REPLICATEPORT2 = 2114;
-  private static Map<String, DiskInfo> disks2 = new HashMap<>();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption2 =
+  private static final String HOSTNAME2 = "host2";
+  private static final int RPCPORT2 = 2111;
+  private static final int PUSHPORT2 = 2112;
+  private static final int FETCHPORT2 = 2113;
+  private static final int REPLICATEPORT2 = 2114;
+  private static final Map<String, DiskInfo> disks2 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption2 =
       new HashMap<>();
 
-  private static String HOSTNAME3 = "host3";
-  private static int RPCPORT3 = 3111;
-  private static int PUSHPORT3 = 3112;
-  private static int FETCHPORT3 = 3113;
-  private static int REPLICATEPORT3 = 3114;
-  private static Map<String, DiskInfo> disks3 = new HashMap<>();
-  private static Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption3 =
+  private static final String HOSTNAME3 = "host3";
+  private static final int RPCPORT3 = 3111;
+  private static final int PUSHPORT3 = 3112;
+  private static final int FETCHPORT3 = 3113;
+  private static final int REPLICATEPORT3 = 3114;
+  private static final Map<String, DiskInfo> disks3 = new HashMap<>();
+  private static final Map<UserIdentifier, ResourceConsumption> 
userResourceConsumption3 =
       new HashMap<>();
 
-  private AtomicLong callerId = new AtomicLong();
-  private static String APPID1 = "appId1";
-  private static int SHUFFLEID1 = 1;
-  private static String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
+  private final AtomicLong callerId = new AtomicLong();
+  private static final String APPID1 = "appId1";
+  private static final int SHUFFLEID1 = 1;
+  private static final String SHUFFLEKEY1 = APPID1 + "-" + SHUFFLEID1;
 
   private String getNewReqeustId() {
     return MasterClient.encodeRequestId(UUID.randomUUID().toString(), 
callerId.incrementAndGet());
@@ -235,7 +232,7 @@ public class RatisMasterStatusSystemSuiteJ {
           disks1,
           userResourceConsumption1,
           getNewReqeustId());
-      Assert.assertTrue(false);
+      Assert.fail();
     } catch (CelebornRuntimeException e) {
       Assert.assertTrue(true);
     } finally {
@@ -283,6 +280,66 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(3, STATUSSYSTEM3.workers.size());
   }
 
+  @Test
+  public void testHandleWorkerExclude() throws InterruptedException {
+    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
+    Assert.assertNotNull(statusSystem);
+
+    WorkerInfo workerInfo1 =
+        new WorkerInfo(
+            HOSTNAME1,
+            RPCPORT1,
+            PUSHPORT1,
+            FETCHPORT1,
+            REPLICATEPORT1,
+            disks1,
+            userResourceConsumption1);
+    WorkerInfo workerInfo2 =
+        new WorkerInfo(
+            HOSTNAME2,
+            RPCPORT2,
+            PUSHPORT2,
+            FETCHPORT2,
+            REPLICATEPORT2,
+            disks2,
+            userResourceConsumption2);
+
+    statusSystem.handleRegisterWorker(
+        workerInfo1.host(),
+        workerInfo1.rpcPort(),
+        workerInfo1.pushPort(),
+        workerInfo1.fetchPort(),
+        workerInfo1.replicatePort(),
+        workerInfo1.diskInfos(),
+        workerInfo1.userResourceConsumption(),
+        getNewReqeustId());
+    statusSystem.handleRegisterWorker(
+        workerInfo2.host(),
+        workerInfo2.rpcPort(),
+        workerInfo2.pushPort(),
+        workerInfo2.fetchPort(),
+        workerInfo2.replicatePort(),
+        workerInfo2.diskInfos(),
+        workerInfo2.userResourceConsumption(),
+        getNewReqeustId());
+
+    statusSystem.handleWorkerExclude(
+        Arrays.asList(workerInfo1, workerInfo2), Collections.emptyList(), 
getNewReqeustId());
+    Thread.sleep(3000L);
+
+    Assert.assertEquals(2, STATUSSYSTEM1.manuallyExcludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM2.manuallyExcludedWorkers.size());
+    Assert.assertEquals(2, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+
+    statusSystem.handleWorkerExclude(
+        Collections.emptyList(), Collections.singletonList(workerInfo1), 
getNewReqeustId());
+    Thread.sleep(3000L);
+
+    Assert.assertEquals(1, STATUSSYSTEM1.manuallyExcludedWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM2.manuallyExcludedWorkers.size());
+    Assert.assertEquals(1, STATUSSYSTEM3.manuallyExcludedWorkers.size());
+  }
+
   @Test
   public void testHandleWorkerLost() throws InterruptedException {
     AbstractMetaManager statusSystem = pickLeaderStatusSystem();
@@ -483,15 +540,6 @@ public class RatisMasterStatusSystemSuiteJ {
     statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, 
getNewReqeustId());
     Thread.sleep(3000L);
 
-    List<String> workerIds = new ArrayList<>();
-    workerIds.add(
-        HOSTNAME1 + ":" + RPCPORT1 + ":" + PUSHPORT1 + ":" + FETCHPORT1 + ":" 
+ REPLICATEPORT1);
-
-    List<Map<String, Integer>> workerSlots = new ArrayList<>();
-    workerSlots.add(ImmutableMap.of("disk1", 3));
-
-    Thread.sleep(3000L);
-
     // Do not update diskinfo's activeslots
 
     Assert.assertEquals(
@@ -658,9 +706,9 @@ public class RatisMasterStatusSystemSuiteJ {
     statusSystem.handleUnRegisterShuffle(SHUFFLEKEY1, getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(true, STATUSSYSTEM1.registeredShuffle.isEmpty());
-    Assert.assertEquals(true, STATUSSYSTEM2.registeredShuffle.isEmpty());
-    Assert.assertEquals(true, STATUSSYSTEM3.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM1.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM2.registeredShuffle.isEmpty());
+    Assert.assertTrue(STATUSSYSTEM3.registeredShuffle.isEmpty());
   }
 
   @Test
@@ -671,17 +719,17 @@ public class RatisMasterStatusSystemSuiteJ {
     long dummy = 1235L;
     statusSystem.handleAppHeartbeat(APPID1, 1, 1, dummy, getNewReqeustId());
     Thread.sleep(3000L);
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(APPID1));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(APPID1));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(APPID1));
 
     String appId2 = "app02";
     statusSystem.handleAppHeartbeat(appId2, 1, 1, dummy, getNewReqeustId());
     Thread.sleep(3000L);
 
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(appId2));
-    Assert.assertEquals(new Long(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(appId2));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM1.appHeartbeatTime.get(appId2));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM2.appHeartbeatTime.get(appId2));
+    Assert.assertEquals(Long.valueOf(dummy), 
STATUSSYSTEM3.appHeartbeatTime.get(appId2));
 
     Assert.assertEquals(2, STATUSSYSTEM1.appHeartbeatTime.size());
     Assert.assertEquals(2, STATUSSYSTEM2.appHeartbeatTime.size());
@@ -871,7 +919,8 @@ public class RatisMasterStatusSystemSuiteJ {
         userResourceConsumption3,
         getNewReqeustId());
 
-    WorkerInfo workerInfo1 =
+    List<WorkerInfo> failedWorkers = new ArrayList<>();
+    failedWorkers.add(
         new WorkerInfo(
             HOSTNAME1,
             RPCPORT1,
@@ -879,19 +928,7 @@ public class RatisMasterStatusSystemSuiteJ {
             FETCHPORT1,
             REPLICATEPORT1,
             disks1,
-            userResourceConsumption1);
-    WorkerInfo workerInfo2 =
-        new WorkerInfo(
-            HOSTNAME2,
-            RPCPORT2,
-            PUSHPORT2,
-            FETCHPORT2,
-            REPLICATEPORT2,
-            disks2,
-            userResourceConsumption2);
-
-    List<WorkerInfo> failedWorkers = new ArrayList<>();
-    failedWorkers.add(workerInfo1);
+            userResourceConsumption1));
 
     statusSystem.handleReportWorkerUnavailable(failedWorkers, 
getNewReqeustId());
     Thread.sleep(3000L);
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
index 4390c369d..5f9ff1f5a 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala
@@ -43,33 +43,36 @@ abstract class HttpService extends Service with Logging {
     sb.toString()
   }
 
-  def getMasterGroupInfo: String = throw new UnsupportedOperationException()
-
   def getWorkerInfo: String
 
+  def getThreadDump: String
+
+  def getShuffleList: String
+
+  def listTopDiskUseApps: String
+
+  def getMasterGroupInfo: String = throw new UnsupportedOperationException()
+
   def getLostWorkers: String = throw new UnsupportedOperationException()
 
   def getShutdownWorkers: String = throw new UnsupportedOperationException()
 
   def getExcludedWorkers: String = throw new UnsupportedOperationException()
 
-  def getThreadDump: String
-
   def getHostnameList: String = throw new UnsupportedOperationException()
 
   def getApplicationList: String = throw new UnsupportedOperationException()
 
-  def getShuffleList: String
-
-  def listTopDiskUseApps: String
+  def exclude(addWorkers: String, removeWorkers: String): String =
+    throw new UnsupportedOperationException()
 
-  def listPartitionLocationInfo: String
+  def listPartitionLocationInfo: String = throw new 
UnsupportedOperationException()
 
-  def getUnavailablePeers: String
+  def getUnavailablePeers: String = throw new UnsupportedOperationException()
 
-  def isShutdown: String
+  def isShutdown: String = throw new UnsupportedOperationException()
 
-  def isRegistered: String
+  def isRegistered: String = throw new UnsupportedOperationException()
 
   def exit(exitType: String): String = throw new 
UnsupportedOperationException()
 
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
index 0cbc3b975..544505cd6 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala
@@ -168,6 +168,16 @@ case object Applications extends HttpEndpoint {
     service.getApplicationList
 }
 
+case object Exclude extends HttpEndpoint {
+  override def path: String = "/exclude"
+
+  override def description(service: String): String =
+    "Excluded workers of the master add or remove the worker manually given 
worker id. The parameter add or remove specifies the excluded workers to add or 
remove, which value is separated by commas."
+
+  override def handle(service: HttpService, parameters: Map[String, String]): 
String =
+    service.exclude(parameters.getOrElse("ADD", "").trim, 
parameters.getOrElse("REMOVE", "").trim)
+}
+
 case object ListPartitionLocationInfo extends HttpEndpoint {
   override def path: String = "/listPartitionLocationInfo"
 
@@ -212,7 +222,7 @@ case object Exit extends HttpEndpoint {
   override def path: String = "/exit"
 
   override def description(service: String): String =
-    "Trigger this worker to exit. Legal types are 'DECOMMISSION‘, 'GRACEFUL' 
and 'IMMEDIATELY'"
+    "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' 
and 'IMMEDIATELY'."
 
   override def handle(service: HttpService, parameters: Map[String, String]): 
String =
     service.exit(parameters.getOrElse("TYPE", ""))
diff --git 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
index 3fbcca5e5..c208c0f06 100644
--- 
a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
+++ 
b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala
@@ -32,7 +32,8 @@ object HttpUtils {
     ExcludedWorkers,
     ShutdownWorkers,
     Hostnames,
-    Applications) ++ baseEndpoints
+    Applications,
+    Exclude) ++ baseEndpoints
   private val workerEndpoints: List[HttpEndpoint] =
     List(
       ListPartitionLocationInfo,
@@ -50,7 +51,7 @@ object HttpUtils {
         url.getQuery
           .split("&")
           .map(_.split("="))
-          .map(arr => arr(0).toUpperCase(Locale.ROOT) -> 
arr(1).toUpperCase(Locale.ROOT)).toMap
+          .map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1)).toMap
       }
     (url.getPath, parameter)
   }
diff --git 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
index 4133e67ea..8b877a8c7 100644
--- 
a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
+++ 
b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala
@@ -33,13 +33,29 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
     assert(parameters == expectParameters)
   }
 
+  test("CELEBORN-448: Support exclude worker manually") {
+    checkParseUri("/exclude", "/exclude", Map.empty)
+    checkParseUri(
+      "/exclude?add=localhost:1001:1002:1003:1004",
+      "/exclude",
+      Map("ADD" -> "localhost:1001:1002:1003:1004"))
+    checkParseUri(
+      "/exclude?remove=localhost:1001:1002:1003:1004",
+      "/exclude",
+      Map("REMOVE" -> "localhost:1001:1002:1003:1004"))
+    checkParseUri(
+      
"/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004",
+      "/exclude",
+      Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" -> 
"localhost:2001:2002:2003:2004"))
+  }
+
   test("CELEBORN-847: Support parse HTTP Restful API parameters") {
     checkParseUri("/exit", "/exit", Map.empty)
-    checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> 
"DECOMMISSION"))
+    checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> 
"decommission"))
     checkParseUri(
-      "/exit?type=decommission&foo=a",
+      "/exit?type=decommission&foo=A",
       "/exit",
-      Map("TYPE" -> "DECOMMISSION", "FOO" -> "A"))
+      Map("TYPE" -> "decommission", "FOO" -> "A"))
   }
 
   test("CELEBORN-829: Improve response message of invalid HTTP request") {
@@ -47,6 +63,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
       s"""Available API providers include:
          |/applications        List all running application's ids of the 
cluster.
          |/conf                List the conf setting of the master.
+         |/exclude             Excluded workers of the master add or remove 
the worker manually given worker id. The parameter add or remove specifies the 
excluded workers to add or remove, which value is separated by commas.
          |/excludedWorkers     List all excluded workers of the master.
          |/help                List the available API providers of the master.
          |/hostnames           List all running application's 
LifecycleManager's hostnames of the cluster.
@@ -61,7 +78,7 @@ class HttpUtilsSuite extends AnyFunSuite with Logging {
     assert(HttpUtils.help(Service.WORKER) ==
       s"""Available API providers include:
          |/conf                      List the conf setting of the worker.
-         |/exit                      Trigger this worker to exit. Legal types 
are 'DECOMMISSION‘, 'GRACEFUL' and 'IMMEDIATELY'
+         |/exit                      Trigger this worker to exit. Legal types 
are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
          |/help                      List the available API providers of the 
worker.
          |/isRegistered              Show if the worker is registered to the 
master success.
          |/isShutdown                Show if the worker is during the process 
of shutdown.
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 2b03f8fb7..d3850f8f1 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker
 import java.io.File
 import java.lang.{Long => JLong}
 import java.util
-import java.util.{HashMap => JHashMap, HashSet => JHashSet, Map => JMap}
+import java.util.{HashMap => JHashMap, HashSet => JHashSet, Locale, Map => 
JMap}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}
 
@@ -647,7 +647,7 @@ private[celeborn] class Worker(
   }
 
   override def exit(exitType: String): String = {
-    exitType match {
+    exitType.toUpperCase(Locale.ROOT) match {
       case "DECOMMISSION" =>
         exitKind = CelebornExitKind.WORKER_DECOMMISSION
         ShutdownHookManager.get().updateTimeout(

Reply via email to