This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit d92208ddb1edca13fcb6cb31a8980b2052f29d7b Author: Junfan Zhang <junfan.zh...@outlook.com> AuthorDate: Thu Jun 23 15:30:19 2022 +0800 [Minor] Remove serverNode from tags structure when heartbeat timeout (#193) ### What changes were proposed in this pull request? Remove serverNode from tags structure when its heartbeat times out ### Why are the changes needed? A server node whose heartbeat timed out was removed from `servers` but left in `tagToNodes`; it must be removed from the tags structure as well ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT --- .../com/tencent/rss/coordinator/ServerNode.java | 7 ++++++ .../rss/coordinator/SimpleClusterManager.java | 9 ++++++-- .../rss/coordinator/SimpleClusterManagerTest.java | 27 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java index ef09298..816f080 100644 --- a/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java +++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ServerNode.java @@ -115,6 +115,13 @@ public class ServerNode implements Comparable<ServerNode> { + ", healthy[" + isHealthy + "]"; } + /** + * Only for test case + */ + void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + @Override public int compareTo(ServerNode other) { if (availableMemory > other.getAvailableMemory()) { diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java index d3fe789..10af74d 100644 --- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java @@ -72,7 +72,7 @@ public class SimpleClusterManager implements ClusterManager { } } - private void nodesCheck() { + void nodesCheck() { try { long timestamp = System.currentTimeMillis(); 
Set<String> deleteIds = Sets.newHashSet(); @@ -83,7 +83,12 @@ public class SimpleClusterManager implements ClusterManager { } } for (String serverId : deleteIds) { - servers.remove(serverId); + ServerNode sn = servers.remove(serverId); + if (sn != null) { + for (Set<ServerNode> nodesWithTag : tagToNodes.values()) { + nodesWithTag.remove(sn); + } + } } CoordinatorMetrics.gaugeTotalServerNum.set(servers.size()); diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java index a5040bf..bed9081 100644 --- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java +++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java @@ -27,6 +27,7 @@ import java.util.Set; import com.google.common.collect.Sets; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -142,6 +143,32 @@ public class SimpleClusterManagerTest { assertEquals(0, serverNodes.size()); } + @Test + public void testGetCorrectServerNodesWhenOneNodeRemoved() { + CoordinatorConf ssc = new CoordinatorConf(); + ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L); + SimpleClusterManager clusterManager = new SimpleClusterManager(ssc); + ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20, + 10, testTags, true); + ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21, + 10, testTags, true); + ServerNode sn3 = new ServerNode("sn3", "ip", 0, 100L, 50L, 20, + 11, testTags, true); + clusterManager.add(sn1); + clusterManager.add(sn2); + clusterManager.add(sn3); + List<ServerNode> serverNodes = clusterManager.getServerList(testTags); + assertEquals(3, serverNodes.size()); + + sn3.setTimestamp(System.currentTimeMillis() - 60 * 1000L); + clusterManager.nodesCheck(); + + Map<String, Set<ServerNode>> 
tagToNodes = clusterManager.getTagToNodes(); + List<ServerNode> serverList = clusterManager.getServerList(testTags); + Assertions.assertEquals(2, tagToNodes.get(testTags.iterator().next()).size()); + Assertions.assertEquals(2, serverList.size()); + } + @Test public void updateExcludeNodesTest() throws Exception { String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();