This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7a6d27699 [MINOR] feat(server,coordinator): Heartbeat server
startTimeMs to coordinator (#1975)
7a6d27699 is described below
commit 7a6d2769980a29bc0309b6bdb52d167eacd71b90
Author: maobaolong <[email protected]>
AuthorDate: Tue Jul 30 10:14:15 2024 +0800
[MINOR] feat(server,coordinator): Heartbeat server startTimeMs to
coordinator (#1975)
### What changes were proposed in this pull request?
Each shuffle server now sends its start time to the coordinator in its heartbeat.
### Why are the changes needed?
Recording each server's start time makes it easy to tell which servers have restarted.
### Does this PR introduce _any_ user-facing change?
Yes. Users can see each server's start time through the coordinator REST API.
### How was this patch tested?
Manually, by querying the coordinator: curl http://<COORDINATOR_HOST>:<COORDINATOR_JETTY_PORT>/api/server/nodes
---
.../apache/uniffle/coordinator/CoordinatorGrpcService.java | 3 ++-
.../java/org/apache/uniffle/coordinator/ServerNode.java | 13 +++++++++++--
.../uniffle/client/impl/grpc/CoordinatorGrpcClient.java | 7 +++++--
.../uniffle/client/request/RssSendHeartBeatRequest.java | 9 ++++++++-
proto/src/main/proto/Rss.proto | 1 +
.../java/org/apache/uniffle/server/RegisterHeartBeat.java | 9 ++++++---
.../main/java/org/apache/uniffle/server/ShuffleServer.java | 10 +++++++++-
7 files changed, 42 insertions(+), 10 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index b3df63db5..2b6c1c428 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -434,6 +434,7 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
serverStatus,
StorageInfoUtils.fromProto(request.getStorageInfoMap()),
request.getServerId().getNettyPort(),
- request.getServerId().getJettyPort());
+ request.getServerId().getJettyPort(),
+ request.getStartTimeMs());
}
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index a9723e00b..8b90fdafe 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -43,6 +43,7 @@ public class ServerNode implements Comparable<ServerNode> {
private Map<String, StorageInfo> storageInfo;
private int nettyPort = -1;
private int jettyPort = -1;
+ private long startTimeMs = -1;
public ServerNode(String id) {
this(id, "", 0, 0, 0, 0, 0, Sets.newHashSet(), ServerStatus.EXCLUDED);
@@ -117,6 +118,7 @@ public class ServerNode implements Comparable<ServerNode> {
status,
storageInfoMap,
-1,
+ -1,
-1);
}
@@ -144,7 +146,8 @@ public class ServerNode implements Comparable<ServerNode> {
status,
storageInfoMap,
nettyPort,
- -1);
+ -1,
+ -1L);
}
public ServerNode(
@@ -159,7 +162,8 @@ public class ServerNode implements Comparable<ServerNode> {
ServerStatus status,
Map<String, StorageInfo> storageInfoMap,
int nettyPort,
- int jettyPort) {
+ int jettyPort,
+ long startTimeMs) {
this.id = id;
this.ip = ip;
this.grpcPort = grpcPort;
@@ -178,6 +182,7 @@ public class ServerNode implements Comparable<ServerNode> {
if (jettyPort > 0) {
this.jettyPort = jettyPort;
}
+ this.startTimeMs = startTimeMs;
}
public ShuffleServerId convertToGrpcProto() {
@@ -317,4 +322,8 @@ public class ServerNode implements Comparable<ServerNode> {
public int getJettyPort() {
return jettyPort;
}
+
+ public long getStartTimeMs() {
+ return startTimeMs;
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
index 5900de6eb..3f7ff066c 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcClient.java
@@ -125,7 +125,8 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo,
int nettyPort,
- int jettyPort) {
+ int jettyPort,
+ long startTimeMs) {
ShuffleServerId serverId =
ShuffleServerId.newBuilder()
.setId(id)
@@ -144,6 +145,7 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
.addAllTags(tags)
.setStatusValue(serverStatus.ordinal())
.putAllStorageInfo(StorageInfoUtils.toProto(storageInfo))
+ .setStartTimeMs(startTimeMs)
.build();
RssProtos.StatusCode status;
@@ -219,7 +221,8 @@ public class CoordinatorGrpcClient extends GrpcClient
implements CoordinatorClie
request.getServerStatus(),
request.getStorageInfo(),
request.getNettyPort(),
- request.getJettyPort());
+ request.getJettyPort(),
+ request.getStartTimeMs());
RssSendHeartBeatResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
index 34d29d750..a31164195 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/request/RssSendHeartBeatRequest.java
@@ -38,6 +38,7 @@ public class RssSendHeartBeatRequest {
private final Map<String, StorageInfo> storageInfo;
private final int nettyPort;
private final int jettyPort;
+ private final long startTimeMs;
public RssSendHeartBeatRequest(
String shuffleServerId,
@@ -52,7 +53,8 @@ public class RssSendHeartBeatRequest {
ServerStatus serverStatus,
Map<String, StorageInfo> storageInfo,
int nettyPort,
- int jettyPort) {
+ int jettyPort,
+ long startTimeMs) {
this.shuffleServerId = shuffleServerId;
this.shuffleServerIp = shuffleServerIp;
this.shuffleServerPort = shuffleServerPort;
@@ -66,6 +68,7 @@ public class RssSendHeartBeatRequest {
this.storageInfo = storageInfo;
this.nettyPort = nettyPort;
this.jettyPort = jettyPort;
+ this.startTimeMs = startTimeMs;
}
public String getShuffleServerId() {
@@ -119,4 +122,8 @@ public class RssSendHeartBeatRequest {
public int getJettyPort() {
return jettyPort;
}
+
+ public long getStartTimeMs() {
+ return startTimeMs;
+ }
}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 861d73a3f..eb7a82220 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -275,6 +275,7 @@ message ShuffleServerHeartBeatRequest {
google.protobuf.BoolValue isHealthy = 7;
optional ServerStatus status = 8;
map<string, StorageInfo> storageInfo = 21; // mount point to storage info
mapping.
+ optional int64 startTimeMs = 24;
}
message ShuffleServerHeartBeatResponse {
diff --git
a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
index a8c8b5d76..736b05581 100644
--- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
+++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java
@@ -86,7 +86,8 @@ public class RegisterHeartBeat {
shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
- shuffleServer.getJettyPort());
+ shuffleServer.getJettyPort(),
+ shuffleServer.getStartTimeMs());
} catch (Exception e) {
LOG.warn("Error happened when send heart beat to coordinator");
}
@@ -108,7 +109,8 @@ public class RegisterHeartBeat {
ServerStatus serverStatus,
Map<String, StorageInfo> localStorageInfo,
int nettyPort,
- int jettyPort) {
+ int jettyPort,
+ long startTimeMs) {
AtomicBoolean sendSuccessfully = new AtomicBoolean(false);
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request =
@@ -125,7 +127,8 @@ public class RegisterHeartBeat {
serverStatus,
localStorageInfo,
nettyPort,
- jettyPort);
+ jettyPort,
+ startTimeMs);
ThreadUtils.executeTasks(
heartBeatExecutorService,
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 461fe2aab..88ddb04fe 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -104,8 +104,11 @@ public class ShuffleServer {
private StreamServer streamServer;
private JvmPauseMonitor jvmPauseMonitor;
+ private final long startTimeMs;
+
public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
+ this.startTimeMs = System.currentTimeMillis();
try {
initialization();
} catch (Exception e) {
@@ -542,6 +545,10 @@ public class ShuffleServer {
return StringUtils.join(tags, ",");
}
+ public long getStartTimeMs() {
+ return startTimeMs;
+ }
+
@VisibleForTesting
public void sendHeartbeat() {
ShuffleServer shuffleServer = this;
@@ -557,6 +564,7 @@ public class ShuffleServer {
shuffleServer.getServerStatus(),
shuffleServer.getStorageManager().getStorageInfo(),
shuffleServer.getNettyPort(),
- shuffleServer.getJettyPort());
+ shuffleServer.getJettyPort(),
+ shuffleServer.getStartTimeMs());
}
}