This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 0c3d60fd [#937] feat: Add rest api for servernode list of losing
connection and unhealthy (#938)
0c3d60fd is described below
commit 0c3d60fd65636eca8fa9b6250745efc7234f9ceb
Author: yl09099 <[email protected]>
AuthorDate: Sat Jun 17 10:26:02 2023 +0800
[#937] feat: Add rest api for servernode list of losing connection and
unhealthy (#938)
### What changes were proposed in this pull request?
- Obtain, from a Coordinator Server, the list of ServerNodes whose heartbeat has been lost.
- Obtain, from a Coordinator Server, the list of ServerNodes that are unhealthy.
### Why are the changes needed?
There is currently no interface for viewing the lists of lost and unhealthy server nodes.
Fix: #937
### Does this PR introduce _any_ user-facing change?
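Yes, a change in user-facing APIs: the coordinator's `server/nodes` REST endpoint now accepts `status=LOST` and `status=UNHEALTHY` to return the lost and unhealthy server node lists.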
### How was this patch tested?
Updated the unit test SimpleClusterManagerTest and the integration test ServletTest.
---
.../org/apache/uniffle/common/ServerStatus.java | 2 +
.../apache/uniffle/coordinator/ClusterManager.java | 14 ++++
.../org/apache/uniffle/coordinator/ServerNode.java | 18 ++---
.../uniffle/coordinator/SimpleClusterManager.java | 49 ++++++++++---
.../coordinator/web/servlet/NodesServlet.java | 12 +++-
.../coordinator/SimpleClusterManagerTest.java | 83 +++++++++++++++++++++-
.../java/org/apache/uniffle/test/ServletTest.java | 77 +++++++++++++++++---
proto/src/main/proto/Rss.proto | 2 +
.../org/apache/uniffle/server/ShuffleServer.java | 5 ++
9 files changed, 232 insertions(+), 30 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
index bd23b560..d5f257a4 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -27,6 +27,8 @@ public enum ServerStatus {
ACTIVE(0),
DECOMMISSIONING(1),
DECOMMISSIONED(2),
+ LOST(3),
+ UNHEALTHY(4),
UNKNOWN(-1);
static final Map<Integer, ServerStatus> VALUE_MAP =
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index fbd7e694..6d6b5092 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -40,6 +40,20 @@ public interface ClusterManager extends Closeable,
Reconfigurable {
*/
List<ServerNode> getServerList(Set<String> requiredTags);
+ /**
+ * Get lost nodes from the cluster
+ *
+ * @return list of lost nodes
+ */
+ List<ServerNode> getLostServerList();
+
+ /**
+ * Get unhealthy nodes from the cluster
+ *
+ * @return list of unhealthy nodes
+ */
+ List<ServerNode> getUnhealthyServerList();
+
/**
* @return number of server nodes in the cluster
*/
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index bc51fded..6cc720ed 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -37,8 +37,7 @@ public class ServerNode implements Comparable<ServerNode> {
private int eventNumInFlush;
private long timestamp;
private Set<String> tags;
- private boolean isHealthy;
- private final ServerStatus status;
+ private ServerStatus status;
private Map<String, StorageInfo> storageInfo;
private int nettyPort = -1;
@@ -110,8 +109,7 @@ public class ServerNode implements Comparable<ServerNode> {
this.eventNumInFlush = eventNumInFlush;
this.timestamp = System.currentTimeMillis();
this.tags = tags;
- this.isHealthy = isHealthy;
- this.status = status;
+ this.status = isHealthy ? status : ServerStatus.UNHEALTHY;
this.storageInfo = storageInfoMap;
if (nettyPort > 0) {
this.nettyPort = nettyPort;
@@ -160,12 +158,17 @@ public class ServerNode implements Comparable<ServerNode>
{
}
public boolean isHealthy() {
- return isHealthy;
+ return this.status != ServerStatus.UNHEALTHY;
}
-
+
public ServerStatus getStatus() {
return status;
}
+
+ public void setStatus(ServerStatus serverStatus) {
+ this.status = serverStatus;
+ }
+
public Map<String, StorageInfo> getStorageInfo() {
return storageInfo;
@@ -183,7 +186,6 @@ public class ServerNode implements Comparable<ServerNode> {
+ "], eventNumInFlush[" + eventNumInFlush
+ "], timestamp[" + timestamp
+ "], tags" + tags.toString() + ""
- + ", healthy[" + isHealthy
+ ", status[" + status
+ "], storages[num=" + storageInfo.size() + "]";
@@ -192,7 +194,7 @@ public class ServerNode implements Comparable<ServerNode> {
/**
* Only for test case
*/
- void setTimestamp(long timestamp) {
+ public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 7a4f4d7d..a11823ea 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -64,6 +64,14 @@ public class SimpleClusterManager implements ClusterManager {
private final Map<String, ServerNode> servers = JavaUtils.newConcurrentMap();
private final Cache<ServerNode, ShuffleServerInternalGrpcClient> clientCache;
private Set<String> excludeNodes = Sets.newConcurrentHashSet();
+ /**
+ * ServerNodes whose heartbeat was lost; written by nodesCheck() and read by getLostServerList()
+ */
+ private final Set<ServerNode> lostNodes = Sets.newConcurrentHashSet();
+ /**
+ * Unhealthy ServerNodes; written by nodesCheck() and read by getUnhealthyServerList()
+ */
+ private final Set<ServerNode> unhealthyNodes = Sets.newConcurrentHashSet();
// tag -> nodes
private Map<String, Set<ServerNode>> tagToNodes =
JavaUtils.newConcurrentMap();
private AtomicLong excludeLastModify = new AtomicLong(0L);
@@ -121,19 +129,24 @@ public class SimpleClusterManager implements
ClusterManager {
void nodesCheck() {
try {
long timestamp = System.currentTimeMillis();
- Set<String> deleteIds = Sets.newHashSet();
- Set<String> unhealthyNode = Sets.newHashSet();
for (ServerNode sn : servers.values()) {
if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from
node list.");
- deleteIds.add(sn.getId());
+ sn.setStatus(ServerStatus.LOST);
+ lostNodes.add(sn);
+ unhealthyNodes.remove(sn);
} else if (!sn.isHealthy()) {
LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
- unhealthyNode.add(sn.getId());
+ unhealthyNodes.add(sn);
+ lostNodes.remove(sn);
+ } else {
+ // keep the heartbeat-reported status (e.g. DECOMMISSIONING); do not force ACTIVE here
+ lostNodes.remove(sn);
+ unhealthyNodes.remove(sn);
}
}
- for (String serverId : deleteIds) {
- ServerNode sn = servers.remove(serverId);
+ for (ServerNode server : lostNodes) {
+ ServerNode sn = servers.remove(server.getId());
if (sn != null) {
clientCache.invalidate(sn);
for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
@@ -141,7 +154,7 @@ public class SimpleClusterManager implements ClusterManager
{
}
}
}
- if (!deleteIds.isEmpty() || outputAliveServerCount %
periodicOutputIntervalTimes == 0) {
+ if (!lostNodes.isEmpty() || outputAliveServerCount %
periodicOutputIntervalTimes == 0) {
LOG.info("Alive servers number: {}, ids: {}",
servers.size(),
servers.keySet().stream().collect(Collectors.toList())
@@ -149,12 +162,17 @@ public class SimpleClusterManager implements
ClusterManager {
}
outputAliveServerCount++;
- CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
+ CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNodes.size());
CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
} catch (Exception e) {
LOG.warn("Error happened in nodesCheck", e);
}
}
+
+ @VisibleForTesting
+ public void nodesCheckTest() {
+ nodesCheck();
+ }
private void updateExcludeNodes(String path) {
int originalExcludeNodesNumber = excludeNodes.size();
@@ -231,6 +249,16 @@ public class SimpleClusterManager implements
ClusterManager {
return availableNodes;
}
+ @Override
+ public List<ServerNode> getLostServerList() {
+ return Lists.newArrayList(lostNodes);
+ }
+
+ @Override
+ public List<ServerNode> getUnhealthyServerList() {
+ return Lists.newArrayList(unhealthyNodes);
+ }
+
public Set<String> getExcludeNodes() {
return excludeNodes;
}
@@ -331,6 +359,11 @@ public class SimpleClusterManager implements
ClusterManager {
public void setStartupSilentPeriodEnabled(boolean
startupSilentPeriodEnabled) {
this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
}
+
+ @VisibleForTesting
+ public Map<String, ServerNode> getServers() {
+ return servers;
+ }
@Override
public void reconfigure(RssConf conf) {
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
index 788f7f0a..ea073cad 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.coordinator.CoordinatorServer;
import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.coordinator.web.Response;
@@ -37,10 +38,17 @@ public class NodesServlet extends
BaseServlet<List<ServerNode>> {
}
@Override
- protected Response<List<ServerNode>> handleGet(HttpServletRequest req,
HttpServletResponse resp) {
- List<ServerNode> serverList =
coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+ protected Response handleGet(HttpServletRequest req, HttpServletResponse
resp) {
+ List<ServerNode> serverList;
String id = req.getParameter("id");
String status = req.getParameter("status");
+ if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
+ serverList = coordinator.getClusterManager().getUnhealthyServerList();
+ } else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) {
+ serverList = coordinator.getClusterManager().getLostServerList();
+ } else {
+ serverList =
coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+ }
serverList = serverList.stream().filter((server) -> {
if (id != null && !id.equals(server.getId())) {
return false;
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index d2e5644f..e08193f6 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -129,6 +130,84 @@ public class SimpleClusterManagerTest {
}
}
+ @Test
+ public void getLostServerListTest() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ // Shorten the heartbeat time
+ coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
+ SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, grpcTags, true);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, grpcTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, true);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+ await().atMost(1, TimeUnit.SECONDS).until(() -> {
+ Set<String> lostServerList =
clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(lostServerList,
expectedIds);
+ }
+ );
+ // re-register sn3
+ sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, true);
+ clusterManager.add(sn3);
+ Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+ await().atMost(1, TimeUnit.SECONDS).until(() -> {
+ // Retrieve listed ServerNode List
+ Set<String> lostServerListre =
clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(lostServerListre,
expectedIdsre);
+ }
+ );
+ }
+
+ @Test
+ public void getUnhealthyServerList() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ // Shorten the heartbeat time
+ coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT,
300L);
+ SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, grpcTags, true);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, grpcTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, false);
+ ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, false);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ clusterManager.add(sn4);
+ // Analog timeout registration
+ Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
+ await().atMost(1, TimeUnit.SECONDS).until(() -> {
+ Set<String> unhealthyServerList =
clusterManager.getUnhealthyServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(unhealthyServerList,
expectedIds);
+ });
+ // Register unhealthy node sn3 again
+ sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, false);
+ clusterManager.add(sn3);
+ Set<String> expectedIdsre = Sets.newHashSet("sn3");
+ await().atMost(1, TimeUnit.SECONDS).until(() -> {
+ Set<String> unhealthyServerListre =
clusterManager.getUnhealthyServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ return CollectionUtils.isEqualCollection(unhealthyServerListre,
expectedIdsre);
+ });
+ // At this point verify that sn4 is in the lost list
+ List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
+ Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2",
"sn4");
+ assertEquals(expectedIdlostremoveunhealthy, lostremoveunhealthy.stream()
+ .map(ServerNode::getId).collect(Collectors.toSet()));
+ }
+
@Test
public void getServerListForNettyTest() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
@@ -312,13 +391,13 @@ public class SimpleClusterManagerTest {
await().atMost(3, TimeUnit.SECONDS).until(() ->
scm.getExcludeNodes().equals(nodes2));
assertEquals(nodes2, scm.getExcludeNodes());
- final Set<String> comments = Sets.newHashSet("# The contents of the
first comment","node3-1999",
+ final Set<String> comments = Sets.newHashSet("# The contents of the
first comment", "node3-1999",
"# The contents of the second comment", "node4-1999", "# The content
of the third comment");
final Set<String> noComments = Sets.newHashSet("node3-1999",
"node4-1999");
writeExcludeHosts(excludeNodesPath, comments);
await().atMost(3, TimeUnit.SECONDS).until(() ->
scm.getExcludeNodes().equals(noComments));
assertEquals(noComments, scm.getExcludeNodes());
-
+
Set<String> excludeNodes = scm.getExcludeNodes();
Thread.sleep(3000);
// excludeNodes shouldn't be updated if file has no change
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index a514660f..78e86bde 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -18,14 +18,19 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -39,6 +44,8 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.TestUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.coordinator.web.Response;
import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
@@ -48,10 +55,13 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServletTest extends IntegrationTestBase {
private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
private static final String NODES_URL = URL_PREFIX + "server/nodes";
+ private static final String LOSTNODES_URL = URL_PREFIX +
"server/nodes?status=LOST";
+ private static final String UNHEALTHYNODES_URL = URL_PREFIX +
"server/nodes?status=UNHEALTHY";
private static final String DECOMMISSION_URL = URL_PREFIX +
"server/decommission";
private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX +
"server/cancelDecommission";
private static CoordinatorServer coordinatorServer;
@@ -80,10 +90,24 @@ public class ServletTest extends IntegrationTestBase {
shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT +
1);
shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18081);
createShuffleServer(shuffleServerConf);
+ File dataDir5 = new File(tmpDir, "data5");
+ File dataDir6 = new File(tmpDir, "data6");
+ basePath = Lists.newArrayList(dataDir5.getAbsolutePath(),
dataDir6.getAbsolutePath());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT +
2);
+ shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18082);
+ createShuffleServer(shuffleServerConf);
+ File dataDir7 = new File(tmpDir, "data7");
+ File dataDir8 = new File(tmpDir, "data8");
+ basePath = Lists.newArrayList(dataDir7.getAbsolutePath(),
dataDir8.getAbsolutePath());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT +
3);
+ shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18083);
+ createShuffleServer(shuffleServerConf);
startServers();
coordinatorServer = coordinators.get(0);
Awaitility.await().timeout(30, TimeUnit.SECONDS).until(() ->
- coordinatorServer.getClusterManager().list().size() == 2);
+ coordinatorServer.getClusterManager().list().size() == 4);
}
@Test
@@ -93,7 +117,7 @@ public class ServletTest extends IntegrationTestBase {
new TypeReference<Response<List<HashMap<String, Object>>>>() {});
List<HashMap<String, Object>> serverList = response.getData();
assertEquals(0, response.getCode());
- assertEquals(2, serverList.size());
+ assertEquals(4, serverList.size());
assertEquals(SHUFFLE_SERVER_PORT,
Integer.parseInt(serverList.get(0).get("grpcPort").toString()));
assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(0).get("status"));
assertEquals(SHUFFLE_SERVER_PORT + 1,
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
@@ -107,15 +131,48 @@ public class ServletTest extends IntegrationTestBase {
serverList = response.getData();
assertEquals(1, serverList.size());
assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
+ }
- content = TestUtils.httpGet(NODES_URL + "?status=DECOMMISSIONED");
- response = objectMapper.readValue(content, new
TypeReference<Response<List<HashMap<String, Object>>>>() {});
- serverList = response.getData();
- assertEquals(0, serverList.size());
- content = TestUtils.httpGet(NODES_URL + "?status=ACTIVE");
- response = objectMapper.readValue(content, new
TypeReference<Response<List<HashMap<String, Object>>>>() {});
- serverList = response.getData();
- assertEquals(2, serverList.size());
+ @Test
+ public void testLostNodesServlet() throws IOException {
+ SimpleClusterManager clusterManager = (SimpleClusterManager)
coordinatorServer.getClusterManager();
+ ShuffleServer shuffleServer3 = shuffleServers.get(2);
+ ShuffleServer shuffleServer4 = shuffleServers.get(3);
+ Map<String, ServerNode> servers = clusterManager.getServers();
+
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() -
40000);
+
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() -
40000);
+ clusterManager.nodesCheckTest();
+ List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(),
shuffleServer4.getId());
+ List<String> shuffleIds = new ArrayList<>();
+ Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(TestUtils.httpGet(LOSTNODES_URL),
+ new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+ List<HashMap<String, Object>> serverList = response.getData();
+ for (HashMap<String, Object> stringObjectHashMap : serverList) {
+ String shuffleId = (String) stringObjectHashMap.get("id");
+ shuffleIds.add(shuffleId);
+ }
+ assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds,
shuffleIds));
+ }
+
+ @Test
+ public void testUnhealthyNodesServlet() {
+ ShuffleServer shuffleServer3 = shuffleServers.get(2);
+ ShuffleServer shuffleServer4 = shuffleServers.get(3);
+ shuffleServer3.markUnhealthy();
+ shuffleServer4.markUnhealthy();
+ List<String> expectShuffleIds =
Arrays.asList(shuffleServer3.getId(), shuffleServer4.getId());
+ List<String> shuffleIds = new ArrayList<>();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(TestUtils.httpGet(UNHEALTHYNODES_URL),
+ new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+ List<HashMap<String, Object>> serverList = response.getData();
+ shuffleIds.clear(); // drop ids collected by earlier, unsuccessful polls
+ for (HashMap<String, Object> stringObjectHashMap : serverList) {
+ shuffleIds.add((String) stringObjectHashMap.get("id"));
+ }
+ return serverList.size() == 2;
+ });
+ assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds,
shuffleIds));
}
@Test
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 2099a035..d1e66464 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -239,6 +239,8 @@ enum ServerStatus {
ACTIVE = 0;
DECOMMISSIONING = 1;
DECOMMISSIONED = 2;
+ LOST = 3;
+ UNHEALTHY = 4;
// todo: more status, such as UPGRADING
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index fe100b62..212dd2d3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -457,6 +457,11 @@ public class ShuffleServer {
return isHealthy.get();
}
+ @VisibleForTesting
+ public void markUnhealthy() {
+ isHealthy.set(false);
+ }
+
public GRPCMetrics getGrpcMetrics() {
return grpcMetrics;
}