This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 82abad07ae4509752809f1be84702176e7b1af44 Author: lta <[email protected]> AuthorDate: Thu Apr 1 11:01:16 2021 +0800 This commit fixes the following issues: 1. add a lock when putting a DataGroupMember into headerMap in order to handle concurrency issues 2. fix a bug of using a DataGroupMember that has not been started to pull snapshots 3. move exile to the background --- .../java/org/apache/iotdb/cluster/ClusterMain.java | 2 +- .../iotdb/cluster/config/ClusterConstant.java | 2 ++ .../cluster/log/snapshot/PullSnapshotTask.java | 2 ++ .../iotdb/cluster/query/reader/DataSourceInfo.java | 6 ++-- .../iotdb/cluster/server/DataClusterServer.java | 33 +++++++++---------- .../cluster/server/member/DataGroupMember.java | 37 ++++++++++++++++------ .../cluster/server/member/MetaGroupMember.java | 33 ++++++++++++++++--- .../iotdb/cluster/server/member/RaftMember.java | 18 ++++------- .../cluster/server/member/MetaGroupMemberTest.java | 3 +- 9 files changed, 91 insertions(+), 45 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java index 8164e8e..ecef889 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java @@ -86,7 +86,7 @@ public class ClusterMain { } IoTDBDescriptor.getInstance().getConfig().setSyncEnable(false); - IoTDBDescriptor.getInstance().getConfig().setEnablePartition(true); + IoTDBDescriptor.getInstance().getConfig().setEnablePartition(false); logger.info("Running mode {}", mode); if (MODE_START.equals(mode)) { try { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java index 8e3f304..b846849 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConstant.java @@ -37,6 +37,8 @@ public class ClusterConstant 
{ public static final int RETRY_WAIT_TIME_MS = 10; + public static final int THREAD_POLL_WAIT_TERMINATION_TIME = 10 * 1000; + public static final Node EMPTY_NODE = new Node(); private ClusterConstant() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java index e1ae342..8f46616 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java @@ -181,6 +181,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { try { // sequentially pick up a node that may have this slot nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size(); + long startTime = System.currentTimeMillis(); finished = pullSnapshot(nodeIndex); if (!finished) { if (logger.isDebugEnabled()) { @@ -191,6 +192,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { .sleep( ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs()); } + logger.debug("{}: Data migration ends, cost {}ms", newMember, (System.currentTimeMillis() - startTime)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); finished = true; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java index 8dcd45f..24b6bc5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java @@ -90,19 +90,21 @@ public class DataSourceInfo { Long newReaderId = getReaderId(node, byTimestamp, timestamp); if (newReaderId != null) { logger.debug("get a readerId {} for {} from {}", newReaderId, request.path, node); - if (newReaderId != -1) { + if (newReaderId >= 0) { // register the 
node so the remote resources can be released context.registerRemoteNode(node, partitionGroup.getHeader(), partitionGroup.getId()); this.readerId = newReaderId; this.curSource = node; this.curPos = nextNodePos; return true; - } else { + } else if (newReaderId == -1){ // the id being -1 means there is no satisfying data on the remote node, create an // empty reader to reduce further communication this.isNoClient = true; this.isNoData = true; return false; + } else { + logger.debug("change other client for better query performance."); } } } catch (TException | IOException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 20e1471..0e48aac 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -119,17 +119,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * * @param dataGroupMember */ - public void addDataGroupMember(DataGroupMember dataGroupMember) { - RaftNode header = new RaftNode(dataGroupMember.getHeader(), - dataGroupMember.getRaftGroupId()); - if (headerGroupMap.containsKey(header)) { - logger.debug("group {} already exist.", dataGroupMember.getAllNodes()); - return; + public DataGroupMember addDataGroupMember(DataGroupMember dataGroupMember, RaftNode header) { + synchronized (headerGroupMap) { + if (headerGroupMap.containsKey(header)) { + logger.debug("group {} already exist.", dataGroupMember.getAllNodes()); + return headerGroupMap.get(header); + } + stoppedMemberManager.remove(header); + headerGroupMap.put(header, dataGroupMember); + resetServiceCache(header); + dataGroupMember.start(); + return dataGroupMember; } - stoppedMemberManager.remove(header); - headerGroupMap.put(header, dataGroupMember); - resetServiceCache(header); - dataGroupMember.start(); } private void 
resetServiceCache(RaftNode header) { @@ -541,9 +542,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async */ public void addNode(Node node, NodeAdditionResult result) { // If the node executed adding itself to the cluster, it's unnecessary to add new groups because they already exist. - // Just pull snapshot. if (node.equals(thisNode)) { - pullSnapshots(); return; } Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); @@ -566,9 +565,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } for (PartitionGroup newGroup : result.getNewGroupList()) { if (newGroup.contains(thisNode)) { + RaftNode header = new RaftNode(newGroup.getHeader(), newGroup.getId()); logger.info("Adding this node into a new group {}", newGroup); DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode); - addDataGroupMember(dataGroupMember); + dataGroupMember = addDataGroupMember(dataGroupMember, header); dataGroupMember .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node, newGroup.getId()), node); @@ -631,13 +631,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups(); for (PartitionGroup partitionGroup : partitionGroups) { - DataGroupMember prevMember = headerGroupMap.get(new RaftNode(partitionGroup.getHeader(), partitionGroup.getId())); + RaftNode header = new RaftNode(partitionGroup.getHeader(), partitionGroup.getId()); + DataGroupMember prevMember = headerGroupMap.get(header); if (prevMember == null || !prevMember.getAllNodes().equals(partitionGroup)) { logger.info("Building member of data group: {}", partitionGroup); // no previous member or member changed DataGroupMember dataGroupMember = dataMemberFactory.create(partitionGroup, thisNode); // the previous member will be replaced here - addDataGroupMember(dataGroupMember); + 
addDataGroupMember(dataGroupMember, header); dataGroupMember.setUnchanged(true); } else { prevMember.setUnchanged(true); @@ -698,7 +699,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async if (!headerGroupMap.containsKey(header)) { logger.info("{} should join a new group {}", thisNode, group); DataGroupMember dataGroupMember = dataMemberFactory.create(group, thisNode); - addDataGroupMember(dataGroupMember); + addDataGroupMember(dataGroupMember, header); } // pull new slots from the removed node headerGroupMap.get(header).pullSlots(removalResult); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 962c466..8cb12cc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -19,8 +19,9 @@ package org.apache.iotdb.cluster.server.member; +import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME; + import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; @@ -202,6 +203,10 @@ public class DataGroupMember extends RaftMember { heartBeatService.submit(new DataHeartbeatThread(this)); pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); pullSnapshotHintService = new PullSnapshotHintService(this); + logger.info("{}: has inited pullSnapshotService and pullSnapshotHintService", name); + if (pullSnapshotHintService == null) { + logger.error("{}: pullSnapshotHintService is null", name); + } pullSnapshotHintService.start(); resumePullSnapshotTasks(); } @@ -217,7 +222,7 @@ public class DataGroupMember extends RaftMember { if (pullSnapshotService != null) { pullSnapshotService.shutdownNow(); try { - 
pullSnapshotService.awaitTermination(10, TimeUnit.SECONDS); + pullSnapshotService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for pullSnapshotService to end", e); @@ -234,12 +239,24 @@ public class DataGroupMember extends RaftMember { logger.info("{}: stopped", name); } - /** - * The first node (on the hash ring) in this data group is the header. It determines the duty - * (what range on the ring do the group take responsibility for) of the group and although other - * nodes in this may change, this node is unchangeable unless the data group is dismissed. It is - * also the identifier of this data group. - */ + @Override + long checkElectorLogProgress(ElectionRequest electionRequest) { + Node elector = electionRequest.getElector(); + // check if the node is in the group + if (!allNodes.contains(elector)) { + logger.info("{}: the elector {} is not in the data group {}, so reject this election.", name, + getPartitionGroup(), elector); + return Response.RESPONSE_NODE_IS_NOT_IN_GROUP; + } + return super.checkElectorLogProgress(electionRequest); + } + + /** + * The first node (on the hash ring) in this data group is the header. It determines the duty + * (what range on the ring do the group take responsibility for) of the group and although other + * nodes in this may change, this node is unchangeable unless the data group is dismissed. It is + * also the identifier of this data group. + */ @Override public Node getHeader() { return allNodes.get(0); @@ -370,7 +387,9 @@ public class DataGroupMember extends RaftMember { } // Make sure local data is complete. 
- if (lastAppliedPartitionTableVersion.getVersion() != metaGroupMember.getPartitionTable().getLastMetaLogIndex()) { + if (character != NodeCharacter.LEADER + && lastAppliedPartitionTableVersion.getVersion() != metaGroupMember.getPartitionTable() + .getLastMetaLogIndex()) { return null; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 95dbd87..4a33693 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.server.member; +import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME; import static org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC; import static org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult; @@ -94,6 +95,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.query.ClusterPlanRouter; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; +import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; @@ -379,7 +381,7 @@ public class MetaGroupMember extends RaftMember { if (reportThread != null) { reportThread.shutdownNow(); try { - reportThread.awaitTermination(10, TimeUnit.SECONDS); + reportThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for reportThread to end", e); @@ -388,7 +390,7 @@ public class MetaGroupMember extends 
RaftMember { if (hardLinkCleanerThread != null) { hardLinkCleanerThread.shutdownNow(); try { - hardLinkCleanerThread.awaitTermination(10, TimeUnit.SECONDS); + hardLinkCleanerThread.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e); @@ -638,6 +640,19 @@ public class MetaGroupMember extends RaftMember { throw new ConfigInconsistentException(); } + + @Override + long checkElectorLogProgress(ElectionRequest electionRequest) { + Node elector = electionRequest.getElector(); + // check if the node is in the group + if (partitionTable != null && !allNodes.contains(elector)) { + logger.info("{}: the elector {} is not in the data group {}, so reject this election.", name, + getPartitionGroup(), elector); + return Response.RESPONSE_NODE_IS_NOT_IN_GROUP; + } + return super.checkElectorLogProgress(electionRequest); + } + /** * Process the heartbeat request from a valid leader. Generate and tell the leader the identifier * of the node if necessary. 
If the partition table is missing, use the one from the request or @@ -704,6 +719,8 @@ public class MetaGroupMember extends RaftMember { updateNodeList(newTable.getAllNodes()); startSubServers(); + + getDataClusterServer().pullSnapshots(); } private void updateNodeList(Collection<Node> nodes) { @@ -1955,7 +1972,7 @@ public class MetaGroupMember extends RaftMember { return collectMigrationStatusSync(node); } } catch (TException | InterruptedException e) { - logger.warn("Cannot get the status of all nodes", e); + logger.error("{}: Cannot get the status of node {}", name, node, e); } return null; } @@ -2219,7 +2236,7 @@ public class MetaGroupMember extends RaftMember { } else if (thisNode.equals(leader.get())) { // as the old node is removed, it cannot know this by heartbeat or log, so it should be // directly kicked out of the cluster - exileNode(removeNodeLog); + getAppendLogThreadPool().submit(() -> exileNode(removeNodeLog)); } if (logger.isDebugEnabled()) { @@ -2291,6 +2308,9 @@ public class MetaGroupMember extends RaftMember { syncLeader(); Map<PartitionGroup, Integer> res = new HashMap<>(); for (Node node: allNodes) { + if (logger.isDebugEnabled()) { + logger.debug("{}: start to get migration status of {}", name, node); + } Map<PartitionGroup, Integer> oneNodeRes; if (node.equals(thisNode)) { oneNodeRes = collectMigrationStatus(); @@ -2312,8 +2332,11 @@ public class MetaGroupMember extends RaftMember { * @return key: data group; value: slot num in data migration */ public Map<PartitionGroup, Integer> collectMigrationStatus() { - logger.info("{}: start to collect migration status.", name); + logger.info("{}: start to collect migration status locally.", name); Map<PartitionGroup, Integer> groupSlotMap = new HashMap<>(); + if (getPartitionTable() == null) { + return groupSlotMap; + } Map<RaftNode, DataGroupMember> headerMap = getDataClusterServer().getHeaderGroupMap(); waitUtil(getPartitionTable().getLastMetaLogIndex()); synchronized (headerMap) { diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 5f5579f..813dbf3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -19,6 +19,8 @@ package org.apache.iotdb.cluster.server.member; +import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.SocketTimeoutException; @@ -339,9 +341,9 @@ public abstract class RaftMember { catchUpService.shutdownNow(); appendLogThreadPool.shutdownNow(); try { - heartBeatService.awaitTermination(10, TimeUnit.SECONDS); - catchUpService.awaitTermination(10, TimeUnit.SECONDS); - appendLogThreadPool.awaitTermination(10, TimeUnit.SECONDS); + heartBeatService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); + catchUpService.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); + appendLogThreadPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for heartBeatService and catchUpService " @@ -350,7 +352,7 @@ public abstract class RaftMember { if (serialToParallelPool != null) { serialToParallelPool.shutdownNow(); try { - serialToParallelPool.awaitTermination(10, TimeUnit.SECONDS); + serialToParallelPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for asyncThreadPool to end", e); @@ -360,7 +362,7 @@ public abstract class RaftMember { if (commitLogPool != null) { commitLogPool.shutdownNow(); try { - commitLogPool.awaitTermination(10, 
TimeUnit.SECONDS); + commitLogPool.awaitTermination(THREAD_POLL_WAIT_TERMINATION_TIME, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.error("Unexpected interruption when waiting for commitLogPool to end", e); @@ -1139,12 +1141,6 @@ public abstract class RaftMember { long thatLastLogTerm = electionRequest.getLastLogTerm(); Node elector = electionRequest.getElector(); - // check if the node is in the group - if (!allNodes.contains(elector)) { - logger.info("{}: the elector {} is not in the group, so reject this election.", name, elector); - return Response.RESPONSE_NODE_IS_NOT_IN_GROUP; - } - // check the log progress of the elector long resp = checkLogProgress(thatLastLogIndex, thatLastLogTerm); if (resp == Response.RESPONSE_AGREE) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 2eb61bd..4347539 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -515,9 +515,10 @@ public class MetaGroupMemberTest extends MemberTest { dataClusterServer.setPartitionTable(partitionTable); for (PartitionGroup partitionGroup : partitionGroups) { + RaftNode header = new RaftNode(partitionGroup.getHeader(), partitionGroup.getId()); DataGroupMember dataGroupMember = getDataGroupMember(partitionGroup, TestUtils.getNode(0)); dataGroupMember.start(); - dataClusterServer.addDataGroupMember(dataGroupMember); + dataClusterServer.addDataGroupMember(dataGroupMember, header); } }
