This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb45f3063414f129eee8bb5e4f99fe530dd1f541 Author: lta <[email protected]> AuthorDate: Wed Apr 7 22:25:31 2021 +0800 This commit fixes the following issues: 1. fix a deadlock between headerGroupMap and asyncServiceMap in method addDataGroupMember. 2. add a synchronized monitor on the heartbeat wait object. --- .../iotdb/cluster/server/DataClusterServer.java | 22 +++++++---- .../cluster/server/member/DataGroupMember.java | 43 +++------------------- .../cluster/server/member/MetaGroupMember.java | 32 ++++++++++------ 3 files changed, 41 insertions(+), 56 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 0e48aac..91d49cf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -122,15 +122,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async public DataGroupMember addDataGroupMember(DataGroupMember dataGroupMember, RaftNode header) { synchronized (headerGroupMap) { if (headerGroupMap.containsKey(header)) { - logger.debug("group {} already exist.", dataGroupMember.getAllNodes()); + logger.debug("Group {} already exist.", dataGroupMember.getAllNodes()); return headerGroupMap.get(header); } stoppedMemberManager.remove(header); headerGroupMap.put(header, dataGroupMember); - resetServiceCache(header); + dataGroupMember.start(); - return dataGroupMember; } + logger.info("Add group {} successfully.", dataGroupMember.getName()); + resetServiceCache(header); // avoid dead-lock + + return dataGroupMember; } private void resetServiceCache(RaftNode header) { @@ -597,12 +600,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param header * @param dataGroupMember */ - private void removeMember(RaftNode header, DataGroupMember dataGroupMember, boolean waitFollowersToSync) { +
private void removeMember(RaftNode header, DataGroupMember dataGroupMember, boolean removedGroup) { dataGroupMember.setReadOnly(); - if (waitFollowersToSync && dataGroupMember.getCharacter() == NodeCharacter.LEADER) { - dataGroupMember.getAppendLogThreadPool().submit(() -> dataGroupMember.waitFollowersToSync()); - } else { + if (!removedGroup) { dataGroupMember.stop(); + } else { + if (dataGroupMember.getCharacter() != NodeCharacter.LEADER) { + new Thread(()-> { + dataGroupMember.syncLeader(); + dataGroupMember.stop(); + }).start(); + } } stoppedMemberManager.put(header, dataGroupMember); logger.info("Data group member has removed, header {}, group is {}.", header, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 8cb12cc..f142f67 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -350,6 +350,8 @@ public class DataGroupMember extends RaftMember { synchronized (term) { setCharacter(NodeCharacter.ELECTOR); setLeader(null); + } + synchronized (getHeartBeatWaitObject()) { getHeartBeatWaitObject().notifyAll(); } } @@ -759,6 +761,8 @@ public class DataGroupMember extends RaftMember { synchronized (term) { setCharacter(NodeCharacter.ELECTOR); setLeader(null); + } + synchronized (getHeartBeatWaitObject()) { getHeartBeatWaitObject().notifyAll(); } } @@ -796,39 +800,6 @@ public class DataGroupMember extends RaftMember { } /** - * When the header of a partition group is removed, it needs to wait all followers to sync data - * because there has no new leader. 
- */ - public void waitFollowersToSync() { - try { - for (Map.Entry<Node, Peer> entry : peerMap.entrySet()) { - Node node = entry.getKey(); - if (node.equals(thisNode)) { - continue; - } - Peer peer = entry.getValue(); - while (peer.getMatchIndex() < logManager.getCommitLogIndex()) { - try { - Thread.sleep(10); - if (character != NodeCharacter.LEADER) { - return; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger - .warn("{}: Unexpected interruption when waiting follower {} to sync, raft id is {}", - name, node, getRaftGroupId()); - } - } - logger.info("{}: Follower {} has synced with leader, raft id is {}", name, node, - getRaftGroupId()); - } - } finally { - stop(); - } - } - - /** * Generate a report containing the character, leader, term, last log term, last log index, header * and readOnly or not of this member. */ @@ -864,7 +835,7 @@ public class DataGroupMember extends RaftMember { } public boolean onSnapshotInstalled(List<Integer> slots) { - if (!isHasSyncedLeaderBeforeRemoved()) { + if (getMetaGroupMember().getPartitionTable().getAllNodes().contains(thisNode)) { getMetaGroupMember().waitUtil(getMetaGroupMember().getPartitionTable().getLastMetaLogIndex()); } if (logger.isDebugEnabled()) { @@ -926,10 +897,6 @@ public class DataGroupMember extends RaftMember { lastAppliedPartitionTableVersion.save(); } - public long getLastAppliedPartitionTableVersion() { - return lastAppliedPartitionTableVersion.getVersion(); - } - private class LastAppliedPatitionTableVersion { private static final String VERSION_FILE_NAME = "LAST_PARTITION_TABLE_VERSION"; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 4a33693..f064c86 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ 
-605,6 +605,7 @@ public class MetaGroupMember extends RaftMember { logger.info("Node {} admitted this node into the cluster", node); ByteBuffer partitionTableBuffer = resp.partitionTableBytes; acceptPartitionTable(partitionTableBuffer, true); + getDataClusterServer().pullSnapshots(); return true; } else if (resp.getRespNum() == Response.RESPONSE_IDENTIFIER_CONFLICT) { logger.info("The identifier {} conflicts the existing ones, regenerate a new one", @@ -719,8 +720,6 @@ public class MetaGroupMember extends RaftMember { updateNodeList(newTable.getAllNodes()); startSubServers(); - - getDataClusterServer().pullSnapshots(); } private void updateNodeList(Collection<Node> nodes) { @@ -2219,20 +2218,31 @@ public class MetaGroupMember extends RaftMember { getDataClusterServer().removeNode(oldNode, result); // the leader is removed, start the next election ASAP - if (oldNode.equals(leader.get())) { - setCharacter(NodeCharacter.ELECTOR); - setLeader(ClusterConstant.EMPTY_NODE); - lastHeartbeatReceivedTime = Long.MIN_VALUE; + if (oldNode.equals(leader.get()) && !oldNode.equals(thisNode)) { + synchronized (term) { + setCharacter(NodeCharacter.ELECTOR); + setLeader(null); + } + synchronized (getHeartBeatWaitObject()) { + getHeartBeatWaitObject().notifyAll(); + } } if (oldNode.equals(thisNode)) { // use super.stop() so that the data server will not be closed because other nodes may // want to pull data from this node - super.stop(); - if (clientServer != null) { - clientServer.stop(); - } - logger.info("{} has been removed from the cluster", name); + new Thread(() -> { + try { + Thread.sleep(RaftServer.getHeartBeatIntervalMs()); + } catch (InterruptedException e) { + //ignore + } + super.stop(); + if (clientServer != null) { + clientServer.stop(); + } + logger.info("{} has been removed from the cluster", name); + }).start(); } else if (thisNode.equals(leader.get())) { // as the old node is removed, it cannot know this by heartbeat or log, so it should be // directly kicked out of 
the cluster
