xianjingfeng commented on code in PR #938:
URL: https://github.com/apache/incubator-uniffle/pull/938#discussion_r1225710757


##########
coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java:
##########
@@ -129,6 +129,77 @@ public void getServerListTest() throws Exception {
     }
   }
 
+  @Test
+  public void getLostedServerListTest() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout",3 * 
1000L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    // Analog timeout registration
+    Thread.sleep(4000);
+    List<ServerNode> lostServerList = clusterManager.getLostServerList();
+    Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+    assertEquals(expectedIds, 
lostServerList.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // re-register sn3
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn3);
+    Thread.sleep(2000);
+    // Retrieve listed ServerNode List
+    List<ServerNode> lostServerListre = clusterManager.getLostServerList();
+    Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+    assertEquals(expectedIdsre, 
lostServerListre.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+  }
+
+  @Test
+  public void getUnhealthyServerList() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout",3 * 
1000L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    clusterManager.add(sn4);
+    // Analog timeout registration
+    Thread.sleep(2000);
+    List<ServerNode> unhealthyServerList = 
clusterManager.getUnhealthyServerList();
+    Set<String> expectedIds = Sets.newHashSet("sn3","sn4");
+    assertEquals(expectedIds, 
unhealthyServerList.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // Register unhealthy node sn3 again
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn3);
+    // Analog timeout registration
+    Thread.sleep(2000);
+    // In this case, the only unhealthy node is sn3, because sn4 moves to the 
lost node after the heartbeat time
+    List<ServerNode> unhealthyServerListre = 
clusterManager.getUnhealthyServerList();
+    Set<String> expectedIdsre = Sets.newHashSet("sn3");
+    assertEquals(expectedIdsre, 
unhealthyServerListre.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // At this point verify that sn4 is in the lost list
+    List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
+    Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2","sn4");

Review Comment:
   ```suggestion
       Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2", "sn4");
   ```



##########
coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java:
##########
@@ -129,6 +129,77 @@ public void getServerListTest() throws Exception {
     }
   }
 
+  @Test
+  public void getLostedServerListTest() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout",3 * 
1000L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    // Analog timeout registration
+    Thread.sleep(4000);
+    List<ServerNode> lostServerList = clusterManager.getLostServerList();
+    Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+    assertEquals(expectedIds, 
lostServerList.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // re-register sn3
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, true);
+    clusterManager.add(sn3);
+    Thread.sleep(2000);
+    // Retrieve listed ServerNode List
+    List<ServerNode> lostServerListre = clusterManager.getLostServerList();
+    Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+    assertEquals(expectedIdsre, 
lostServerListre.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+  }
+
+  @Test
+  public void getUnhealthyServerList() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    // Shorten the heartbeat time
+    coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout",3 * 
1000L);
+    SimpleClusterManager clusterManager = new 
SimpleClusterManager(coordinatorConf, new Configuration());
+    ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+        10, grpcTags, true);
+    ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+        10, grpcTags, true);
+    ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn1);
+    clusterManager.add(sn2);
+    clusterManager.add(sn3);
+    clusterManager.add(sn4);
+    // Analog timeout registration
+    Thread.sleep(2000);
+    List<ServerNode> unhealthyServerList = 
clusterManager.getUnhealthyServerList();
+    Set<String> expectedIds = Sets.newHashSet("sn3","sn4");
+    assertEquals(expectedIds, 
unhealthyServerList.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // Register unhealthy node sn3 again
+    sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+        11, grpcTags, false);
+    clusterManager.add(sn3);
+    // Analog timeout registration
+    Thread.sleep(2000);
+    // In this case, the only unhealthy node is sn3, because sn4 moves to the 
lost node after the heartbeat time
+    List<ServerNode> unhealthyServerListre = 
clusterManager.getUnhealthyServerList();
+    Set<String> expectedIdsre = Sets.newHashSet("sn3");
+    assertEquals(expectedIdsre, 
unhealthyServerListre.stream().map(ServerNode::getId).collect(Collectors.toSet()));
+    // At this point verify that sn4 is in the lost list
+    List<ServerNode> lostremoveunhealthy = clusterManager.getLostServerList();
+    Set<String> expectedIdlostremoveunhealthy = Sets.newHashSet("sn1", "sn2","sn4");
+    assertEquals(expectedIdlostremoveunhealthy,lostremoveunhealthy.stream()

Review Comment:
   ```suggestion
       assertEquals(expectedIdlostremoveunhealthy, lostremoveunhealthy.stream()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to