This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 1951376f [#80][Part-2] feat: Add RPC logic and heartbeat logic for
decommission (#663)
1951376f is described below
commit 1951376f28fe4f91cbe051c6fa8600f52dede8e3
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Mar 2 21:29:00 2023 +0800
[#80][Part-2] feat: Add RPC logic and heartbeat logic for decommission (#663)
### What changes were proposed in this pull request?
Add RPC logic and heartbeat logic for shuffle server decommission.
### Why are the changes needed?
Support shuffle server decommission. This is part of #80.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT (unit tests), plus the new integration test ShuffleServerInternalGrpcTest.
---
.../org/apache/uniffle/common/ServerStatus.java | 26 +++++-
.../org/apache/uniffle/common/rpc/GrpcServer.java | 74 +++++++++++----
.../org/apache/uniffle/common/rpc/StatusCode.java | 21 ++++-
.../StatusCodeTest.java => ServerStatusTest.java} | 35 ++++---
.../apache/uniffle/common/rpc/StatusCodeTest.java | 23 +++--
coordinator/pom.xml | 5 +-
.../apache/uniffle/coordinator/ClusterManager.java | 6 ++
.../uniffle/coordinator/CoordinatorConf.java | 12 +++
.../uniffle/coordinator/CoordinatorFactory.java | 6 +-
.../coordinator/CoordinatorGrpcService.java | 3 +
.../org/apache/uniffle/coordinator/ServerNode.java | 27 +++++-
.../uniffle/coordinator/SimpleClusterManager.java | 52 +++++++++++
.../apache/uniffle/coordinator/ServerNodeTest.java | 4 +-
.../uniffle/test/CoordinatorGrpcServerTest.java | 6 +-
.../test/ShuffleServerInternalGrpcTest.java | 104 +++++++++++++++++++++
.../client/api/ShuffleServerInternalClient.java | 24 +++--
.../client/impl/grpc/CoordinatorGrpcClient.java | 4 +
.../impl/grpc/ShuffleServerInternalGrpcClient.java | 76 +++++++++++++++
.../request/RssCancelDecommissionRequest.java | 18 +---
.../client/request/RssDecommissionRequest.java | 18 +---
.../client/request/RssSendHeartBeatRequest.java | 8 ++
.../response/RssCancelDecommissionResponse.java | 20 ++--
.../client/response/RssDecommissionResponse.java | 20 ++--
proto/src/main/proto/Rss.proto | 30 ++++++
.../apache/uniffle/server/RegisterHeartBeat.java | 4 +
.../uniffle/server/ShuffleServerFactory.java | 9 +-
.../server/ShuffleServerInternalGrpcService.java | 85 +++++++++++++++++
.../apache/uniffle/server/MockedGrpcServer.java | 5 +-
28 files changed, 608 insertions(+), 117 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
index 2cfdf6ba..bd23b560 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -17,12 +17,20 @@
package org.apache.uniffle.common;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.uniffle.proto.RssProtos;
+
public enum ServerStatus {
- UNKNOWN(-1),
ACTIVE(0),
DECOMMISSIONING(1),
- DECOMMISSIONED(2);
+ DECOMMISSIONED(2),
+ UNKNOWN(-1);
+ static final Map<Integer, ServerStatus> VALUE_MAP =
+
Arrays.stream(ServerStatus.values()).collect(Collectors.toMap(ServerStatus::code,
s -> s));
private final int code;
ServerStatus(int code) {
@@ -32,4 +40,18 @@ public enum ServerStatus {
public int code() {
return code;
}
+
+ public static ServerStatus fromCode(Integer code) {
+ ServerStatus serverStatus = VALUE_MAP.get(code);
+ return serverStatus == null ? UNKNOWN : serverStatus;
+ }
+
+ public RssProtos.ServerStatus toProto() {
+ RssProtos.ServerStatus serverStatus =
RssProtos.ServerStatus.forNumber(this.code());
+ return serverStatus == null ? RssProtos.ServerStatus.UNRECOGNIZED :
serverStatus;
+ }
+
+ public static ServerStatus fromProto(RssProtos.ServerStatus status) {
+ return status == RssProtos.ServerStatus.UNRECOGNIZED ? UNKNOWN : fromCode(status.getNumber());
+ }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index be4e916f..a9935412 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -18,6 +18,8 @@
package org.apache.uniffle.common.rpc;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -25,11 +27,14 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +51,10 @@ public class GrpcServer implements ServerInterface {
private final int port;
private final ExecutorService pool;
- public GrpcServer(RssBaseConf conf, BindableService service, GRPCMetrics
grpcMetrics) {
+ protected GrpcServer(
+ RssBaseConf conf,
+ List<Pair<BindableService, List<ServerInterceptor>>>
servicesWithInterceptors,
+ GRPCMetrics grpcMetrics) {
this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
long maxInboundMessageSize =
conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
@@ -61,23 +69,55 @@ public class GrpcServer implements ServerInterface {
);
boolean isMetricsEnabled =
conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
+ ServerBuilder<?> builder = ServerBuilder
+ .forPort(port)
+ .executor(pool)
+ .maxInboundMessageSize((int)maxInboundMessageSize);
if (isMetricsEnabled) {
- MonitoringServerInterceptor monitoringInterceptor =
- new MonitoringServerInterceptor(grpcMetrics);
- this.server = ServerBuilder
- .forPort(port)
- .addService(ServerInterceptors.intercept(service,
monitoringInterceptor))
- .executor(pool)
- .addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics))
- .maxInboundMessageSize((int)maxInboundMessageSize)
- .build();
- } else {
- this.server = ServerBuilder
- .forPort(port)
- .addService(service)
- .executor(pool)
- .maxInboundMessageSize((int)maxInboundMessageSize)
- .build();
+ builder.addTransportFilter(new
MonitoringServerTransportFilter(grpcMetrics));
+ }
+ servicesWithInterceptors.forEach((serviceWithInterceptors) -> {
+ List<ServerInterceptor> interceptors =
serviceWithInterceptors.getRight();
+ if (isMetricsEnabled) {
+ MonitoringServerInterceptor monitoringInterceptor =
+ new MonitoringServerInterceptor(grpcMetrics);
+ List<ServerInterceptor> newInterceptors =
Lists.newArrayList(interceptors);
+ newInterceptors.add(monitoringInterceptor);
+ interceptors = newInterceptors;
+ }
+
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(),
interceptors));
+ });
+ this.server = builder.build();
+ }
+
+ public static class Builder {
+
+ private RssBaseConf rssBaseConf;
+ private GRPCMetrics grpcMetrics;
+
+ private List<Pair<BindableService, List<ServerInterceptor>>>
servicesWithInterceptors = new ArrayList<>();
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public Builder conf(RssBaseConf rssBaseConf) {
+ this.rssBaseConf = rssBaseConf;
+ return this;
+ }
+
+ public Builder addService(BindableService bindableService,
ServerInterceptor... interceptors) {
+ this.servicesWithInterceptors.add(Pair.of(bindableService,
Lists.newArrayList(interceptors)));
+ return this;
+ }
+
+ public Builder grpcMetrics(GRPCMetrics metrics) {
+ this.grpcMetrics = metrics;
+ return this;
+ }
+
+ public GrpcServer build() {
+ return new GrpcServer(rssBaseConf, servicesWithInterceptors,
grpcMetrics);
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
index 7157c351..eb2c7811 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/StatusCode.java
@@ -17,6 +17,10 @@
package org.apache.uniffle.common.rpc;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.uniffle.proto.RssProtos;
public enum StatusCode {
@@ -28,8 +32,12 @@ public enum StatusCode {
NO_PARTITION(5),
INTERNAL_ERROR(6),
TIMEOUT(7),
- ACCESS_DENIED(8);
+ ACCESS_DENIED(8),
+ INVALID_REQUEST(9),
+ UNKNOWN(-1);
+ static final Map<Integer, StatusCode> VALUE_MAP =
+
Arrays.stream(StatusCode.values()).collect(Collectors.toMap(StatusCode::statusCode,
s -> s));
private final int statusCode;
StatusCode(int code) {
@@ -40,8 +48,17 @@ public enum StatusCode {
return statusCode;
}
+ public static StatusCode fromCode(Integer code) {
+ StatusCode statusCode = VALUE_MAP.get(code);
+ return statusCode == null ? UNKNOWN : statusCode;
+ }
+
public RssProtos.StatusCode toProto() {
RssProtos.StatusCode code =
RssProtos.StatusCode.forNumber(this.statusCode());
- return code == null ? RssProtos.StatusCode.INTERNAL_ERROR : code;
+ return code == null ? RssProtos.StatusCode.UNRECOGNIZED : code;
+ }
+
+ public static StatusCode fromProto(RssProtos.StatusCode status) {
+ return status == RssProtos.StatusCode.UNRECOGNIZED ? UNKNOWN : fromCode(status.getNumber());
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
similarity index 51%
copy from common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
copy to common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
index 3c93d14b..e449215a 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/ServerStatusTest.java
@@ -15,7 +15,11 @@
* limitations under the License.
*/
-package org.apache.uniffle.common.rpc;
+package org.apache.uniffle.common;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
@@ -24,31 +28,36 @@ import org.apache.uniffle.proto.RssProtos;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
-public class StatusCodeTest {
+public class ServerStatusTest {
@Test
public void test() throws Exception {
- RssProtos.StatusCode[] protoStatusCode = RssProtos.StatusCode.values();
- for (RssProtos.StatusCode statusCode : protoStatusCode) {
+ assertEquals(-1, ServerStatus.UNKNOWN.code());
+ assertEquals(ServerStatus.fromCode(-2), ServerStatus.UNKNOWN);
+ assertEquals(ServerStatus.fromCode(Integer.MAX_VALUE),
ServerStatus.UNKNOWN);
+ List<RssProtos.ServerStatus> protoServerStatuses =
Arrays.stream(RssProtos.ServerStatus.values())
+ .filter(s ->
!RssProtos.ServerStatus.UNRECOGNIZED.equals(s)).collect(Collectors.toList());
+
+ for (RssProtos.ServerStatus statusCode : protoServerStatuses) {
+
try {
- if (RssProtos.StatusCode.UNRECOGNIZED.equals(statusCode)) {
- continue;
- }
- StatusCode.valueOf(statusCode.name());
+ ServerStatus.valueOf(statusCode.name());
} catch (Exception e) {
fail(e.getMessage());
}
}
- StatusCode[] statusCodes = StatusCode.values();
- for (StatusCode statusCode : statusCodes) {
+ List<ServerStatus> serverStatuses = Arrays.stream(ServerStatus.values())
+ .filter(s ->
!ServerStatus.UNKNOWN.equals(s)).collect(Collectors.toList());
+ for (ServerStatus serverStatus : serverStatuses) {
try {
- RssProtos.StatusCode.valueOf(statusCode.name());
+ RssProtos.ServerStatus.valueOf(serverStatus.name());
} catch (Exception e) {
fail(e.getMessage());
}
}
- for (int i = 0; i < statusCodes.length; i++) {
- assertEquals(protoStatusCode[i], statusCodes[i].toProto());
+ for (int i = 0; i < serverStatuses.size(); i++) {
+ assertEquals(protoServerStatuses.get(i),
serverStatuses.get(i).toProto());
+ assertEquals(ServerStatus.fromProto(protoServerStatuses.get(i)),
serverStatuses.get(i));
}
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
index 3c93d14b..5956c1d3 100644
--- a/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/rpc/StatusCodeTest.java
@@ -17,6 +17,10 @@
package org.apache.uniffle.common.rpc;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.junit.jupiter.api.Test;
import org.apache.uniffle.proto.RssProtos;
@@ -28,18 +32,22 @@ public class StatusCodeTest {
@Test
public void test() throws Exception {
- RssProtos.StatusCode[] protoStatusCode = RssProtos.StatusCode.values();
+ assertEquals(-1, StatusCode.UNKNOWN.statusCode());
+ assertEquals(StatusCode.fromCode(-2), StatusCode.UNKNOWN);
+ assertEquals(StatusCode.fromCode(Integer.MAX_VALUE), StatusCode.UNKNOWN);
+ List<RssProtos.StatusCode> protoStatusCode =
Arrays.stream(RssProtos.StatusCode.values())
+ .filter(s ->
!RssProtos.StatusCode.UNRECOGNIZED.equals(s)).collect(Collectors.toList());
+
for (RssProtos.StatusCode statusCode : protoStatusCode) {
try {
- if (RssProtos.StatusCode.UNRECOGNIZED.equals(statusCode)) {
- continue;
- }
StatusCode.valueOf(statusCode.name());
} catch (Exception e) {
fail(e.getMessage());
}
}
- StatusCode[] statusCodes = StatusCode.values();
+ List<StatusCode> statusCodes = Arrays.stream(StatusCode.values())
+ .filter(s ->
!StatusCode.UNKNOWN.equals(s)).collect(Collectors.toList());
+
for (StatusCode statusCode : statusCodes) {
try {
RssProtos.StatusCode.valueOf(statusCode.name());
@@ -47,8 +55,9 @@ public class StatusCodeTest {
fail(e.getMessage());
}
}
- for (int i = 0; i < statusCodes.length; i++) {
- assertEquals(protoStatusCode[i], statusCodes[i].toProto());
+ for (int i = 0; i < statusCodes.size(); i++) {
+ assertEquals(protoStatusCode.get(i), statusCodes.get(i).toProto());
+ assertEquals(StatusCode.fromProto(protoStatusCode.get(i)),
statusCodes.get(i));
}
}
}
diff --git a/coordinator/pom.xml b/coordinator/pom.xml
index 0f2589c7..ac6b2ec5 100644
--- a/coordinator/pom.xml
+++ b/coordinator/pom.xml
@@ -36,7 +36,10 @@
<groupId>org.apache.uniffle</groupId>
<artifactId>rss-common</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.uniffle</groupId>
+ <artifactId>rss-internal-client</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 5139b10b..fbd7e694 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -56,4 +56,10 @@ public interface ClusterManager extends Closeable,
Reconfigurable {
* @return whether to be ready for serving
*/
boolean isReadyForServe();
+
+ ServerNode getServerNodeById(String serverId);
+
+ void decommission(String serverId);
+
+ void cancelDecommission(String serverId);
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 6f2b9413..879dedff 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -197,6 +197,18 @@ public class CoordinatorConf extends RssBaseConf {
.defaultValue(60 * 1000L)
.withDescription("Update interval for the default number of submitted
apps per user");
+ public static final ConfigOption<Long>
COORDINATOR_NODES_CLIENT_CACHE_EXPIRED = ConfigOptions
+ .key("rss.coordinator.nodes.client.cache.expired")
+ .longType()
+ .defaultValue(120 * 1000L)
+ .withDescription("Expired time (ms) for the clients communicating
with nodes.");
+
+ public static final ConfigOption<Integer> COORDINATOR_NODES_CLIENT_CACHE_MAX
= ConfigOptions
+ .key("rss.coordinator.nodes.client.cache.max")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("The max number of clients that communicating with
nodes and storing in the cache.");
+
public CoordinatorConf() {
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
index b8f566ee..1a952d27 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorFactory.java
@@ -33,8 +33,10 @@ public class CoordinatorFactory {
public ServerInterface getServer() {
String type = conf.getString(CoordinatorConf.RPC_SERVER_TYPE);
if (type.equals(ServerType.GRPC.name())) {
- return new GrpcServer(conf, new
CoordinatorGrpcService(coordinatorServer),
- coordinatorServer.getGrpcMetrics());
+ return GrpcServer.Builder.newBuilder()
+ .conf(conf)
+ .grpcMetrics(coordinatorServer.getGrpcMetrics())
+ .addService(new CoordinatorGrpcService(coordinatorServer)).build();
} else {
throw new UnsupportedOperationException("Unsupported server type " +
type);
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index 8e2b49d1..7d2b255b 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfoUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
@@ -367,6 +368,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
if (request.hasIsHealthy()) {
isHealthy = request.getIsHealthy().getValue();
}
+ ServerStatus serverStatus = request.hasStatus() ?
ServerStatus.fromProto(request.getStatus()) : ServerStatus.ACTIVE;
return new ServerNode(request.getServerId().getId(),
request.getServerId().getIp(),
request.getServerId().getPort(),
@@ -376,6 +378,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
request.getEventNumInFlush(),
Sets.newHashSet(request.getTagsList()),
isHealthy,
+ serverStatus,
StorageInfoUtils.fromProto(request.getStorageInfoMap()));
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index 069e5c79..5d2cfeba 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -22,6 +22,7 @@ import java.util.Set;
import com.google.common.collect.Maps;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.proto.RssProtos.ShuffleServerId;
@@ -37,8 +38,10 @@ public class ServerNode implements Comparable<ServerNode> {
private long timestamp;
private Set<String> tags;
private boolean isHealthy;
+ private final ServerStatus status;
private Map<String, StorageInfo> storageInfo;
+ // Only for test
public ServerNode(
String id,
String ip,
@@ -50,7 +53,7 @@ public class ServerNode implements Comparable<ServerNode> {
Set<String> tags,
boolean isHealthy) {
this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags, isHealthy,
- Maps.newHashMap());
+ ServerStatus.ACTIVE, Maps.newHashMap());
}
public ServerNode(
@@ -63,6 +66,22 @@ public class ServerNode implements Comparable<ServerNode> {
int eventNumInFlush,
Set<String> tags,
boolean isHealthy,
+ ServerStatus status) {
+ this(id, ip, port, usedMemory, preAllocatedMemory, availableMemory,
eventNumInFlush, tags, isHealthy,
+ status, Maps.newHashMap());
+ }
+
+ public ServerNode(
+ String id,
+ String ip,
+ int port,
+ long usedMemory,
+ long preAllocatedMemory,
+ long availableMemory,
+ int eventNumInFlush,
+ Set<String> tags,
+ boolean isHealthy,
+ ServerStatus status,
Map<String, StorageInfo> storageInfoMap) {
this.id = id;
this.ip = ip;
@@ -74,6 +93,7 @@ public class ServerNode implements Comparable<ServerNode> {
this.timestamp = System.currentTimeMillis();
this.tags = tags;
this.isHealthy = isHealthy;
+ this.status = status;
this.storageInfo = storageInfoMap;
}
@@ -121,6 +141,10 @@ public class ServerNode implements Comparable<ServerNode> {
return isHealthy;
}
+ public ServerStatus getStatus() {
+ return status;
+ }
+
public Map<String, StorageInfo> getStorageInfo() {
return storageInfo;
}
@@ -137,6 +161,7 @@ public class ServerNode implements Comparable<ServerNode> {
+ "], timestamp[" + timestamp
+ "], tags" + tags.toString() + ""
+ ", healthy[" + isHealthy
+ + "], status[" + status
+ "], storages[num=" + storageInfo.size() + "]";
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index dbe5df33..7715b543 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -44,7 +47,12 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -54,6 +62,7 @@ public class SimpleClusterManager implements ClusterManager {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleClusterManager.class);
private final Map<String, ServerNode> servers = Maps.newConcurrentMap();
+ private final Cache<ServerNode, ShuffleServerInternalGrpcClient> clientCache;
private Set<String> excludeNodes = Sets.newConcurrentHashSet();
// tag -> nodes
private Map<String, Set<ServerNode>> tagToNodes = Maps.newConcurrentMap();
@@ -83,6 +92,7 @@ public class SimpleClusterManager implements ClusterManager {
this.startupSilentPeriodDurationMs =
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION);
periodicOutputIntervalTimes =
conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES);
+
scheduledExecutorService.scheduleAtFixedRate(
this::nodesCheck, heartbeatTimeout / 3,
heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -97,7 +107,15 @@ public class SimpleClusterManager implements ClusterManager
{
() -> updateExcludeNodes(excludeNodesPath), updateNodesInterval,
updateNodesInterval, TimeUnit.MILLISECONDS);
}
+ long clientExpiredTime =
conf.get(CoordinatorConf.COORDINATOR_NODES_CLIENT_CACHE_EXPIRED);
+ int maxClient =
conf.get(CoordinatorConf.COORDINATOR_NODES_CLIENT_CACHE_MAX);
+ clientCache = CacheBuilder.newBuilder()
+ .expireAfterAccess(clientExpiredTime, TimeUnit.MILLISECONDS)
+ .maximumSize(maxClient)
+ .removalListener(notify -> ((ShuffleServerInternalGrpcClient)
notify.getValue()).close())
+ .build();
this.startTime = System.currentTimeMillis();
+
}
void nodesCheck() {
@@ -117,6 +135,7 @@ public class SimpleClusterManager implements ClusterManager
{
for (String serverId : deleteIds) {
ServerNode sn = servers.remove(serverId);
if (sn != null) {
+ clientCache.invalidate(sn);
for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
nodesWithTag.remove(sn);
}
@@ -200,6 +219,9 @@ public class SimpleClusterManager implements ClusterManager
{
public List<ServerNode> getServerList(Set<String> requiredTags) {
List<ServerNode> availableNodes = Lists.newArrayList();
for (ServerNode node : servers.values()) {
+ if (!ServerStatus.ACTIVE.equals(node.getStatus())) {
+ continue;
+ }
if (!excludeNodes.contains(node.getId())
&& node.getTags().containsAll(requiredTags)
&& node.isHealthy()) {
@@ -250,6 +272,36 @@ public class SimpleClusterManager implements
ClusterManager {
return readyForServe;
}
+ @Override
+ public void decommission(String serverId) {
+ ServerNode serverNode = getServerNodeById(serverId);
+ getShuffleServerClient(serverNode).decommission(new
RssDecommissionRequest());
+ }
+
+ @Override
+ public void cancelDecommission(String serverId) {
+ ServerNode serverNode = getServerNodeById(serverId);
+ getShuffleServerClient(serverNode).cancelDecommission(new
RssCancelDecommissionRequest());
+ }
+
+ private ShuffleServerInternalGrpcClient getShuffleServerClient(ServerNode
serverNode) {
+ try {
+ return clientCache.get(serverNode,
+ () -> new ShuffleServerInternalGrpcClient(serverNode.getIp(),
serverNode.getPort()));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ServerNode getServerNodeById(String serverId) {
+ ServerNode serverNode = servers.get(serverId);
+ if (serverNode == null) {
+ throw new InvalidRequestException("Server Id [" + serverId + "] not
found!");
+ }
+ return serverNode;
+ }
+
@Override
public void close() throws IOException {
if (hadoopFileSystem != null) {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
index 9c6e7a38..4f9c014d 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/ServerNodeTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.storage.StorageStatus;
@@ -65,7 +66,8 @@ public class ServerNodeTest {
60L,
StorageStatus.NORMAL);
localStorageInfo.put("/mnt", info);
- ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
true, localStorageInfo);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 20, 10, tags,
+ true, ServerStatus.ACTIVE, localStorageInfo);
assertEquals(1, sn2.getStorageInfo().size());
}
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
index 99abcee4..f5ca9b51 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcServerTest.java
@@ -61,7 +61,11 @@ public class CoordinatorGrpcServerTest {
GRPCMetrics grpcMetrics = new CoordinatorGrpcMetrics();
grpcMetrics.register(new CollectorRegistry(true));
- GrpcServer grpcServer = new GrpcServer(baseConf, new
MockedCoordinatorGrpcService(), grpcMetrics);
+ GrpcServer grpcServer = GrpcServer.Builder.newBuilder()
+ .conf(baseConf)
+ .grpcMetrics(grpcMetrics)
+ .addService(new MockedCoordinatorGrpcService())
+ .build();
grpcServer.start();
// case1: test the single one connection metric
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
new file mode 100644
index 00000000..41cd6194
--- /dev/null
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerInternalGrpcTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import io.grpc.StatusRuntimeException;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerInternalGrpcClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.ServerStatus;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ShuffleServerInternalGrpcTest extends IntegrationTestBase {
+
+ private ShuffleServerGrpcClient shuffleServerClient;
+ private ShuffleServerInternalGrpcClient shuffleServerInternalClient;
+
+ @BeforeAll
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = getCoordinatorConf();
+ coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+ createCoordinatorServer(coordinatorConf);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf();
+ File dataDir1 = new File(tmpDir, "data1");
+ String basePath = dataDir1.getAbsolutePath();
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
+
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
5000L);
+
shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL,
500L);
+ createShuffleServer(shuffleServerConf);
+ startServers();
+ }
+
+ @BeforeEach
+ public void createClient() {
+ shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST,
SHUFFLE_SERVER_PORT);
+ shuffleServerInternalClient = new
ShuffleServerInternalGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
+ }
+
+ @Test
+ public void decommissionTest() {
+ String appId = "decommissionTest";
+ int shuffleId = 0;
+ shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId,
shuffleId,
+ Lists.newArrayList(new PartitionRange(0, 1)), ""));
+
+ ShuffleServer shuffleServer = shuffleServers.get(0);
+ RssDecommissionResponse response =
shuffleServerInternalClient.decommission(new RssDecommissionRequest());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(ServerStatus.DECOMMISSIONING,
shuffleServer.getServerStatus());
+ RssCancelDecommissionResponse cancelResponse =
+ shuffleServerInternalClient.cancelDecommission(new
RssCancelDecommissionRequest());
+ assertEquals(StatusCode.SUCCESS, cancelResponse.getStatusCode());
+ assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
+
+ // Clean all apps, shuffle server will be shutdown right now.
+ shuffleServerClient.unregisterShuffle(new
RssUnregisterShuffleRequest(appId, shuffleId));
+ response = shuffleServerInternalClient.decommission(new
RssDecommissionRequest());
+ assertEquals(StatusCode.SUCCESS, response.getStatusCode());
+ assertEquals(ServerStatus.DECOMMISSIONING,
shuffleServer.getServerStatus());
+ Awaitility.await().timeout(10, TimeUnit.SECONDS).until(() ->
+ !shuffleServer.isRunning());
+ // Server is already shutdown, so io exception should be thrown here.
+ assertThrows(StatusRuntimeException.class,
+ () -> shuffleServerInternalClient.cancelDecommission(new
RssCancelDecommissionRequest()));
+ }
+
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
similarity index 58%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
index 2cfdf6ba..b05448a6 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerInternalClient.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.api;
-public enum ServerStatus {
- UNKNOWN(-1),
- ACTIVE(0),
- DECOMMISSIONING(1),
- DECOMMISSIONED(2);
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
- private final int code;
+public interface ShuffleServerInternalClient {
- ServerStatus(int code) {
- this.code = code;
- }
+ RssDecommissionResponse decommission(RssDecommissionRequest request);
+
+ RssCancelDecommissionResponse
cancelDecommission(RssCancelDecommissionRequest rssCancelDecommissionRequest);
+
+ void close();
- public int code() {
- return code;
- }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 28412912..0f66a831 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -50,6 +50,7 @@ import
org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
@@ -117,6 +118,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
long timeout,
Set<String> tags,
boolean isHealthy,
+ ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder().setId(id).setIp(ip).setPort(port).build();
@@ -129,6 +131,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
.setEventNumInFlush(eventNumInFlush)
.addAllTags(tags)
.setIsHealthy(BoolValue.newBuilder().setValue(isHealthy).build())
+ .setStatusValue(serverStatus.ordinal())
.putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
.build();
@@ -194,6 +197,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
request.getTimeout(),
request.getTags(),
request.isHealthy(),
+ request.getServerStatus(),
request.getStorageInfo());
RssSendHeartBeatResponse response;
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
new file mode 100644
index 00000000..f00c5f7d
--- /dev/null
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerInternalGrpcClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.impl.grpc;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleServerInternalClient;
+import org.apache.uniffle.client.request.RssCancelDecommissionRequest;
+import org.apache.uniffle.client.request.RssDecommissionRequest;
+import org.apache.uniffle.client.response.RssCancelDecommissionResponse;
+import org.apache.uniffle.client.response.RssDecommissionResponse;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+import org.apache.uniffle.proto.ShuffleServerInternalGrpc;
+
+public class ShuffleServerInternalGrpcClient extends GrpcClient implements
ShuffleServerInternalClient {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServerInternalGrpcClient.class);
+ private static final long RPC_TIMEOUT_DEFAULT_MS = 60000;
+ private long rpcTimeout = RPC_TIMEOUT_DEFAULT_MS;
+ private ShuffleServerInternalGrpc.ShuffleServerInternalBlockingStub
blockingStub;
+
+ public ShuffleServerInternalGrpcClient(String host, int port) {
+ this(host, port, 3);
+ }
+
+ public ShuffleServerInternalGrpcClient(String host, int port, int
maxRetryAttempts) {
+ this(host, port, maxRetryAttempts, true);
+ }
+
+ public ShuffleServerInternalGrpcClient(String host, int port, int
maxRetryAttempts, boolean usePlaintext) {
+ super(host, port, maxRetryAttempts, usePlaintext);
+ // todo Add ClientInterceptor for authentication
+ blockingStub = ShuffleServerInternalGrpc.newBlockingStub(channel);
+ }
+
+ public ShuffleServerInternalGrpc.ShuffleServerInternalBlockingStub
getBlockingStub() {
+ return blockingStub.withDeadlineAfter(rpcTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public RssDecommissionResponse decommission(RssDecommissionRequest request) {
+ RssProtos.DecommissionRequest protoRequest =
+ RssProtos.DecommissionRequest.newBuilder().build();
+ RssProtos.DecommissionResponse rpcResponse =
getBlockingStub().decommission(protoRequest);
+ return new RssDecommissionResponse(
+ StatusCode.fromProto(rpcResponse.getStatus()),
rpcResponse.getRetMsg());
+ }
+
+ @Override
+ public RssCancelDecommissionResponse
cancelDecommission(RssCancelDecommissionRequest rssCancelDecommissionRequest) {
+ RssProtos.CancelDecommissionRequest protoRequest =
+ RssProtos.CancelDecommissionRequest.newBuilder().build();
+ RssProtos.CancelDecommissionResponse rpcResponse =
getBlockingStub().cancelDecommission(protoRequest);
+ return new RssCancelDecommissionResponse(
+ StatusCode.fromProto(rpcResponse.getStatus()),
rpcResponse.getRetMsg());
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
similarity index 75%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
index 2cfdf6ba..cc02b128 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssCancelDecommissionRequest.java
@@ -15,21 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.request;
-public enum ServerStatus {
- UNKNOWN(-1),
- ACTIVE(0),
- DECOMMISSIONING(1),
- DECOMMISSIONED(2);
-
- private final int code;
-
- ServerStatus(int code) {
- this.code = code;
- }
-
- public int code() {
- return code;
- }
+public class RssCancelDecommissionRequest {
}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
similarity index 75%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
index 2cfdf6ba..710f38c3 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssDecommissionRequest.java
@@ -15,21 +15,7 @@
* limitations under the License.
*/
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.request;
-public enum ServerStatus {
- UNKNOWN(-1),
- ACTIVE(0),
- DECOMMISSIONING(1),
- DECOMMISSIONED(2);
-
- private final int code;
-
- ServerStatus(int code) {
- this.code = code;
- }
-
- public int code() {
- return code;
- }
+public class RssDecommissionRequest {
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index 12a20f5e..9576075d 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -21,6 +21,7 @@ package org.apache.uniffle.client.request;
import java.util.Map;
import java.util.Set;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.storage.StorageInfo;
public class RssSendHeartBeatRequest {
@@ -35,6 +36,7 @@ public class RssSendHeartBeatRequest {
private final Set<String> tags;
private final long timeout;
private final boolean isHealthy;
+ private final ServerStatus serverStatus;
private final Map<String, StorageInfo> storageInfo;
public RssSendHeartBeatRequest(
@@ -48,6 +50,7 @@ public class RssSendHeartBeatRequest {
long timeout,
Set<String> tags,
boolean isHealthy,
+ ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
@@ -59,6 +62,7 @@ public class RssSendHeartBeatRequest {
this.tags = tags;
this.timeout = timeout;
this.isHealthy = isHealthy;
+ this.serverStatus = serverStatus;
this.storageInfo = storageInfo;
}
@@ -102,6 +106,10 @@ public class RssSendHeartBeatRequest {
return isHealthy;
}
+ public ServerStatus getServerStatus() {
+ return serverStatus;
+ }
+
public Map<String, StorageInfo> getStorageInfo() {
return storageInfo;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
similarity index 69%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
index 2cfdf6ba..fb59dc9e 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssCancelDecommissionResponse.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.response;
-public enum ServerStatus {
- UNKNOWN(-1),
- ACTIVE(0),
- DECOMMISSIONING(1),
- DECOMMISSIONED(2);
+import org.apache.uniffle.common.rpc.StatusCode;
- private final int code;
+public class RssCancelDecommissionResponse extends ClientResponse {
+ private String retMsg;
- ServerStatus(int code) {
- this.code = code;
+ public RssCancelDecommissionResponse(StatusCode statusCode, String retMsg) {
+ super(statusCode);
+ this.retMsg = retMsg;
}
- public int code() {
- return code;
+ public String getRetMsg() {
+ return retMsg;
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
similarity index 69%
copy from common/src/main/java/org/apache/uniffle/common/ServerStatus.java
copy to
internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
index 2cfdf6ba..c0d0ce9f 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/response/RssDecommissionResponse.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.uniffle.common;
+package org.apache.uniffle.client.response;
-public enum ServerStatus {
- UNKNOWN(-1),
- ACTIVE(0),
- DECOMMISSIONING(1),
- DECOMMISSIONED(2);
+import org.apache.uniffle.common.rpc.StatusCode;
- private final int code;
+public class RssDecommissionResponse extends ClientResponse {
+ private String retMsg;
- ServerStatus(int code) {
- this.code = code;
+ public RssDecommissionResponse(StatusCode statusCode, String retMsg) {
+ super(statusCode);
+ this.retMsg = retMsg;
}
- public int code() {
- return code;
+ public String getRetMsg() {
+ return retMsg;
}
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 0bff956e..0a9d03a8 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -234,6 +234,13 @@ message ShuffleCommitResponse {
string retMsg = 3;
}
+enum ServerStatus {
+ ACTIVE = 0;
+ DECOMMISSIONING = 1;
+ DECOMMISSIONED = 2;
+ // todo: more status, such as UPGRADING
+}
+
message ShuffleServerHeartBeatRequest {
ShuffleServerId serverId = 1;
int64 usedMemory = 2;
@@ -242,6 +249,7 @@ message ShuffleServerHeartBeatRequest {
int32 eventNumInFlush = 5;
repeated string tags = 6;
google.protobuf.BoolValue isHealthy = 7;
+ optional ServerStatus status = 8;
map<string, StorageInfo> storageInfo = 21; // mount point to storage info
mapping.
}
@@ -272,6 +280,7 @@ enum StatusCode {
INTERNAL_ERROR = 6;
TIMEOUT = 7;
ACCESS_DENIED = 8;
+ INVALID_REQUEST = 9;
// add more status
}
@@ -458,3 +467,24 @@ message FetchRemoteStorageResponse {
StatusCode status = 1;
RemoteStorage remoteStorage = 2;
}
+
+service ShuffleServerInternal {
+ rpc decommission(DecommissionRequest) returns (DecommissionResponse);
+ rpc cancelDecommission(CancelDecommissionRequest) returns
(CancelDecommissionResponse);
+}
+
+message DecommissionRequest {
+}
+
+message DecommissionResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
+message CancelDecommissionRequest {
+}
+
+message CancelDecommissionResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index 4e8b93ec..b7883fd5 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssSendHeartBeatRequest;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -83,6 +84,7 @@ public class RegisterHeartBeat {
shuffleServer.getEventNumInFlush(),
shuffleServer.getTags(),
shuffleServer.isHealthy(),
+ shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo());
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
@@ -102,6 +104,7 @@ public class RegisterHeartBeat {
int eventNumInFlush,
Set<String> tags,
boolean isHealthy,
+ ServerStatus serverStatus,
Map<String, StorageInfo> localStorageInfo) {
boolean sendSuccessfully = false;
RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
@@ -115,6 +118,7 @@ public class RegisterHeartBeat {
heartBeatTimeout,
tags,
isHealthy,
+ serverStatus,
localStorageInfo);
List<Future<RssSendHeartBeatResponse>> respFutures = coordinatorClients
.stream()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
index 24e70e39..8ba4adca 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerFactory.java
@@ -33,8 +33,13 @@ public class ShuffleServerFactory {
public ServerInterface getServer() {
String type = conf.getString(ShuffleServerConf.RPC_SERVER_TYPE);
if (type.equals(ServerType.GRPC.name())) {
- return new GrpcServer(conf, new ShuffleServerGrpcService(shuffleServer),
- shuffleServer.getGrpcMetrics());
+ return GrpcServer.Builder.newBuilder()
+ .conf(conf)
+ .grpcMetrics(shuffleServer.getGrpcMetrics())
+ .addService(new ShuffleServerGrpcService(shuffleServer))
+ // todo: Add ServerInterceptor for authentication
+ .addService(new ShuffleServerInternalGrpcService(shuffleServer))
+ .build();
} else {
throw new UnsupportedOperationException("Unsupported server type " +
type);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
new file mode 100644
index 00000000..3a50d085
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerInternalGrpcService.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import io.grpc.stub.StreamObserver;
+
+import org.apache.uniffle.common.exception.InvalidRequestException;
+import org.apache.uniffle.common.rpc.StatusCode;
+import org.apache.uniffle.proto.RssProtos;
+import
org.apache.uniffle.proto.ShuffleServerInternalGrpc.ShuffleServerInternalImplBase;
+
+public class ShuffleServerInternalGrpcService extends
ShuffleServerInternalImplBase {
+ private final ShuffleServer shuffleServer;
+
+ public ShuffleServerInternalGrpcService(ShuffleServer shuffleServer) {
+ this.shuffleServer = shuffleServer;
+ }
+
+ @Override
+ public void decommission(
+ RssProtos.DecommissionRequest request,
+ StreamObserver<RssProtos.DecommissionResponse> responseObserver) {
+ RssProtos.DecommissionResponse response;
+ try {
+ shuffleServer.decommission();
+ response = RssProtos.DecommissionResponse
+ .newBuilder()
+ .setStatus(StatusCode.SUCCESS.toProto())
+ .build();
+ } catch (Exception e) {
+ StatusCode statusCode = StatusCode.INTERNAL_ERROR;
+ if (e instanceof InvalidRequestException) {
+ statusCode = StatusCode.INVALID_REQUEST;
+ }
+ response = RssProtos.DecommissionResponse
+ .newBuilder()
+ .setStatus(statusCode.toProto())
+ .setRetMsg(e.getMessage())
+ .build();
+ }
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void cancelDecommission(
+ RssProtos.CancelDecommissionRequest request,
+ StreamObserver<RssProtos.CancelDecommissionResponse>
responseObserver) {
+ RssProtos.CancelDecommissionResponse response;
+ try {
+ shuffleServer.cancelDecommission();
+ response = RssProtos.CancelDecommissionResponse
+ .newBuilder()
+ .setStatus(StatusCode.SUCCESS.toProto())
+ .build();
+ } catch (Exception e) {
+ StatusCode statusCode = StatusCode.INTERNAL_ERROR;
+ if (e instanceof InvalidRequestException) {
+ statusCode = StatusCode.INVALID_REQUEST;
+ }
+ response = RssProtos.CancelDecommissionResponse
+ .newBuilder()
+ .setStatus(statusCode.toProto())
+ .setRetMsg(e.getMessage())
+ .build();
+ }
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
index fa450345..58de5382 100644
--- a/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
+++ b/server/src/test/java/org/apache/uniffle/server/MockedGrpcServer.java
@@ -17,6 +17,9 @@
package org.apache.uniffle.server;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
+
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.rpc.GrpcServer;
@@ -26,7 +29,7 @@ public class MockedGrpcServer extends GrpcServer {
public MockedGrpcServer(RssBaseConf conf, MockedShuffleServerGrpcService
service,
GRPCMetrics grpcMetrics) {
- super(conf, service, grpcMetrics);
+ super(conf, Lists.newArrayList(Pair.of(service, Lists.newArrayList())),
grpcMetrics);
this.service = service;
}