This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c3d60fd [#937] feat: Add rest api for servernode list of losing 
connection and unhealthy (#938)
0c3d60fd is described below

commit 0c3d60fd65636eca8fa9b6250745efc7234f9ceb
Author: yl09099 <[email protected]>
AuthorDate: Sat Jun 17 10:26:02 2023 +0800

    [#937] feat: Add rest api for servernode list of losing connection and 
unhealthy (#938)
    
    ### What changes were proposed in this pull request?
      Obtain the list of ServerNodes whose heartbeat is lost from a Coordinator 
Server
      Obtain the list of unhealthy ServerNodes from a Coordinator Server
    
    ### Why are the changes needed?
    There is no interface for viewing the lists of lost and unhealthy ServerNodes
    
    Fix: #937
    
    ### Does this PR introduce _any_ user-facing change?
      1. Change in user-facing APIs.
    
    ### How was this patch tested?
      Updated UT: SimpleClusterManagerTest
---
 .../org/apache/uniffle/common/ServerStatus.java    |  2 +
 .../apache/uniffle/coordinator/ClusterManager.java | 14 ++++
 .../org/apache/uniffle/coordinator/ServerNode.java | 18 ++---
 .../uniffle/coordinator/SimpleClusterManager.java  | 49 ++++++++++---
 .../coordinator/web/servlet/NodesServlet.java      | 12 +++-
 .../coordinator/SimpleClusterManagerTest.java      | 83 +++++++++++++++++++++-
 .../java/org/apache/uniffle/test/ServletTest.java  | 77 +++++++++++++++++---
 proto/src/main/proto/Rss.proto                     |  2 +
 .../org/apache/uniffle/server/ShuffleServer.java   |  5 ++
 9 files changed, 232 insertions(+), 30 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java 
b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
index bd23b560..d5f257a4 100644
--- a/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
+++ b/common/src/main/java/org/apache/uniffle/common/ServerStatus.java
@@ -27,6 +27,8 @@ public enum ServerStatus {
   ACTIVE(0),
   DECOMMISSIONING(1),
   DECOMMISSIONED(2),
+  LOST(3),
+  UNHEALTHY(4),
   UNKNOWN(-1);
 
   static final Map<Integer, ServerStatus> VALUE_MAP =
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index fbd7e694..6d6b5092 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -40,6 +40,20 @@ public interface ClusterManager extends Closeable, 
Reconfigurable {
    */
   List<ServerNode> getServerList(Set<String> requiredTags);
 
+  /**
+   * Get lost nodes from the cluster
+   *
+   * @return list of lost nodes
+   */
+  List<ServerNode> getLostServerList();
+  
+  /**
+   * Get unhealthy nodes from the cluster
+   *
+   * @return list of unhealthy nodes
+   */
+  List<ServerNode> getUnhealthyServerList();
+
   /**
    * @return number of server nodes in the cluster
    */
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
index bc51fded..6cc720ed 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ServerNode.java
@@ -37,8 +37,7 @@ public class ServerNode implements Comparable<ServerNode> {
   private int eventNumInFlush;
   private long timestamp;
   private Set<String> tags;
-  private boolean isHealthy;
-  private final ServerStatus status;
+  private ServerStatus status;
   private Map<String, StorageInfo> storageInfo;
   private int nettyPort = -1;
 
@@ -110,8 +109,7 @@ public class ServerNode implements Comparable<ServerNode> {
     this.eventNumInFlush = eventNumInFlush;
     this.timestamp = System.currentTimeMillis();
     this.tags = tags;
-    this.isHealthy = isHealthy;
-    this.status = status;
+    this.status = isHealthy ? status : ServerStatus.UNHEALTHY;
     this.storageInfo = storageInfoMap;
     if (nettyPort > 0) {
       this.nettyPort = nettyPort;
@@ -160,12 +158,17 @@ public class ServerNode implements Comparable<ServerNode> 
{
   }
 
   public boolean isHealthy() {
-    return isHealthy;
+    return this.status != ServerStatus.UNHEALTHY;
   }
-
+  
   public ServerStatus getStatus() {
     return status;
   }
+  
+  public void setStatus(ServerStatus serverStatus) {
+    this.status = serverStatus;
+  }
+  
 
   public Map<String, StorageInfo> getStorageInfo() {
     return storageInfo;
@@ -183,7 +186,6 @@ public class ServerNode implements Comparable<ServerNode> {
         + "], eventNumInFlush[" + eventNumInFlush
         + "], timestamp[" + timestamp
         + "], tags" + tags.toString() + ""
-        + ", healthy[" + isHealthy
         + ", status[" + status
         + "], storages[num=" + storageInfo.size() + "]";
 
@@ -192,7 +194,7 @@ public class ServerNode implements Comparable<ServerNode> {
   /**
    * Only for test case
    */
-  void setTimestamp(long timestamp) {
+  public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
 
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 7a4f4d7d..a11823ea 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -64,6 +64,14 @@ public class SimpleClusterManager implements ClusterManager {
   private final Map<String, ServerNode> servers = JavaUtils.newConcurrentMap();
   private final Cache<ServerNode, ShuffleServerInternalGrpcClient> clientCache;
   private Set<String> excludeNodes = Sets.newConcurrentHashSet();
+  /**
+   * ServerNode whose heartbeat is lost
+   */
+  Set<ServerNode> lostNodes = Sets.newHashSet();
+  /**
+   * Unhealthy ServerNode
+   */
+  Set<ServerNode> unhealthyNodes = Sets.newHashSet();
   // tag -> nodes
   private Map<String, Set<ServerNode>> tagToNodes = 
JavaUtils.newConcurrentMap();
   private AtomicLong excludeLastModify = new AtomicLong(0L);
@@ -121,19 +129,24 @@ public class SimpleClusterManager implements 
ClusterManager {
   void nodesCheck() {
     try {
       long timestamp = System.currentTimeMillis();
-      Set<String> deleteIds = Sets.newHashSet();
-      Set<String> unhealthyNode = Sets.newHashSet();
       for (ServerNode sn : servers.values()) {
         if (timestamp - sn.getTimestamp() > heartbeatTimeout) {
           LOG.warn("Heartbeat timeout detect, " + sn + " will be removed from 
node list.");
-          deleteIds.add(sn.getId());
+          sn.setStatus(ServerStatus.LOST);
+          lostNodes.add(sn);
+          unhealthyNodes.remove(sn);
         } else if (!sn.isHealthy()) {
           LOG.warn("Found server {} was unhealthy, will not assign it.", sn);
-          unhealthyNode.add(sn.getId());
+          unhealthyNodes.add(sn);
+          lostNodes.remove(sn);
+        } else {
+          sn.setStatus(ServerStatus.ACTIVE);
+          lostNodes.remove(sn);
+          unhealthyNodes.remove(sn);
         }
       }
-      for (String serverId : deleteIds) {
-        ServerNode sn = servers.remove(serverId);
+      for (ServerNode server : lostNodes) {
+        ServerNode sn = servers.remove(server.getId());
         if (sn != null) {
           clientCache.invalidate(sn);
           for (Set<ServerNode> nodesWithTag : tagToNodes.values()) {
@@ -141,7 +154,7 @@ public class SimpleClusterManager implements ClusterManager 
{
           }
         }
       }
-      if (!deleteIds.isEmpty() || outputAliveServerCount % 
periodicOutputIntervalTimes == 0) {
+      if (!lostNodes.isEmpty() || outputAliveServerCount % 
periodicOutputIntervalTimes == 0) {
         LOG.info("Alive servers number: {}, ids: {}",
             servers.size(),
             servers.keySet().stream().collect(Collectors.toList())
@@ -149,12 +162,17 @@ public class SimpleClusterManager implements 
ClusterManager {
       }
       outputAliveServerCount++;
 
-      CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
+      CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNodes.size());
       CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
     } catch (Exception e) {
       LOG.warn("Error happened in nodesCheck", e);
     }
   }
+  
+  @VisibleForTesting
+  public void nodesCheckTest() {
+    nodesCheck();
+  }
 
   private void updateExcludeNodes(String path) {
     int originalExcludeNodesNumber = excludeNodes.size();
@@ -231,6 +249,16 @@ public class SimpleClusterManager implements 
ClusterManager {
     return availableNodes;
   }
 
+  @Override
+  public List<ServerNode> getLostServerList() {
+    return Lists.newArrayList(lostNodes);
+  }
+
+  @Override
+  public List<ServerNode> getUnhealthyServerList() {
+    return Lists.newArrayList(unhealthyNodes);
+  }
+
   public Set<String> getExcludeNodes() {
     return excludeNodes;
   }
@@ -331,6 +359,11 @@ public class SimpleClusterManager implements 
ClusterManager {
   public void setStartupSilentPeriodEnabled(boolean 
startupSilentPeriodEnabled) {
     this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
   }
+  
+  @VisibleForTesting
+  public Map<String, ServerNode> getServers() {
+    return servers;
+  }
 
   @Override
   public void reconfigure(RssConf conf) {
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
index 788f7f0a..ea073cad 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.uniffle.common.ServerStatus;
 import org.apache.uniffle.coordinator.CoordinatorServer;
 import org.apache.uniffle.coordinator.ServerNode;
 import org.apache.uniffle.coordinator.web.Response;
@@ -37,10 +38,17 @@ public class NodesServlet extends 
BaseServlet<List<ServerNode>> {
   }
 
   @Override
-  protected Response<List<ServerNode>> handleGet(HttpServletRequest req, 
HttpServletResponse resp) {
-    List<ServerNode> serverList = 
coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+  protected Response handleGet(HttpServletRequest req, HttpServletResponse 
resp) {
+    List<ServerNode> serverList;
     String id = req.getParameter("id");
     String status = req.getParameter("status");
+    if (ServerStatus.UNHEALTHY.name().equalsIgnoreCase(status)) {
+      serverList = coordinator.getClusterManager().getUnhealthyServerList();
+    } else if (ServerStatus.LOST.name().equalsIgnoreCase(status)) {
+      serverList = coordinator.getClusterManager().getLostServerList();
+    } else {
+      serverList = 
coordinator.getClusterManager().getServerList(Collections.EMPTY_SET);
+    }
     serverList = serverList.stream().filter((server) -> {
       if (id != null && !id.equals(server.getId())) {
         return false;
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index d2e5644f..e08193f6 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -129,6 +130,84 @@ public class SimpleClusterManagerTest {
     }
   }
 
+  @Test
+  public void getLostServerListTest() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+    await().atMost(1, TimeUnit.SECONDS).until(() -> { 
+          Set<String> lostServerList = 
clusterManager.getLostServerList().stream()
+              .map(ServerNode::getId).collect(Collectors.toSet());
+          return CollectionUtils.isEqualCollection(lostServerList, 
expectedIds);
+        }
+    );
+    // re-register sn3
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn3);
+    Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+    await().atMost(1, TimeUnit.SECONDS).until(() -> { 
+      // Retrieve listed ServerNode List
+      Set<String> lostServerListre = 
clusterManager.getLostServerList().stream()
+          .map(ServerNode::getId).collect(Collectors.toSet());
+          return CollectionUtils.isEqualCollection(lostServerListre, 
expectedIdsre);
+        }
+    );
+  }
+
+  @Test
+  public void getUnhealthyServerList() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 
300L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    clusterManager.add(sn4);
+    // Analog timeout registration
+    Set<String> expectedIds = Sets.newHashSet("sn3", "sn4");
+    await().atMost(1, TimeUnit.SECONDS).until(() -> { 
+      Set<String> unhealthyServerList = 
clusterManager.getUnhealthyServerList().stream()
+          .map(ServerNode::getId).collect(Collectors.toSet());
+      return CollectionUtils.isEqualCollection(unhealthyServerList, 
expectedIds);
+    });
+    // Register unhealthy node sn3 again
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn3);
+    Set<String> expectedIdsre = Sets.newHashSet("sn3");
+    await().atMost(1, TimeUnit.SECONDS).until(() -> { 
+      Set<String> unhealthyServerListre = 
clusterManager.getUnhealthyServerList().stream()
+          .map(ServerNode::getId).collect(Collectors.toSet());
+      return CollectionUtils.isEqualCollection(unhealthyServerListre, 
expectedIdsre);
+    });
+    // At this point verify that sn4 is in the lost list
+    List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
+    Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2", 
"sn4");
+    assertEquals(expectedIdlostremoveunhealthy, lostremoveunhealthy.stream()
+        .map(ServerNode::getId).collect(Collectors.toSet()));
+  }
+
   @Test
   public void getServerListForNettyTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
@@ -312,13 +391,13 @@ public class SimpleClusterManagerTest {
       await().atMost(3, TimeUnit.SECONDS).until(() -> 
scm.getExcludeNodes().equals(nodes2));
       assertEquals(nodes2, scm.getExcludeNodes());
 
-      final Set<String> comments = Sets.newHashSet("# The contents of the 
first comment","node3-1999",
+      final Set<String> comments = Sets.newHashSet("# The contents of the 
first comment", "node3-1999",
           "# The contents of the second comment", "node4-1999", "# The content 
of the third comment");
       final Set<String> noComments = Sets.newHashSet("node3-1999", 
"node4-1999");
       writeExcludeHosts(excludeNodesPath, comments);
       await().atMost(3, TimeUnit.SECONDS).until(() -> 
scm.getExcludeNodes().equals(noComments));
       assertEquals(noComments, scm.getExcludeNodes());
-      
+
       Set<String> excludeNodes = scm.getExcludeNodes();
       Thread.sleep(3000);
       // excludeNodes shouldn't be updated if file has no change
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index a514660f..78e86bde 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -18,14 +18,19 @@
 package org.apache.uniffle.test;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -39,6 +44,8 @@ import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.TestUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.CoordinatorServer;
+import org.apache.uniffle.coordinator.ServerNode;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
 import org.apache.uniffle.coordinator.web.Response;
 import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
 import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
@@ -48,10 +55,13 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ServletTest extends IntegrationTestBase {
   private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
   private static final String NODES_URL = URL_PREFIX + "server/nodes";
+  private static final String LOSTNODES_URL = URL_PREFIX + 
"server/nodes?status=LOST";
+  private static final String UNHEALTHYNODES_URL = URL_PREFIX + 
"server/nodes?status=UNHEALTHY";
   private static final String DECOMMISSION_URL = URL_PREFIX + 
"server/decommission";
   private static final String CANCEL_DECOMMISSION_URL = URL_PREFIX + 
"server/cancelDecommission";
   private static CoordinatorServer coordinatorServer;
@@ -80,10 +90,24 @@ public class ServletTest extends IntegrationTestBase {
     shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 
1);
     shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18081);
     createShuffleServer(shuffleServerConf);
+    File dataDir5 = new File(tmpDir, "data5");
+    File dataDir6 = new File(tmpDir, "data6");
+    basePath = Lists.newArrayList(dataDir5.getAbsolutePath(), 
dataDir6.getAbsolutePath());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+    shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 
2);
+    shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18082);
+    createShuffleServer(shuffleServerConf);
+    File dataDir7 = new File(tmpDir, "data7");
+    File dataDir8 = new File(tmpDir, "data8");
+    basePath = Lists.newArrayList(dataDir7.getAbsolutePath(), 
dataDir8.getAbsolutePath());
+    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
+    shuffleServerConf.set(RssBaseConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 
3);
+    shuffleServerConf.set(RssBaseConf.JETTY_HTTP_PORT, 18083);
+    createShuffleServer(shuffleServerConf);
     startServers();
     coordinatorServer = coordinators.get(0);
     Awaitility.await().timeout(30, TimeUnit.SECONDS).until(() ->
-        coordinatorServer.getClusterManager().list().size() == 2);
+        coordinatorServer.getClusterManager().list().size() == 4);
   }
 
   @Test
@@ -93,7 +117,7 @@ public class ServletTest extends IntegrationTestBase {
             new TypeReference<Response<List<HashMap<String, Object>>>>() {});
     List<HashMap<String, Object>> serverList = response.getData();
     assertEquals(0, response.getCode());
-    assertEquals(2, serverList.size());
+    assertEquals(4, serverList.size());
     assertEquals(SHUFFLE_SERVER_PORT, 
Integer.parseInt(serverList.get(0).get("grpcPort").toString()));
     assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(0).get("status"));
     assertEquals(SHUFFLE_SERVER_PORT + 1, 
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
@@ -107,15 +131,48 @@ public class ServletTest extends IntegrationTestBase {
     serverList = response.getData();
     assertEquals(1, serverList.size());
     assertEquals(shuffleServer.getId(), serverList.get(0).get("id"));
+  }
 
-    content = TestUtils.httpGet(NODES_URL + "?status=DECOMMISSIONED");
-    response = objectMapper.readValue(content, new 
TypeReference<Response<List<HashMap<String, Object>>>>() {});
-    serverList = response.getData();
-    assertEquals(0, serverList.size());
-    content = TestUtils.httpGet(NODES_URL + "?status=ACTIVE");
-    response = objectMapper.readValue(content, new 
TypeReference<Response<List<HashMap<String, Object>>>>() {});
-    serverList = response.getData();
-    assertEquals(2, serverList.size());
+  @Test
+  public void testLostNodesServlet() throws IOException {
+    SimpleClusterManager clusterManager = (SimpleClusterManager) 
coordinatorServer.getClusterManager();
+    ShuffleServer shuffleServer3 = shuffleServers.get(2);
+    ShuffleServer shuffleServer4 = shuffleServers.get(3);
+    Map<String, ServerNode> servers = clusterManager.getServers();
+    
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() - 
40000);
+    
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() - 
40000);
+    clusterManager.nodesCheckTest();
+    List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(), 
shuffleServer4.getId());
+    List<String> shuffleIds = new ArrayList<>();
+    Response<List<HashMap<String, Object>>> response = 
objectMapper.readValue(TestUtils.httpGet(LOSTNODES_URL),
+        new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+    List<HashMap<String, Object>> serverList = response.getData();
+    for (HashMap<String, Object> stringObjectHashMap : serverList) {
+      String shuffleId = (String) stringObjectHashMap.get("id");
+      shuffleIds.add(shuffleId);
+    }
+    assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds, 
shuffleIds));
+  }
+
+  @Test
+  public void testUnhealthyNodesServlet() {
+    ShuffleServer shuffleServer3 = shuffleServers.get(2);
+    ShuffleServer shuffleServer4 = shuffleServers.get(3);
+    shuffleServer3.markUnhealthy();
+    shuffleServer4.markUnhealthy();
+    List<String> expectShuffleIds = 
Arrays.asList(shuffleServer3.getId(),shuffleServer4.getId());
+    List<String> shuffleIds = new ArrayList<>();
+    Awaitility.await().atMost(30,TimeUnit.SECONDS).until(() -> {
+      Response<List<HashMap<String, Object>>> response = 
objectMapper.readValue(TestUtils.httpGet(UNHEALTHYNODES_URL), 
+          new TypeReference<Response<List<HashMap<String, Object>>>>() {});
+      List<HashMap<String, Object>> serverList = response.getData();
+      for (HashMap<String, Object> stringObjectHashMap : serverList) {
+        String shuffleId = (String) stringObjectHashMap.get("id");
+        shuffleIds.add(shuffleId);
+      }
+      return serverList.size() == 2;
+    });
+    assertTrue(CollectionUtils.isEqualCollection(expectShuffleIds, 
shuffleIds));
   }
 
   @Test
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 2099a035..d1e66464 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -239,6 +239,8 @@ enum ServerStatus {
   ACTIVE = 0;
   DECOMMISSIONING = 1;
   DECOMMISSIONED = 2;
+  LOST = 3;
+  UNHEALTHY = 4;
   // todo: more status, such as UPGRADING
 }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index fe100b62..212dd2d3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -457,6 +457,11 @@ public class ShuffleServer {
     return isHealthy.get();
   }
 
+  @VisibleForTesting
+  public void markUnhealthy() {
+    isHealthy.set(false);
+  }
+
   public GRPCMetrics getGrpcMetrics() {
     return grpcMetrics;
   }

Reply via email to