xianjingfeng commented on code in PR #938:
URL: https://github.com/apache/incubator-uniffle/pull/938#discussion_r1228908224
##########
coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java:
##########
@@ -129,6 +130,87 @@ public void getServerListTest() throws Exception {
}
}
+ @Test
+ public void getLostServerListTest() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ // Shorten the heartbeat time
+ coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3 *
1000L);
+ SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, grpcTags, true);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, grpcTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, true);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ // Analog timeout registration
+ await().atMost(4, TimeUnit.SECONDS).until(() -> {
+ Set<String> lostServerList =
clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ Set<String> expectedIds = Sets.newHashSet("sn1", "sn2", "sn3");
+ return CollectionUtils.isEqualCollection(lostServerList,
expectedIds);
+ }
+ );
+ // re-register sn3
+ sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, true);
+ clusterManager.add(sn3);
+ await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ // Retrieve listed ServerNode List
+ Set<String> lostServerListre =
clusterManager.getLostServerList().stream()
+ .map(ServerNode::getId).collect(Collectors.toSet());
+ Set<String> expectedIdsre = Sets.newHashSet("sn1", "sn2");
+ return CollectionUtils.isEqualCollection(lostServerListre,
expectedIdsre);
+ }
+ );
+ }
+
+ @Test
+ public void getUnhealthyServerList() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ // Shorten the heartbeat time
+ coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3 *
1000L);
+ SimpleClusterManager clusterManager = new
SimpleClusterManager(coordinatorConf, new Configuration());
+ ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
+ 10, grpcTags, true);
+ ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
+ 10, grpcTags, true);
+ ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, false);
+ ServerNode sn4 = new ServerNode("sn4", "ip", 0, 100L, 50L, 20,
+ 11, grpcTags, false);
+ clusterManager.add(sn1);
+ clusterManager.add(sn2);
+ clusterManager.add(sn3);
+ clusterManager.add(sn4);
+ // Analog timeout registration
+ Thread.sleep(2000);
Review Comment:
Should we remove this line (the `Thread.sleep(2000);` call directly above)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]