This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 1951376f [#80][Part-2] feat: Add RPC logic and heartbeat logic for 
decommission (#663)
1951376f is described below

commit 1951376f28fe4f91cbe051c6fa8600f52dede8e3
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Mar 2 21:29:00 2023 +0800

    [#80][Part-2] feat: Add RPC logic and heartbeat logic for decommission (#663)
    
    ### What changes were proposed in this pull request?
    Add RPC logic and heartbeat logic for decommission
    
    ### Why are the changes needed?
    Support shuffle server decommission. It is a part of #80
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    UT
---
 .../org/apache/uniffle/common/ServerStatus.java    |  26 +++++-
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |  74 +++++++++++----
 .../org/apache/uniffle/common/rpc/StatusCode.java  |  21 ++++-
 .../StatusCodeTest.java => ServerStatusTest.java}  |  35 ++++---
 .../apache/uniffle/common/rpc/StatusCodeTest.java  |  23 +++--
 coordinator/pom.xml                                |   5 +-
 .../apache/uniffle/coordinator/ClusterManager.java |   6 ++
 .../uniffle/coordinator/CoordinatorConf.java       |  12 +++
 .../uniffle/coordinator/CoordinatorFactory.java    |   6 +-
 .../coordinator/CoordinatorGrpcService.java        |   3 +
 .../org/apache/uniffle/coordinator/ServerNode.java |  27 +++++-
 .../uniffle/coordinator/SimpleClusterManager.java  |  52 +++++++++++
 .../apache/uniffle/coordinator/ServerNodeTest.java |   4 +-
 .../uniffle/test/CoordinatorGrpcServerTest.java    |   6 +-
 .../test/ShuffleServerInternalGrpcTest.java        | 104 +++++++++++++++++++++
 .../client/api/ShuffleServerInternalClient.java    |  24 +++--
 .../client/impl/grpc/CoordinatorGrpcClient.java    |   4 +
 .../impl/grpc/ShuffleServerInternalGrpcClient.java |  76 +++++++++++++++
 .../request/RssCancelDecommissionRequest.java      |  18 +---
 .../client/request/RssDecommissionRequest.java     |  18 +---
 .../client/request/RssSendHeartBeatRequest.java    |   8 ++
 .../response/RssCancelDecommissionResponse.java    |  20 ++--
 .../client/response/RssDecommissionResponse.java   |  20 ++--
 proto/src/main/proto/Rss.proto                     |  30 ++++++
 .../apache/uniffle/server/RegisterHeartBeat.java   |   4 +
 .../uniffle/server/ShuffleServerFactory.java       |   9 +-
 .../server/ShuffleServerInternalGrpcService.java   |  85 +++++++++++++++++
 .../apache/uniffle/server/MockedGrpcServer.java    |   5 +-
 28 files changed, 608 insertions(+), 117 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
index 2cfdf6ba..bd23b560 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -17,12 +17,20 @@
 
 package org.apache.uniffle.common;
 
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
 public enum ServerStatus {
-  UNKNOWN(-1),
   ACTIVE(0),
   DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
+  DECOMMISSIONED(2),
+  UNKNOWN(-1);
 
+  static final Map<Integer, ServerStatus> VALUE_MAP =
+      
Arrays.stream(ServerStatus.values()).collect(Collectors.toMap(ServerStatus::code,
 s -> s));
   private final int code;
 
   ServerStatus(int code) {
@@ -32,4 +40,18 @@ public enum ServerStatus {
   public int code() {
     return code;
   }
+
+  public static ServerStatus fromCode(Integer code) {
+    ServerStatus serverStatus = VALUE_MAP.get(code);
+    return serverStatus == null ? UNKNOWN : serverStatus;
+  }
+
+  public RssProtos.ServerStatus toProto() {
+    RssProtos.ServerStatus serverStatus = 
RssProtos.ServerStatus.forNumber(this.code());
+    return serverStatus == null ? RssProtos.ServerStatus.UNRECOGNIZED : 
serverStatus;
+  }
+
+  public static ServerStatus fromProto(RssProtos.ServerStatus status) {
+    return fromCode(status.getNumber());
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index be4e916f..a9935412 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -18,6 +18,8 @@
 package org.apache.uniffle.common.rpc;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -25,11 +27,14 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import io.grpc.BindableService;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
 import io.grpc.ServerInterceptors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +51,10 @@ public class GrpcServer implements ServerInterface {
   private final int port;
   private final ExecutorService pool;
 
-  public GrpcServer(RssBaseConf conf, BindableService service, GRPCMetrics 
grpcMetrics) {
+  protected GrpcServer(
+      RssBaseConf conf,
+      List<Pair<BindableService, List<ServerInterceptor>>> 
servicesWithInterceptors,
+      GRPCMetrics grpcMetrics) {
     this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
     long maxInboundMessageSize = 
conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
     int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
@@ -61,23 +69,55 @@ public class GrpcServer implements ServerInterface {
     );
 
     boolean isMetricsEnabled = 
conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
+    ServerBuilder<?> builder = ServerBuilder
+        .forPort(port)
+        .executor(pool)
+        .maxInboundMessageSize((int)maxInboundMessageSize);
     if (isMetricsEnabled) {
-      MonitoringServerInterceptor monitoringInterceptor =
-          new MonitoringServerInterceptor(grpcMetrics);
-      this.server = ServerBuilder
-          .forPort(port)
-          .addService(ServerInterceptors.intercept(service, 
monitoringInterceptor))
-          .executor(pool)
-          .addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics))
-          .maxInboundMessageSize((int)maxInboundMessageSize)
-          .build();
-    } else {
-      this.server = ServerBuilder
-          .forPort(port)
-          .addService(service)
-          .executor(pool)
-          .maxInboundMessageSize((int)maxInboundMessageSize)
-          .build();
+      builder.addTransportFilter(new 
MonitoringServerTransportFilter(grpcMetrics));
+    }
+    servicesWithInterceptors.forEach((serviceWithInterceptors) -> {
+      List<ServerInterceptor> interceptors = 
serviceWithInterceptors.getRight();
+      if (isMetricsEnabled) {
+        MonitoringServerInterceptor monitoringInterceptor =
+            new MonitoringServerInterceptor(grpcMetrics);
+        List<ServerInterceptor> newInterceptors = 
Lists.newArrayList(interceptors);
+        newInterceptors.add(monitoringInterceptor);
+        interceptors = newInterceptors;
+      }
+      
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(),
 interceptors));
+    });
+    this.server = builder.build();
+  }
+
+  public static class Builder {
+
+    private RssBaseConf rssBaseConf;
+    private GRPCMetrics grpcMetrics;
+
+    private List<Pair<BindableService, List<ServerInterceptor>>> 
servicesWithInterceptors = new ArrayList<>();
+
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    public Builder conf(RssBaseConf rssBaseConf) {
+      this.rssBaseConf = rssBaseConf;
+      return this;
+    }
+
+    public Builder addService(BindableService bindableService, 
ServerInterceptor... interceptors) {
+      this.servicesWithInterceptors.add(Pair.of(bindableService, 
Lists.newArrayList(interceptors)));
+      return this;
+    }
+
+    public Builder grpcMetrics(GRPCMetrics metrics) {
+      this.grpcMetrics = metrics;
+      return this;
+    }
+
+    public GrpcServer build() {
+      return new GrpcServer(rssBaseConf, servicesWithInterceptors, 
grpcMetrics);
     }
   }
 
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index 7157c351..eb2c7811 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -17,6 +17,10 @@
 
 package org.apache.uniffle.common.rpc;
 
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.uniffle.proto.RssProtos;
 
 public enum StatusCode {
@@ -28,8 +32,12 @@ public enum StatusCode {
   NO_PARTITION(5),
   INTERNAL_ERROR(6),
   TIMEOUT(7),
-  ACCESS_DENIED(8);
+  ACCESS_DENIED(8),
+  INVALID_REQUEST(9),
+  UNKNOWN(-1);
 
+  static final Map<Integer, StatusCode> VALUE_MAP =
+      
Arrays.stream(StatusCode.values()).collect(Collectors.toMap(StatusCode::statusCode,
 s -> s));
   private final int statusCode;
 
   StatusCode(int code) {
@@ -40,8 +48,17 @@ public enum StatusCode {
     return statusCode;
   }
 
+  public static StatusCode fromCode(Integer code) {
+    StatusCode statusCode = VALUE_MAP.get(code);
+    return statusCode == null ? UNKNOWN : statusCode;
+  }
+
   public RssProtos.StatusCode toProto() {
     RssProtos.StatusCode code = 
RssProtos.StatusCode.forNumber(this.statusCode());
-    return code == null ? RssProtos.StatusCode.INTERNAL_ERROR : code;
+    return code == null ? RssProtos.StatusCode.UNRECOGNIZED : code;
+  }
+
+  public static StatusCode fromProto(RssProtos.StatusCode status) {
+    return fromCode(status.getNumber());
   }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java 
b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
copy to common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
index 3c93d14b..e449215a 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
@@ -15,7 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common.rpc;
+package org.apache.uniffle.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Test;
 
@@ -24,31 +28,36 @@ import org.apache.uniffle.proto.RssProtos;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-public class StatusCodeTest {
+public class ServerStatusTest {
 
   @Test
   public void test() throws Exception {
-    RssProtos.StatusCode[] protoStatusCode = RssProtos.StatusCode.values();
-    for (RssProtos.StatusCode statusCode : protoStatusCode) {
+    assertEquals(-1, ServerStatus.UNKNOWN.code());
+    assertEquals(ServerStatus.fromCode(-2), ServerStatus.UNKNOWN);
+    assertEquals(ServerStatus.fromCode(Integer.MAX_VALUE), 
ServerStatus.UNKNOWN);
+    List<RssProtos.ServerStatus> protoServerStatuses = 
Arrays.stream(RssProtos.ServerStatus.values())
+        .filter(s -> 
!RssProtos.ServerStatus.UNRECOGNIZED.equals(s)).collect(Collectors.toList());
+
+    for (RssProtos.ServerStatus statusCode : protoServerStatuses) {
+
       try {
-        if (RssProtos.StatusCode.UNRECOGNIZED.equals(statusCode)) {
-          continue;
-        }
-        StatusCode.valueOf(statusCode.name());
+        ServerStatus.valueOf(statusCode.name());
       } catch (Exception e) {
         fail(e.getMessage());
       }
     }
-    StatusCode[] statusCodes = StatusCode.values();
-    for (StatusCode statusCode : statusCodes) {
+    List<ServerStatus> serverStatuses = Arrays.stream(ServerStatus.values())
+        .filter(s -> 
!ServerStatus.UNKNOWN.equals(s)).collect(Collectors.toList());
+    for (ServerStatus serverStatus : serverStatuses) {
       try {
-        RssProtos.StatusCode.valueOf(statusCode.name());
+        RssProtos.ServerStatus.valueOf(serverStatus.name());
       } catch (Exception e) {
         fail(e.getMessage());
       }
     }
-    for (int i = 0; i < statusCodes.length; i++) {
-      assertEquals(protoStatusCode[i], statusCodes[i].toProto());
+    for (int i = 0; i < serverStatuses.size() - 1; i++) {
+      assertEquals(protoServerStatuses.get(i), 
serverStatuses.get(i).toProto());
+      assertEquals(ServerStatus.fromProto(protoServerStatuses.get(i)), 
serverStatuses.get(i));
     }
   }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java 
b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
index 3c93d14b..5956c1d3 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.uniffle.common.rpc;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.junit.jupiter.api.Test;
 
 import org.apache.uniffle.proto.RssProtos;
@@ -28,18 +32,22 @@ public class StatusCodeTest {
 
   @Test
   public void test() throws Exception {
-    RssProtos.StatusCode[] protoStatusCode = RssProtos.StatusCode.values();
+    assertEquals(-1, StatusCode.UNKNOWN.statusCode());
+    assertEquals(StatusCode.fromCode(-2), StatusCode.UNKNOWN);
+    assertEquals(StatusCode.fromCode(Integer.MAX_VALUE), StatusCode.UNKNOWN);
+    List<RssProtos.StatusCode> protoStatusCode = 
Arrays.stream(RssProtos.StatusCode.values())
+        .filter(s -> 
!RssProtos.StatusCode.UNRECOGNIZED.equals(s)).collect(Collectors.toList());
+
     for (RssProtos.StatusCode statusCode : protoStatusCode) {
       try {
-        if (RssProtos.StatusCode.UNRECOGNIZED.equals(statusCode)) {
-          continue;
-        }
         StatusCode.valueOf(statusCode.name());
       } catch (Exception e) {
         fail(e.getMessage());
       }
     }
-    StatusCode[] statusCodes = StatusCode.values();
+    List<StatusCode> statusCodes = Arrays.stream(StatusCode.values())
+        .filter(s -> 
!StatusCode.UNKNOWN.equals(s)).collect(Collectors.toList());
+
     for (StatusCode statusCode : statusCodes) {
       try {
         RssProtos.StatusCode.valueOf(statusCode.name());
@@ -47,8 +55,9 @@ public class StatusCodeTest {
         fail(e.getMessage());
       }
     }
-    for (int i = 0; i < statusCodes.length; i++) {
-      assertEquals(protoStatusCode[i], statusCodes[i].toProto());
+    for (int i = 0; i < statusCodes.size() - 1; i++) {
+      assertEquals(protoStatusCode.get(i), statusCodes.get(i).toProto());
+      assertEquals(StatusCode.fromProto(protoStatusCode.get(i)), 
statusCodes.get(i));
     }
   }
 }
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index 0f2589c7..ac6b2ec5 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -36,7 +36,10 @@
       <groupId>org.apache.uniffle</groupId>
       <artifactId>rss-common</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-internal-client</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java-util</artifactId>
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 5139b10b..fbd7e694 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -56,4 +56,10 @@ public interface ClusterManager extends Closeable, 
Reconfigurable {
    * @return whether to be ready for serving
    */
   boolean isReadyForServe();
+
+  ServerNode getServerNodeById(String serverId);
+
+  void decommission(String serverId);
+
+  void cancelDecommission(String serverId);
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 6f2b9413..879dedff 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -197,6 +197,18 @@ public class CoordinatorConf extends RssBaseConf {
       .defaultValue(60 * 1000L)
       .withDescription("Update interval for the default number of submitted 
apps per user");
 
+  public static final ConfigOption<Long> 
COORDINATOR_NODES_CLIENT_CACHE_EXPIRED = ConfigOptions
+          .key("rss.coordinator.nodes.client.cache.expired")
+          .longType()
+          .defaultValue(120 * 1000L)
+          .withDescription("Expired time (ms) for the clients communicating 
with nodes.");
+
+  public static final ConfigOption<Integer> COORDINATOR_NODES_CLIENT_CACHE_MAX 
= ConfigOptions
+      .key("rss.coordinator.nodes.client.cache.max")
+      .intType()
+      .defaultValue(1000)
+      .withDescription("The max number of clients that communicating with 
nodes and storing in the cache.");
+
   public CoordinatorConf() {
   }
 
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
index b8f566ee..1a952d27 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
@@ -33,8 +33,10 @@ public class CoordinatorFactory {
   public ServerInterface getServer() {
     String type = conf.getString(CoordinatorConf.RPC_SERVER_TYPE);
     if (type.equals(ServerType.GRPC.name())) {
-      return new GrpcServer(conf, new 
CoordinatorGrpcService(coordinatorServer),
-          coordinatorServer.getGrpcMetrics());
+      return GrpcServer.Builder.newBuilder()
+          .conf(conf)
+          .grpcMetrics(coordinatorServer.getGrpcMetrics())
+          .addService(new CoordinatorGrpcService(coordinatorServer)).build();
     } else {
       throw new UnsupportedOperationException("Unsupported server type " + 
type);
     }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 8e2b49d1..7d2b255b 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfoUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
 import org.apache.uniffle.coordinator.access.AccessInfo;
@@ -367,6 +368,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
     if (request.hasIsHealthy()) {
       isHealthy = request.getIsHealthy().getValue();
     }
+    ServerStatus serverStatus = request.hasStatus() ? 
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
     return new ServerNode(request.getServerId().getId(),
         request.getServerId().getIp(),
         request.getServerId().getPort(),
@@ -376,6 +378,7 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
         request.getEventNumInFlush(),
         Sets.newHashSet(request.getTagsList()),
         isHealthy,
+        serverStatus,
         StorageInfoUtils.fromProto(request.getStorageInfoMap()));
   }
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index 069e5c79..5d2cfeba 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import com.google.common.collect.Maps;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
 
@@ -37,8 +38,10 @@ public class ServerNode implements Comparable<ServerNode> {
   private long timestamp;
   private Set<String> tags;
   private boolean isHealthy;
+  private final ServerStatus status;
   private Map<String, StorageInfo> storageInfo;
 
+  // Only for test
   public ServerNode(
       String id,
       String ip,
@@ -50,7 +53,7 @@ public class ServerNode implements Comparable<ServerNode> {
       Set<String> tags,
       boolean isHealthy) {
     this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
-        Maps.newHashMap());
+        ServerStatus.ACTIVE, Maps.newHashMap());
   }
 
   public ServerNode(
@@ -63,6 +66,22 @@ public class ServerNode implements Comparable<ServerNode> {
       int eventNumInFlush,
       Set<String> tags,
       boolean isHealthy,
+      ServerStatus status) {
+    this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory, 
eventNumInFlush, tags, isHealthy,
+        status, Maps.newHashMap());
+  }
+
+  public ServerNode(
+      String id,
+      String ip,
+      int port,
+      long usedMemory,
+      long preAllocatedMemory,
+      long availableMemory,
+      int eventNumInFlush,
+      Set<String> tags,
+      boolean isHealthy,
+      ServerStatus status,
       Map<String, StorageInfo> storageInfoMap) {
     this.id = id;
     this.ip = ip;
@@ -74,6 +93,7 @@ public class ServerNode implements Comparable<ServerNode> {
     this.timestamp = System.currentTimeMillis();
     this.tags = tags;
     this.isHealthy = isHealthy;
+    this.status = status;
     this.storageInfo = storageInfoMap;
   }
 
@@ -121,6 +141,10 @@ public class ServerNode implements Comparable<ServerNode> {
     return isHealthy;
   }
 
+  public ServerStatus getStatus() {
+    return status;
+  }
+
   public Map<String, StorageInfo> getStorageInfo() {
     return storageInfo;
   }
@@ -137,6 +161,7 @@ public class ServerNode implements Comparable<ServerNode> {
         + "], timestamp[" + timestamp
         + "], tags" + tags.toString() + ""
         + ", healthy[" + isHealthy
+        + ", status[" + status
         + "], storages[num=" + storageInfo.size() + "]";
 
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index dbe5df33..7715b543 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -44,7 +47,12 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -54,6 +62,7 @@ public class SimpleClusterManager implements ClusterManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(SimpleClusterManager.class);
 
   private final Map<String, ServerNode> servers = Maps.newConcurrentMap();
+  private final Cache<ServerNode, ShuffleServerInternalGrpcClient> clientCache;
   private Set<String> excludeNodes = Sets.newConcurrentHashSet();
   // tag -> nodes
   private Map<String, Set<ServerNode>> tagToNodes = Maps.newConcurrentMap();
@@ -83,6 +92,7 @@ public class SimpleClusterManager implements ClusterManager {
     this.startupSilentPeriodDurationMs = 
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION);
 
     periodicOutputIntervalTimes = 
conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES);
+
     scheduledExecutorService.scheduleAtFixedRate(
         this::nodesCheck, heartbeatTimeout / 3,
         heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -97,7 +107,15 @@ public class SimpleClusterManager implements ClusterManager 
{
           () -> updateExcludeNodes(excludeNodesPath), updateNodesInterval, 
updateNodesInterval, TimeUnit.MILLISECONDS);
     }
 
+    long clientExpiredTime = 
conf.get(CoordinatorConf.COORDINATOR_NODES_CLIENT_CACHE_EXPIRED);
+    int maxClient = 
conf.get(CoordinatorConf.COORDINATOR_NODES_CLIENT_CACHE_MAX);
+    clientCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(clientExpiredTime, TimeUnit.MILLISECONDS)
+        .maximumSize(maxClient)
+        .removalListener(notify -> ((ShuffleServerInternalGrpcClient) 
notify.getValue()).close())
+        .build();
     this.startTime = System.currentTimeMillis();
+
   }
 
   void nodesCheck() {
@@ -117,6 +135,7 @@ public class SimpleClusterManager implements ClusterManager 
{
       for (String serverId : deleteIds) {
         ServerNode sn = servers.remove(serverId);
         if (sn != null) {
+          clientCache.invalidate(sn);
           for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
             nodesWithTag.remove(sn);
           }
@@ -200,6 +219,9 @@ public class SimpleClusterManager implements ClusterManager 
{
   public List<ServerNode> getServerList(Set<String> requiredTags) {
     List<ServerNode> availableNodes = Lists.newArrayList();
     for (ServerNode node : servers.values()) {
+      if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
+        continue;
+      }
       if (!excludeNodes.contains(node.getId())
           && node.getTags().containsAll(requiredTags)
           && node.isHealthy()) {
@@ -250,6 +272,36 @@ public class SimpleClusterManager implements 
ClusterManager {
     return readyForServe;
   }
 
+  @Override
+  public void decommission(String serverId) {
+    ServerNode serverNode = getServerNodeById(serverId);
+    getShuffleServerClient(serverNode).decommission(new 
RssDecommissionRequest());
+  }
+
+  @Override
+  public void cancelDecommission(String serverId) {
+    ServerNode serverNode = getServerNodeById(serverId);
+    getShuffleServerClient(serverNode).cancelDecommission(new 
RssCancelDecommissionRequest());
+  }
+
+  private ShuffleServerInternalGrpcClient getShuffleServerClient(ServerNode 
serverNode) {
+    try {
+      return clientCache.get(serverNode,
+              () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(), 
serverNode.getPort()));
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public ServerNode getServerNodeById(String serverId) {
+    ServerNode serverNode = servers.get(serverId);
+    if (serverNode == null) {
+      throw new InvalidRequestException("Server Id [" + serverId + "] not 
found!");
+    }
+    return serverNode;
+  }
+
   @Override
   public void close() throws IOException {
     if (hadoopFileSystem != null) {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
index 9c6e7a38..4f9c014d 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.storage.StorageMedia;
 import org.apache.uniffle.common.storage.StorageStatus;
@@ -65,7 +66,8 @@ public class ServerNodeTest {
         60L,
         StorageStatus.NORMAL);
     localStorageInfo.put("/mnt", info);
-    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags, 
true, localStorageInfo);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
+        true, ServerStatus.ACTIVE, localStorageInfo);
     assertEquals(1, sn2.getStorageInfo().size());
   }
 }
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 99abcee4..f5ca9b51 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -61,7 +61,11 @@ public class CoordinatorGrpcServerTest {
 
     GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics();
     grpcMetrics.register(new CollectorRegistry(true));
-    GrpcServer grpcServer = new GrpcServer(baseConf, new 
MockedCoordinatorGrpcService(), grpcMetrics);
+    GrpcServer grpcServer = GrpcServer.Builder.newBuilder()
+        .conf(baseConf)
+        .grpcMetrics(grpcMetrics)
+        .addService(new MockedCoordinatorGrpcService())
+        .build();
     grpcServer.start();
 
     // case1: test the single one connection metric
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
new file mode 100644
index 00000000..41cd6194
--- /dev/null
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import io.grpc.StatusRuntimeException;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Integration test for the shuffle server's internal gRPC service: it drives the
+ * {@code decommission} and {@code cancelDecommission} RPCs through
+ * {@link ShuffleServerInternalGrpcClient} against a locally started coordinator and shuffle server.
+ *
+ * <p>NOTE(review): the clients created in {@link #createClient()} are never closed in this class;
+ * confirm whether the base class or the test lifecycle releases their channels.
+ */
+public class ShuffleServerInternalGrpcTest extends IntegrationTestBase {
+
+  // Client for the regular shuffle server RPCs, used here to register/unregister a shuffle.
+  private ShuffleServerGrpcClient shuffleServerClient;
+  // Client for the internal RPCs under test (decommission / cancelDecommission).
+  private ShuffleServerInternalGrpcClient shuffleServerInternalClient;
+
+  /**
+   * Creates one coordinator and one shuffle server (storing data under {@code tmpDir/data1}) and
+   * starts them once for the whole class. The create/start helpers are not defined in this class
+   * (presumably inherited from IntegrationTestBase).
+   */
+  @BeforeAll
+  public static void setupServers(@TempDir File tmpDir) throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+    createCoordinatorServer(coordinatorConf);
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+    File dataDir1 = new File(tmpDir, "data1");
+    String basePath = dataDir1.getAbsolutePath();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(basePath));
+    // Small expiry and decommission-check values (presumably milliseconds -- confirm against
+    // ShuffleServerConf) so the shutdown awaited in decommissionTest happens within its timeout.
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
5000L);
+    
shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL, 
500L);
+    createShuffleServer(shuffleServerConf);
+    startServers();
+  }
+
+  /** Creates fresh clients, both pointing at the same shuffle server host and port. */
+  @BeforeEach
+  public void createClient() {
+    shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, 
SHUFFLE_SERVER_PORT);
+    shuffleServerInternalClient = new 
ShuffleServerInternalGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+  }
+
+  /**
+   * Verifies the status transitions ACTIVE -> DECOMMISSIONING -> ACTIVE (after cancel) while an app
+   * is registered, then verifies that decommissioning with no registered app stops the server and
+   * that further internal RPCs fail.
+   */
+  @Test
+  public void decommissionTest() {
+    String appId = "decommissionTest";
+    int shuffleId = 0;
+    // Register a shuffle first so the server still has an app when decommission is requested.
+    shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, 
shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)), ""));
+
+    // shuffleServers is not defined in this class (presumably inherited from IntegrationTestBase).
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    RssDecommissionResponse response = 
shuffleServerInternalClient.decommission(new RssDecommissionRequest());
+    assertEquals(StatusCode.SUCCESS, response.getStatusCode());
+    assertEquals(ServerStatus.DECOMMISSIONING, 
shuffleServer.getServerStatus());
+    // Cancelling is expected to bring the server back to ACTIVE.
+    RssCancelDecommissionResponse cancelResponse =
+            shuffleServerInternalClient.cancelDecommission(new 
RssCancelDecommissionRequest());
+    assertEquals(StatusCode.SUCCESS, cancelResponse.getStatusCode());
+    assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+
+    // Unregister the only app; with no apps left, the decommissioning server is expected to
+    // shut down shortly after the next decommission request.
+    shuffleServerClient.unregisterShuffle(new 
RssUnregisterShuffleRequest(appId, shuffleId));
+    response = shuffleServerInternalClient.decommission(new 
RssDecommissionRequest());
+    assertEquals(StatusCode.SUCCESS, response.getStatusCode());
+    assertEquals(ServerStatus.DECOMMISSIONING, 
shuffleServer.getServerStatus());
+    Awaitility.await().timeout(10, TimeUnit.SECONDS).until(() ->
+        !shuffleServer.isRunning());
+    // The server is already shut down, so the RPC is expected to fail with a
+    // StatusRuntimeException.
+    assertThrows(StatusRuntimeException.class,
+        () -> shuffleServerInternalClient.cancelDecommission(new 
RssCancelDecommissionRequest()));
+  }
+
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
similarity index 58%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
index 2cfdf6ba..b05448a6 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.api;
 
-public enum ServerStatus {
-  UNKNOWN(-1),
-  ACTIVE(0),
-  DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
 
-  private final int code;
+public interface ShuffleServerInternalClient {
 
-  ServerStatus(int code) {
-    this.code = code;
-  }
+  RssDecommissionResponse decommission(RssDecommissionRequest request);
+
+  RssCancelDecommissionResponse 
cancelDecommission(RssCancelDecommissionRequest rssCancelDecommissionRequest);
+
+  void close();
 
-  public int code() {
-    return code;
-  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 28412912..0f66a831 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -50,6 +50,7 @@ import 
org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
 import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.rpc.StatusCode;
@@ -117,6 +118,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
       long timeout,
       Set<String> tags,
       boolean isHealthy,
+      ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo) {
     ShuffleServerId serverId =
         ShuffleServerId.newBuilder().setId(id).setIp(ip).setPort(port).build();
@@ -129,6 +131,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
             .setEventNumInFlush(eventNumInFlush)
             .addAllTags(tags)
             .setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build())
+            .setStatusValue(serverStatus.ordinal())
             .putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
             .build();
 
@@ -194,6 +197,7 @@ public class CoordinatorGrpcClient extends GrpcClient 
implements CoordinatorClie
         request.getTimeout(),
         request.getTags(),
         request.isHealthy(),
+        request.getServerStatus(),
         request.getStorageInfo());
 
     RssSendHeartBeatResponse response;
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
new file mode 100644
index 00000000..f00c5f7d
--- /dev/null
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleServerInternalClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleServerInternalGrpc;
+
+/**
+ * gRPC-based {@link ShuffleServerInternalClient} that issues the {@code decommission} and
+ * {@code cancelDecommission} RPCs of the {@code ShuffleServerInternal} service.
+ *
+ * <p>Every RPC goes through {@link #getBlockingStub()}, so each call gets its own deadline of
+ * {@code RPC_TIMEOUT_DEFAULT_MS} milliseconds.
+ */
+public class ShuffleServerInternalGrpcClient extends GrpcClient implements ShuffleServerInternalClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShuffleServerInternalGrpcClient.class);
+  private static final long RPC_TIMEOUT_DEFAULT_MS = 60000;
+  // Per-call deadline in milliseconds; never reassigned, hence final.
+  private final long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+  // Base stub bound to the channel; assigned exactly once in the constructor.
+  private final ShuffleServerInternalGrpc.ShuffleServerInternalBlockingStub blockingStub;
+
+  /** Creates a client with 3 retry attempts over a plaintext channel. */
+  public ShuffleServerInternalGrpcClient(String host, int port) {
+    this(host, port, 3);
+  }
+
+  /** Creates a client over a plaintext channel with the given number of retry attempts. */
+  public ShuffleServerInternalGrpcClient(String host, int port, int maxRetryAttempts) {
+    this(host, port, maxRetryAttempts, true);
+  }
+
+  /**
+   * Creates a client for the shuffle server at {@code host:port}.
+   *
+   * @param host shuffle server host
+   * @param port shuffle server RPC port
+   * @param maxRetryAttempts forwarded unchanged to {@link GrpcClient}
+   * @param usePlaintext forwarded unchanged to {@link GrpcClient}
+   */
+  public ShuffleServerInternalGrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
+    super(host, port, maxRetryAttempts, usePlaintext);
+    // todo Add ClientInterceptor for authentication
+    blockingStub = ShuffleServerInternalGrpc.newBlockingStub(channel);
+  }
+
+  /**
+   * Returns a stub whose deadline starts now. A deadline is absolute, so a new stub is derived
+   * for every call instead of caching one with an already-running deadline.
+   */
+  public ShuffleServerInternalGrpc.ShuffleServerInternalBlockingStub getBlockingStub() {
+    return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Sends a decommission request to the shuffle server.
+   *
+   * @param request currently carries no data; an empty proto request is sent
+   * @return the status code and return message converted from the RPC response
+   */
+  @Override
+  public RssDecommissionResponse decommission(RssDecommissionRequest request) {
+    RssProtos.DecommissionRequest protoRequest =
+        RssProtos.DecommissionRequest.newBuilder().build();
+    RssProtos.DecommissionResponse rpcResponse = getBlockingStub().decommission(protoRequest);
+    return new RssDecommissionResponse(
+        StatusCode.fromProto(rpcResponse.getStatus()), rpcResponse.getRetMsg());
+  }
+
+  /**
+   * Sends a cancel-decommission request to the shuffle server.
+   *
+   * @param rssCancelDecommissionRequest currently carries no data; an empty proto request is sent
+   * @return the status code and return message converted from the RPC response
+   */
+  @Override
+  public RssCancelDecommissionResponse cancelDecommission(
+      RssCancelDecommissionRequest rssCancelDecommissionRequest) {
+    RssProtos.CancelDecommissionRequest protoRequest =
+        RssProtos.CancelDecommissionRequest.newBuilder().build();
+    RssProtos.CancelDecommissionResponse rpcResponse =
+        getBlockingStub().cancelDecommission(protoRequest);
+    return new RssCancelDecommissionResponse(
+        StatusCode.fromProto(rpcResponse.getStatus()), rpcResponse.getRetMsg());
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
similarity index 75%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
index 2cfdf6ba..cc02b128 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
@@ -15,21 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.request;
 
-public enum ServerStatus {
-  UNKNOWN(-1),
-  ACTIVE(0),
-  DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
-
-  private final int code;
-
-  ServerStatus(int code) {
-    this.code = code;
-  }
-
-  public int code() {
-    return code;
-  }
+public class RssCancelDecommissionRequest {
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
similarity index 75%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
index 2cfdf6ba..710f38c3 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
@@ -15,21 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.request;
 
-public enum ServerStatus {
-  UNKNOWN(-1),
-  ACTIVE(0),
-  DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
-
-  private final int code;
-
-  ServerStatus(int code) {
-    this.code = code;
-  }
-
-  public int code() {
-    return code;
-  }
+public class RssDecommissionRequest {
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index 12a20f5e..9576075d 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -21,6 +21,7 @@ package org.apache.uniffle.client.request;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.storage.StorageInfo;
 
 public class RssSendHeartBeatRequest {
@@ -35,6 +36,7 @@ public class RssSendHeartBeatRequest {
   private final Set<String> tags;
   private final long timeout;
   private final boolean isHealthy;
+  private final ServerStatus serverStatus;
   private final Map<String, StorageInfo> storageInfo;
 
   public RssSendHeartBeatRequest(
@@ -48,6 +50,7 @@ public class RssSendHeartBeatRequest {
       long timeout,
       Set<String> tags,
       boolean isHealthy,
+      ServerStatus serverStatus,
       Map<String, StorageInfo> storageInfo) {
     this.shuffleServerId = shuffleServerId;
     this.shuffleServerIp = shuffleServerIp;
@@ -59,6 +62,7 @@ public class RssSendHeartBeatRequest {
     this.tags = tags;
     this.timeout = timeout;
     this.isHealthy = isHealthy;
+    this.serverStatus = serverStatus;
     this.storageInfo = storageInfo;
   }
 
@@ -102,6 +106,10 @@ public class RssSendHeartBeatRequest {
     return isHealthy;
   }
 
+  public ServerStatus getServerStatus() {
+    return serverStatus;
+  }
+
   public Map<String, StorageInfo> getStorageInfo() {
     return storageInfo;
   }
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
similarity index 69%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
index 2cfdf6ba..fb59dc9e 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.response;
 
-public enum ServerStatus {
-  UNKNOWN(-1),
-  ACTIVE(0),
-  DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
+import org.apache.uniffle.common.rpc.StatusCode;
 
-  private final int code;
+public class RssCancelDecommissionResponse extends ClientResponse {
+  private String retMsg;
 
-  ServerStatus(int code) {
-    this.code = code;
+  public RssCancelDecommissionResponse(StatusCode statusCode, String retMsg) {
+    super(statusCode);
+    this.retMsg = retMsg;
   }
 
-  public int code() {
-    return code;
+  public String getRetMsg() {
+    return retMsg;
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
similarity index 69%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to 
internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
index 2cfdf6ba..c0d0ce9f 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.response;
 
-public enum ServerStatus {
-  UNKNOWN(-1),
-  ACTIVE(0),
-  DECOMMISSIONING(1),
-  DECOMMISSIONED(2);
+import org.apache.uniffle.common.rpc.StatusCode;
 
-  private final int code;
+public class RssDecommissionResponse extends ClientResponse {
+  private String retMsg;
 
-  ServerStatus(int code) {
-    this.code = code;
+  public RssDecommissionResponse(StatusCode statusCode, String retMsg) {
+    super(statusCode);
+    this.retMsg = retMsg;
   }
 
-  public int code() {
-    return code;
+  public String getRetMsg() {
+    return retMsg;
   }
 }
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0bff956e..0a9d03a8 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -234,6 +234,13 @@ message ShuffleCommitResponse {
   string retMsg = 3;
 }
 
+// Status of a shuffle server, carried in the `status` field of ShuffleServerHeartBeatRequest.
+enum ServerStatus {
+  ACTIVE = 0;
+  DECOMMISSIONING = 1;
+  DECOMMISSIONED = 2;
+  // TODO: add more statuses, such as UPGRADING.
+}
+
 message ShuffleServerHeartBeatRequest {
   ShuffleServerId serverId = 1;
   int64 usedMemory = 2;
@@ -242,6 +249,7 @@ message ShuffleServerHeartBeatRequest {
   int32 eventNumInFlush = 5;
   repeated string tags = 6;
   google.protobuf.BoolValue isHealthy = 7;
+  optional ServerStatus status = 8;
   map<string, StorageInfo> storageInfo = 21; // mount point to storage info 
mapping.
 }
 
@@ -272,6 +280,7 @@ enum StatusCode {
   INTERNAL_ERROR = 6;
   TIMEOUT = 7;
   ACCESS_DENIED = 8;
+  INVALID_REQUEST = 9;
   // add more status
 }
 
@@ -458,3 +467,24 @@ message FetchRemoteStorageResponse {
   StatusCode status = 1;
   RemoteStorage remoteStorage = 2;
 }
+
+// Internal RPCs served by a shuffle server for controlling decommission.
+service ShuffleServerInternal {
+  // Requests that the shuffle server start decommissioning.
+  rpc decommission(DecommissionRequest) returns (DecommissionResponse);
+  // Requests that the shuffle server cancel decommissioning.
+  rpc cancelDecommission(CancelDecommissionRequest) returns 
(CancelDecommissionResponse);
+}
+
+// Intentionally empty: decommission takes no parameters.
+message DecommissionRequest {
+}
+
+// Result of a decommission call.
+message DecommissionResponse {
+  // Outcome of the call.
+  StatusCode status = 1;
+  // Message accompanying the status; the server sets it only when the call fails.
+  string retMsg = 2;
+}
+
+// Intentionally empty: cancelDecommission takes no parameters.
+message CancelDecommissionRequest {
+}
+
+// Result of a cancelDecommission call.
+message CancelDecommissionResponse {
+  // Outcome of the call.
+  StatusCode status = 1;
+  // Message accompanying the status; the server sets it only when the call fails.
+  string retMsg = 2;
+}
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java 
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 4e8b93ec..b7883fd5 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.factory.CoordinatorClientFactory;
 import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
 import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.storage.StorageInfo;
 import org.apache.uniffle.common.util.ThreadUtils;
@@ -83,6 +84,7 @@ public class RegisterHeartBeat {
             shuffleServer.getEventNumInFlush(),
             shuffleServer.getTags(),
             shuffleServer.isHealthy(),
+            shuffleServer.getServerStatus(),
             shuffleServer.getStorageManager().getStorageInfo());
       } catch (Exception e) {
         LOG.warn("Error happened when send heart beat to coordinator");
@@ -102,6 +104,7 @@ public class RegisterHeartBeat {
       int eventNumInFlush,
       Set<String> tags,
       boolean isHealthy,
+      ServerStatus serverStatus,
       Map<String, StorageInfo> localStorageInfo) {
     boolean sendSuccessfully = false;
     RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
@@ -115,6 +118,7 @@ public class RegisterHeartBeat {
         heartBeatTimeout,
         tags,
         isHealthy,
+        serverStatus,
         localStorageInfo);
     List<Future<RssSendHeartBeatResponse>> respFutures = coordinatorClients
         .stream()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
index 24e70e39..8ba4adca 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
@@ -33,8 +33,13 @@ public class ShuffleServerFactory {
   public ServerInterface getServer() {
     String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
     if (type.equals(ServerType.GRPC.name())) {
-      return new GrpcServer(conf, new ShuffleServerGrpcService(shuffleServer),
-          shuffleServer.getGrpcMetrics());
+      return GrpcServer.Builder.newBuilder()
+          .conf(conf)
+          .grpcMetrics(shuffleServer.getGrpcMetrics())
+          .addService(new ShuffleServerGrpcService(shuffleServer))
+          // todo: Add ServerInterceptor for authentication
+          .addService(new ShuffleServerInternalGrpcService(shuffleServer))
+          .build();
     } else {
       throw new UnsupportedOperationException("Unsupported server type " + 
type);
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
new file mode 100644
index 00000000..3a50d085
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import io.grpc.stub.StreamObserver;
+
+import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+import 
org.apache.uniffle.proto.ShuffleServerInternalGrpc.ShuffleServerInternalImplBase;
+
+/**
+ * gRPC service implementing the {@code ShuffleServerInternal} RPCs by delegating to the owning
+ * {@link ShuffleServer}.
+ *
+ * <p>Failures are never propagated as gRPC errors: any exception thrown by the server is turned
+ * into a response carrying a non-success {@link StatusCode} and a return message.
+ */
+public class ShuffleServerInternalGrpcService extends ShuffleServerInternalImplBase {
+  private final ShuffleServer shuffleServer;
+
+  public ShuffleServerInternalGrpcService(ShuffleServer shuffleServer) {
+    this.shuffleServer = shuffleServer;
+  }
+
+  /**
+   * Handles the {@code decommission} RPC by calling {@link ShuffleServer#decommission()}.
+   *
+   * @param request empty request
+   * @param responseObserver receives exactly one response and is then completed
+   */
+  @Override
+  public void decommission(
+      RssProtos.DecommissionRequest request,
+      StreamObserver<RssProtos.DecommissionResponse> responseObserver) {
+    RssProtos.DecommissionResponse response;
+    try {
+      shuffleServer.decommission();
+      response = RssProtos.DecommissionResponse
+          .newBuilder()
+          .setStatus(StatusCode.SUCCESS.toProto())
+          .build();
+    } catch (Exception e) {
+      response = RssProtos.DecommissionResponse
+          .newBuilder()
+          .setStatus(toStatusCode(e).toProto())
+          .setRetMsg(toRetMsg(e))
+          .build();
+    }
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+
+  /**
+   * Handles the {@code cancelDecommission} RPC by calling
+   * {@link ShuffleServer#cancelDecommission()}.
+   *
+   * @param request empty request
+   * @param responseObserver receives exactly one response and is then completed
+   */
+  @Override
+  public void cancelDecommission(
+      RssProtos.CancelDecommissionRequest request,
+      StreamObserver<RssProtos.CancelDecommissionResponse> responseObserver) {
+    RssProtos.CancelDecommissionResponse response;
+    try {
+      shuffleServer.cancelDecommission();
+      response = RssProtos.CancelDecommissionResponse
+          .newBuilder()
+          .setStatus(StatusCode.SUCCESS.toProto())
+          .build();
+    } catch (Exception e) {
+      response = RssProtos.CancelDecommissionResponse
+          .newBuilder()
+          .setStatus(toStatusCode(e).toProto())
+          .setRetMsg(toRetMsg(e))
+          .build();
+    }
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+
+  /** Maps an exception to INVALID_REQUEST for InvalidRequestException, INTERNAL_ERROR otherwise. */
+  private static StatusCode toStatusCode(Exception e) {
+    return e instanceof InvalidRequestException
+        ? StatusCode.INVALID_REQUEST
+        : StatusCode.INTERNAL_ERROR;
+  }
+
+  /**
+   * Returns a non-null message for the response. Protobuf string setters throw
+   * NullPointerException on null, so an exception without a message falls back to its
+   * {@code toString()} instead of failing inside the error path.
+   */
+  private static String toRetMsg(Exception e) {
+    String message = e.getMessage();
+    return message != null ? message : e.toString();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java 
b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
index fa450345..58de5382 100644
--- a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
+++ b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
@@ -17,6 +17,9 @@
 
 package org.apache.uniffle.server;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.rpc.GrpcServer;
@@ -26,7 +29,7 @@ public class MockedGrpcServer extends GrpcServer {
 
   public MockedGrpcServer(RssBaseConf conf, MockedShuffleServerGrpcService 
service,
                           GRPCMetrics grpcMetrics) {
-    super(conf, service, grpcMetrics);
+    super(conf, Lists.newArrayList(Pair.of(service, Lists.newArrayList())), 
grpcMetrics);
     this.service = service;
   }
 

Reply via email to