This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 325e4f556cb3d520b6a365a89317e997291979c2 Author: lta <[email protected]> AuthorDate: Mon Jan 11 11:26:00 2021 +0800 fix bugs of wrong previous groups, pull snapshot from self and wrong remove local data --- .../iotdb/cluster/log/snapshot/FileSnapshot.java | 6 +-- .../cluster/log/snapshot/PullSnapshotTask.java | 56 ++++++++++--------- .../iotdb/cluster/partition/NodeRemovalResult.java | 5 +- .../iotdb/cluster/partition/PartitionTable.java | 6 +++ .../cluster/partition/slot/SlotPartitionTable.java | 59 ++++++++++++++------ .../iotdb/cluster/server/DataClusterServer.java | 30 ++++------- .../cluster/server/member/DataGroupMember.java | 62 +++++++++++++++++----- .../cluster/server/member/MetaGroupMember.java | 1 - .../iotdb/cluster/server/member/RaftMember.java | 35 ++++++++++++ .../server/heartbeat/MetaHeartbeatThreadTest.java | 5 ++ thrift/src/main/thrift/cluster.thrift | 1 - 11 files changed, 185 insertions(+), 81 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java index 9f1b562..b559879 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java @@ -227,12 +227,10 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { throw new SnapshotInstallationException(e); } - for (FileSnapshot value : snapshotMap.values()) { - installFileSnapshotSchema(value); - } - for (Entry<Integer, FileSnapshot> integerSnapshotEntry : snapshotMap.entrySet()) { Integer slot = integerSnapshotEntry.getKey(); + FileSnapshot snapshot = integerSnapshotEntry.getValue(); + installFileSnapshotSchema(snapshot); SlotStatus status = slotManager.getStatus(slot); if (status == SlotStatus.PULLING) { // as schemas are set, writes can proceed diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java index 9dc6231..4a79485 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java @@ -39,6 +39,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.SnapshotInstallationException; import org.apache.iotdb.cluster.log.Snapshot; +import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; @@ -163,30 +164,37 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { @Override public Void call() { - request = new PullSnapshotRequest(); - request.setHeader(descriptor.getPreviousHolders().getHeader()); - request.setRaftId(descriptor.getPreviousHolders().getId()); - request.setRequiredSlots(descriptor.getSlots()); - request.setRequireReadOnly(descriptor.isRequireReadOnly()); - - boolean finished = false; - int nodeIndex = -1; - while (!finished) { - try { - // sequentially pick up a node that may have this slot - nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size(); - finished = pullSnapshot(nodeIndex); - if (!finished) { - Thread - .sleep(ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - finished = true; - } catch (TException e) { - if (logger.isDebugEnabled()) { - logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(), - descriptor.getPreviousHolders().get(nodeIndex), e); + // If this node is the member of previous holder, it's unnecessary to pull data again + if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) { + // inform the previous holders that one member has successfully pulled snapshot directly + newMember.registerPullSnapshotHint(descriptor); + } else { + request = new PullSnapshotRequest(); + request.setHeader(descriptor.getPreviousHolders().getHeader()); + request.setRaftId(descriptor.getPreviousHolders().getId()); + request.setRequiredSlots(descriptor.getSlots()); + request.setRequireReadOnly(descriptor.isRequireReadOnly()); + + boolean finished = false; + int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()); + while (!finished) { + try { + // sequentially pick up a node that may have this slot + nodeIndex = (nodeIndex + 1) % descriptor.getPreviousHolders().size(); + finished = pullSnapshot(nodeIndex); + if (!finished) { + Thread + .sleep( + ClusterDescriptor.getInstance().getConfig().getPullSnapshotRetryIntervalMs()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + finished = true; + } catch (TException e) { + if (logger.isDebugEnabled()) { + logger.debug("Cannot pull slot {} from {}, retry", descriptor.getSlots(), + descriptor.getPreviousHolders().get(nodeIndex), e); + } } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java index 457af85..5493980 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java @@ -19,6 +19,7 @@ package org.apache.iotdb.cluster.partition; +import java.util.ArrayList; import java.util.List; /** @@ -26,10 +27,10 @@ import java.util.List; */ public class NodeRemovalResult { - private List<PartitionGroup> removedGroupList; + private List<PartitionGroup> removedGroupList = new ArrayList<>(); // if the removed group contains the local node, the local node should join a new group to // preserve the replication number - private List<PartitionGroup> newGroupList; + private List<PartitionGroup> newGroupList = new ArrayList<>(); public PartitionGroup getRemovedGroup(int raftId) { for (PartitionGroup group : removedGroupList) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java index bd8e518..079aad1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java @@ -95,6 +95,12 @@ public interface PartitionTable { List<PartitionGroup> getGlobalGroups(); /** + * Judge whether the data of slot is held by node + * @param node target node + */ + boolean judgeHoldSlot(Node node, int slot); + + /** * @param path can be an incomplete path (but should contain a storage group name) e.g., if * "root.sg" is a storage group, then path can not be "root". * @param timestamp diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index f8f89b9..2a5ae3c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -62,7 +62,7 @@ public class SlotPartitionTable implements PartitionTable { private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM]; // the nodes that each slot belongs to before a new node is added, used for the new node to // find the data source - private Map<RaftNode, Map<Integer, RaftNode>> previousNodeMap = new ConcurrentHashMap<>(); + private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>(); //the filed is used for determining which nodes need to be a group. // the data groups which this node belongs to. @@ -164,8 +164,7 @@ public class SlotPartitionTable implements PartitionTable { return ret; } - @Override - public PartitionGroup getHeaderGroup(RaftNode raftNode) { + private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) { PartitionGroup ret = new PartitionGroup(raftNode.getRaftId()); // assuming the nodes are [1,2,3,4,5] @@ -187,6 +186,11 @@ public class SlotPartitionTable implements PartitionTable { } @Override + public PartitionGroup getHeaderGroup(RaftNode raftNode) { + return getHeaderGroup(raftNode, this.nodeRing); + } + + @Override public PartitionGroup getHeaderGroup(Node node) { return getHeaderGroup(new RaftNode(node, 0)); } @@ -228,11 +232,13 @@ public class SlotPartitionTable implements PartitionTable { @Override public NodeAdditionResult addNode(Node node) { + List<Node> oldRing; synchronized (nodeRing) { if (nodeRing.contains(node)) { return null; } + oldRing = new ArrayList<>(nodeRing); nodeRing.add(node); nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); @@ -277,7 +283,7 @@ public class SlotPartitionTable implements PartitionTable { // the slots movement is only done logically, the new node itself will pull data from the // old node - result.setLostSlots(moveSlotsToNew(node)); + result.setLostSlots(moveSlotsToNew(node, oldRing)); return result; } @@ -290,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable { * @param newNode * @return a map recording what slots each group lost. */ - private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) { + private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) { Map<RaftNode, Set<Integer>> result = new HashMap<>(); // as a node is added, the average slots for each node decrease // move the slots to the new node if any previous node have more slots than the new average @@ -315,7 +321,7 @@ public class SlotPartitionTable implements PartitionTable { nodeSlotMap.get(curNode).addAll(slotsToMove); for (Integer slot : slotsToMove) { // record what node previously hold the integer - previousNodeMap.get(curNode).put(slot, entry.getKey()); + previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); @@ -326,7 +332,7 @@ public class SlotPartitionTable implements PartitionTable { nodeSlotMap.get(curNode).addAll(slotsToMove); for (Integer slot : slotsToMove) { // record what node previously hold the integer - previousNodeMap.get(curNode).put(slot, entry.getKey()); + previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } result.get(entry.getKey()).addAll(slotsToMove); @@ -357,15 +363,19 @@ public class SlotPartitionTable implements PartitionTable { } dataOutputStream.writeInt(previousNodeMap.size()); - for (Entry<RaftNode, Map<Integer, RaftNode>> nodeMapEntry : previousNodeMap.entrySet()) { + for (Entry<RaftNode, Map<Integer, PartitionGroup>> nodeMapEntry : previousNodeMap.entrySet()) { dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier()); dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId()); - Map<Integer, RaftNode> prevHolders = nodeMapEntry.getValue(); + Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue(); dataOutputStream.writeInt(prevHolders.size()); - for (Entry<Integer, RaftNode> integerNodeEntry : prevHolders.entrySet()) { + for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) { dataOutputStream.writeInt(integerNodeEntry.getKey()); - dataOutputStream.writeInt(integerNodeEntry.getValue().getNode().getNodeIdentifier()); - dataOutputStream.writeInt(integerNodeEntry.getValue().getRaftId()); + PartitionGroup group = integerNodeEntry.getValue(); + dataOutputStream.writeInt(group.getId()); + dataOutputStream.writeInt(group.size()); + for (Node node : group) { + dataOutputStream.writeInt(node.getNodeIdentifier()); + } } } @@ -402,12 +412,16 @@ public class SlotPartitionTable implements PartitionTable { int nodeId = buffer.getInt(); RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt()); - Map<Integer, RaftNode> prevHolders = new HashMap<>(); + Map<Integer, PartitionGroup> prevHolders = new HashMap<>(); int holderNum = buffer.getInt(); for (int i1 = 0; i1 < holderNum; i1++) { int slot = buffer.getInt(); - RaftNode holder = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt()); - prevHolders.put(slot, holder); + PartitionGroup group = new PartitionGroup(buffer.getInt()); + int nodeNum = buffer.getInt(); + for (int i2 = 0 ; i2 < nodeNum; i2++) { + group.add(idNodeMap.get(buffer.getInt())); + } + prevHolders.put(slot, group); } previousNodeMap.put(node, prevHolders); } @@ -429,7 +443,7 @@ public class SlotPartitionTable implements PartitionTable { return nodeRing; } - public Map<Integer, RaftNode> getPreviousNodeMap(RaftNode raftNode) { + public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) { return previousNodeMap.get(raftNode); } @@ -503,6 +517,12 @@ public class SlotPartitionTable implements PartitionTable { // each node exactly joins replicationNum groups, so when a group is removed, the node // should join a new one int thisNodeIdx = nodeRing.indexOf(thisNode); + + // check if this node is to be removed + if (thisNodeIdx == -1) { + continue; + } + // this node must be the last node of the new group int headerNodeIdx = thisNodeIdx - (replicationNum - 1); headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx; @@ -531,7 +551,7 @@ public class SlotPartitionTable implements PartitionTable { int slot = slots.get(i); RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId); slotNodes[slot] = newHolder; - nodeSlotMap.get(newHolder).add(slot); + nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); } } @@ -549,6 +569,11 @@ public class SlotPartitionTable implements PartitionTable { } } + @Override + public boolean judgeHoldSlot(Node node, int slot) { + return getHeaderGroup(slotNodes[slot]).contains(node); + } + private void calculateGlobalGroups() { globalGroups = new ArrayList<>(); for (Node node : getAllNodes()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 81ae373..b023c36 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -526,9 +526,15 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } } + /** + * Make sure the group will not receive new raft logs + * @param header + * @param dataGroupMember + */ private void removeMember(RaftNode header, DataGroupMember dataGroupMember) { - dataGroupMember.syncLeader(); + dataGroupMember.getStopStatus().setSyncSuccess(dataGroupMember.syncLeader()); dataGroupMember.setReadOnly(); + dataGroupMember.waitFollowersToSync(); dataGroupMember.stop(); stoppedMemberManager.put(header, dataGroupMember); } @@ -578,8 +584,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async /** * Try removing a node from the groups of each DataGroupMember. If the node is the header of some * group, set the member to read only so that it can still provide data for other nodes that has - * not yet pulled its data. If the node is the local node, remove all members whose group is not - * headed by this node. Otherwise, just change the node list of the member and pull new data. And + * not yet pulled its data. Otherwise, just change the node list of the member and pull new data. And * create a new DataGroupMember if this node should join a new group because of this removal. * * @param node @@ -591,25 +596,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async while (entryIterator.hasNext()) { Entry<RaftNode, DataGroupMember> entry = entryIterator.next(); DataGroupMember dataGroupMember = entry.getValue(); - if (dataGroupMember.getHeader().equals(node)) { - // the group is removed as the node is removed, so new writes should be rejected as - // they belong to the new holder, but the member is kept alive for other nodes to pull - // snapshots + if (dataGroupMember.getHeader().equals(node) || node.equals(thisNode)) { entryIterator.remove(); removeMember(entry.getKey(), entry.getValue()); } else { - if (node.equals(thisNode)) { - // this node is removed, it is no more replica of other groups - List<Integer> nodeSlots = - ((SlotPartitionTable) partitionTable) - .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId()); - dataGroupMember.removeLocalData(nodeSlots); - entryIterator.remove(); - dataGroupMember.stop(); - } else { - // the group should be updated and pull new slots from the removed node - dataGroupMember.removeNode(node, removalResult); - } + // the group should be updated and pull new slots from the removed node + dataGroupMember.removeNode(node, removalResult); } } for (PartitionGroup newGroup : removalResult.getNewGroupList()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 2cd675f..4737520 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -402,16 +402,27 @@ public class DataGroupMember extends RaftMember { * @param request */ public PullSnapshotResp getSnapshot(PullSnapshotRequest request) throws IOException { - waitLeader(); - if (character != NodeCharacter.LEADER && !readOnly) { - return null; - } // if the requester pulls the snapshots because the header of the group is removed, then the // member should no longer receive new data if (request.isRequireReadOnly()) { setReadOnly(); } + boolean canGetSnapshot; + /** + * There are two conditions that can get snapshot: + * 1. The raft member is stopped and sync status is successful which means it has synced leader successfully before stop. + * 2. The raft member is not stopped and syncing leader is successful. + */ + if (stopStatus.stop) { + canGetSnapshot = stopStatus.syncSuccess; + } else { + canGetSnapshot = syncLeader(); + } + if (!canGetSnapshot) { + return null; + } + List<Integer> requiredSlots = request.getRequiredSlots(); for (Integer requiredSlot : requiredSlots) { // wait if the data of the slot is in another node @@ -467,28 +478,26 @@ public class DataGroupMember extends RaftMember { synchronized (logManager) { logger.info("{} pulling {} slots from remote", name, slots.size()); PartitionedSnapshot<Snapshot> snapshot = (PartitionedSnapshot) logManager.getSnapshot(); - Map<Integer, RaftNode> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable()) + Map<Integer, PartitionGroup> prevHolders = ((SlotPartitionTable) metaGroupMember.getPartitionTable()) .getPreviousNodeMap(new RaftNode(newNode, getRaftGroupId())); // group the slots by their owners - Map<RaftNode, List<Integer>> holderSlotsMap = new HashMap<>(); + Map<PartitionGroup, List<Integer>> holderSlotsMap = new HashMap<>(); for (int slot : slots) { // skip the slot if the corresponding data is already replicated locally if (snapshot.getSnapshot(slot) == null) { - RaftNode raftNode = prevHolders.get(slot); - if (raftNode != null) { - holderSlotsMap.computeIfAbsent(raftNode, n -> new ArrayList<>()).add(slot); + PartitionGroup group = prevHolders.get(slot); + if (group != null) { + holderSlotsMap.computeIfAbsent(group, n -> new ArrayList<>()).add(slot); } } } // pull snapshots from each owner's data group - for (Entry<RaftNode, List<Integer>> entry : holderSlotsMap.entrySet()) { - RaftNode raftNode = entry.getKey(); + for (Entry<PartitionGroup, List<Integer>> entry : holderSlotsMap.entrySet()) { List<Integer> nodeSlots = entry.getValue(); PullSnapshotTaskDescriptor taskDescriptor = - new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable() - .getHeaderGroup(raftNode), nodeSlots, false); + new PullSnapshotTaskDescriptor(entry.getKey(), nodeSlots, false); pullFileSnapshot(taskDescriptor, null); } } @@ -760,6 +769,27 @@ public class DataGroupMember extends RaftMember { } } + public void waitFollowersToSync() { + if (character != NodeCharacter.LEADER) { + return; + } + for (Map.Entry<Node, Peer> entry: peerMap.entrySet()) { + Node node = entry.getKey(); + Peer peer = entry.getValue(); + while (peer.getMatchIndex() < logManager.getCommitLogIndex()) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("{}: Unexpected interruption when waiting follower {} to sync, raft id is {}", + name, node, getRaftGroupId()); + } + } + logger.info("{}: Follower {} has synced with leader, raft id is {}", name, node, + getRaftGroupId()); + } + } + /** * Generate a report containing the character, leader, term, last log term, last log index, header * and readOnly or not of this member. @@ -800,6 +830,12 @@ public class DataGroupMember extends RaftMember { public boolean onSnapshotInstalled(List<Integer> slots) { List<Integer> removableSlots = new ArrayList<>(); for (Integer slot : slots) { + /** + * If this slot is just held by different raft groups in the same node, it should keep the data of slot. + */ + if (metaGroupMember.getPartitionTable().judgeHoldSlot(thisNode, slot)) { + continue; + } int sentReplicaNum = slotManager.sentOneReplication(slot); if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) { removableSlots.add(slot); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index d93efc0..7e73f61 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -382,7 +382,6 @@ public class MetaGroupMember extends RaftMember { logger.error("Unexpected interruption when waiting for hardlinkCleaner to end", e); } } - logger.info("{}: stopped", name); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 3a5b51b..0526285 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -249,6 +249,8 @@ public abstract class RaftMember { */ private LogDispatcher logDispatcher; + protected StopStatus stopStatus; + protected RaftMember() { } @@ -260,6 +262,7 @@ public abstract class RaftMember { this.asyncHeartbeatClientPool = asyncHeartbeatPool; this.syncHeartbeatClientPool = syncHeartbeatPool; this.asyncSendLogClientPool = asyncClientPool; + this.stopStatus = new StopStatus(); } protected RaftMember(String name, AsyncClientPool asyncPool, SyncClientPool syncPool, @@ -365,9 +368,11 @@ public abstract class RaftMember { logger.error("Unexpected interruption when waiting for commitLogPool to end", e); } } + leader.set(ClusterConstant.EMPTY_NODE); catchUpService = null; heartBeatService = null; appendLogThreadPool = null; + stopStatus.setStop(true); logger.info("Member {} stopped", name); } @@ -801,6 +806,9 @@ public abstract class RaftMember { * Wait until the leader of this node becomes known or time out. */ public void waitLeader() { + if (stopStatus.isStop()) { + return; + } long startTime = System.currentTimeMillis(); while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) { synchronized (waitLeaderCondition) { @@ -1876,4 +1884,31 @@ public abstract class RaftMember { OK, TIME_OUT, LEADERSHIP_STALE } + public class StopStatus { + + boolean stop; + + boolean syncSuccess; + + public boolean isStop() { + return stop; + } + + public void setStop(boolean stop) { + this.stop = stop; + } + + public boolean isSyncSuccess() { + return syncSuccess; + } + + public void setSyncSuccess(boolean syncSuccess) { + this.syncSuccess = syncSuccess; + } + } + + public StopStatus getStopStatus() { + return stopStatus; + } + } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java index e112d31..f6bb254 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java @@ -106,6 +106,11 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest { public List<PartitionGroup> getGlobalGroups() { return null; } + + @Override + public boolean judgeHoldSlot(Node node, int slot) { + return true; + } }; @Override diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift index 9019680..2a24106 100644 --- a/thrift/src/main/thrift/cluster.thrift +++ b/thrift/src/main/thrift/cluster.thrift @@ -315,7 +315,6 @@ service RaftService { **/ long requestCommitIndex(1:Node header, 2:int raftId) - /** * Read a chunk of a file from the client. If the remaining of the file does not have enough * bytes, only the remaining will be returned.
