This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_multi_raft in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2487976aa71f428f73644b3a081169d50a87ddca Author: lta <[email protected]> AuthorDate: Tue Dec 22 15:46:59 2020 +0800 merge master --- .../cluster/partition/NodeAdditionResult.java | 13 +- .../iotdb/cluster/partition/NodeRemovalResult.java | 37 +- .../iotdb/cluster/partition/PartitionTable.java | 1 - .../partition/slot/SlotNodeAdditionResult.java | 8 +- .../partition/slot/SlotNodeRemovalResult.java | 8 +- .../cluster/partition/slot/SlotPartitionTable.java | 384 +++++++++++---------- .../iotdb/cluster/server/DataClusterServer.java | 143 ++++---- .../apache/iotdb/cluster/server/NodeReport.java | 5 +- .../iotdb/cluster/server/StoppedMemberManager.java | 3 +- .../cluster/server/member/DataGroupMember.java | 8 +- .../iotdb/cluster/server/member/RaftMember.java | 3 +- .../cluster/server/service/BaseAsyncService.java | 1 - .../cluster/server/service/DataSyncService.java | 1 - .../apache/iotdb/cluster/utils/PartitionUtils.java | 1 - .../cluster/partition/SlotPartitionTableTest.java | 6 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../org/apache/iotdb/rpc/RpcTransportFactory.java | 2 +- 17 files changed, 338 insertions(+), 288 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeAdditionResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeAdditionResult.java index 0c4572a..1c28c7c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeAdditionResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeAdditionResult.java @@ -19,18 +19,21 @@ package org.apache.iotdb.cluster.partition; +import java.util.ArrayList; +import java.util.List; + public class NodeAdditionResult { /** * A new data group headed by the new node. */ - private PartitionGroup newGroup; + private List<PartitionGroup> newGroupList = new ArrayList<>(); - public PartitionGroup getNewGroup() { - return newGroup; + public List<PartitionGroup> getNewGroupList() { + return newGroupList; } - public void setNewGroup(PartitionGroup newGroup) { - this.newGroup = newGroup; + public void addNewGroup(PartitionGroup newGroup) { + this.newGroupList.add(newGroup); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java index 1d655ba..457af85 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java @@ -19,28 +19,45 @@ package org.apache.iotdb.cluster.partition; +import java.util.List; + /** * NodeRemovalResult stores the removed partition group. */ public class NodeRemovalResult { - private PartitionGroup removedGroup; + + private List<PartitionGroup> removedGroupList; // if the removed group contains the local node, the local node should join a new group to // preserve the replication number - private PartitionGroup newGroup; + private List<PartitionGroup> newGroupList; + + public PartitionGroup getRemovedGroup(int raftId) { + for (PartitionGroup group : removedGroupList) { + if (group.getId() == raftId) { + return group; + } + } + return null; + } - public PartitionGroup getRemovedGroup() { - return removedGroup; + public void addRemovedGroup(PartitionGroup group) { + this.removedGroupList.add(group); } - public void setRemovedGroup(PartitionGroup group) { - this.removedGroup = group; + public List<PartitionGroup> getNewGroupList() { + return newGroupList; } - public PartitionGroup getNewGroup() { - return newGroup; + public void addNewGroup(PartitionGroup newGroup) { + this.newGroupList.add(newGroup); } - public void setNewGroup(PartitionGroup newGroup) { - this.newGroup = newGroup; + public PartitionGroup getNewGroup(int raftId) { + for (PartitionGroup group : newGroupList) { + if (group.getId() == raftId) { + return group; + } + } + return null; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java index f1c9a91..bd8e518 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java @@ -29,7 +29,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.service.IoTDB; -import org.apache.iotdb.tsfile.utils.Pair; /** * PartitionTable manages the map whose key is the StorageGroupName with a time interval and the diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeAdditionResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeAdditionResult.java index 4cd2412..4599752 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeAdditionResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeAdditionResult.java @@ -22,21 +22,21 @@ package org.apache.iotdb.cluster.partition.slot; import java.util.Map; import java.util.Set; import org.apache.iotdb.cluster.partition.NodeAdditionResult; -import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; public class SlotNodeAdditionResult extends NodeAdditionResult { /** * What slots will the old data groups transfer to the new one. */ - private Map<Node, Set<Integer>> lostSlots; + private Map<RaftNode, Set<Integer>> lostSlots; - public Map<Node, Set<Integer>> getLostSlots() { + public Map<RaftNode, Set<Integer>> getLostSlots() { return lostSlots; } public void setLostSlots( - Map<Node, Set<Integer>> lostSlots) { + Map<RaftNode, Set<Integer>> lostSlots) { this.lostSlots = lostSlots; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java index 9c24eba..17a0c93 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java @@ -22,20 +22,20 @@ package org.apache.iotdb.cluster.partition.slot; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.partition.NodeRemovalResult; -import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; /** * SlotNodeRemovalResult stores the removed partition group and who will take over its slots. */ public class SlotNodeRemovalResult extends NodeRemovalResult { - private Map<Node, List<Integer>> newSlotOwners; + private Map<RaftNode, List<Integer>> newSlotOwners; - public Map<Node, List<Integer>> getNewSlotOwners() { + public Map<RaftNode, List<Integer>> getNewSlotOwners() { return newSlotOwners; } - public void setNewSlotOwners(Map<Node, List<Integer>> newSlotOwners) { + public void addNewSlotOwners(Map<RaftNode, List<Integer>> newSlotOwners) { this.newSlotOwners = newSlotOwners; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index d06de9d..5fcfbcb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -4,7 +4,6 @@ package org.apache.iotdb.cluster.partition.slot; - import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -14,6 +13,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -227,56 +228,58 @@ public class SlotPartitionTable implements PartitionTable { @Override public NodeAdditionResult addNode(Node node) { -// synchronized (nodeRing) { -// if (nodeRing.contains(node)) { -// return null; -// } -// -// nodeRing.add(node); -// nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); -// -// List<PartitionGroup> retiredGroups = new ArrayList<>(); -// for (int i = 0; i < localGroups.size(); i++) { -// PartitionGroup oldGroup = localGroups.get(i); -// Node header = oldGroup.getHeader(); -// PartitionGroup newGrp = getHeaderGroup(header); -// if (newGrp.contains(node) && newGrp.contains(thisNode)) { -// // this group changes but still contains the local node -// localGroups.set(i, newGrp); -// } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) { -// // the local node retires from the group -// retiredGroups.add(newGrp); -// } -// } -// -// // remove retired groups -// Iterator<PartitionGroup> groupIterator = localGroups.iterator(); -// while (groupIterator.hasNext()) { -// PartitionGroup partitionGroup = groupIterator.next(); -// for (PartitionGroup retiredGroup : retiredGroups) { -// if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) { -// groupIterator.remove(); -// break; -// } -// } -// } -// } -// -// SlotNodeAdditionResult result = new SlotNodeAdditionResult(); -// PartitionGroup newGroup = getHeaderGroup(node); -// if (newGroup.contains(thisNode)) { -// localGroups.add(newGroup); -// } -// result.setNewGroup(newGroup); -// -// calculateGlobalGroups(); -// -// // the slots movement is only done logically, the new node itself will pull data from the -// // old node -// result.setLostSlots(moveSlotsToNew(node)); -// -// return result; - return null; + synchronized (nodeRing) { + if (nodeRing.contains(node)) { + return null; + } + + nodeRing.add(node); + nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); + + List<PartitionGroup> retiredGroups = new ArrayList<>(); + for (int i = 0; i < localGroups.size(); i++) { + PartitionGroup oldGroup = localGroups.get(i); + Node header = oldGroup.getHeader(); + PartitionGroup newGrp = getHeaderGroup(new RaftNode(header, oldGroup.getId())); + if (newGrp.contains(node) && newGrp.contains(thisNode)) { + // this group changes but still contains the local node + localGroups.set(i, newGrp); + } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) { + // the local node retires from the group + retiredGroups.add(newGrp); + } + } + + // remove retired groups + Iterator<PartitionGroup> groupIterator = localGroups.iterator(); + while (groupIterator.hasNext()) { + PartitionGroup partitionGroup = groupIterator.next(); + for (PartitionGroup retiredGroup : retiredGroups) { + if (retiredGroup.getHeader().equals(partitionGroup.getHeader()) + && retiredGroup.getId() == partitionGroup.getId()) { + groupIterator.remove(); + break; + } + } + } + } + + SlotNodeAdditionResult result = new SlotNodeAdditionResult(); + for (int raftId = 0 ;raftId < multiRaftFactor; raftId++) { + PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId)); + if (newGroup.contains(thisNode)) { + localGroups.add(newGroup); + } + result.addNewGroup(newGroup); + } + + calculateGlobalGroups(); + + // the slots movement is only done logically, the new node itself will pull data from the + // old node + result.setLostSlots(moveSlotsToNew(node)); + + return result; } @@ -287,32 +290,49 @@ public class SlotPartitionTable implements PartitionTable { * @param newNode * @return a map recording what slots each group lost. */ - private Map<Node, Set<Integer>> moveSlotsToNew(Node newNode) { -// Map<Node, Set<Integer>> result = new HashMap<>(); -// // as a node is added, the average slots for each node decrease -// // move the slots to the new node if any previous node have more slots than the new average -// List<Integer> newSlots = new ArrayList<>(); -// Map<Integer, Node> previousHolders = new HashMap<>(); -// int newAvg = totalSlotNumbers / nodeRing.size(); -// for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) { -// List<Integer> slots = entry.getValue(); -// int transferNum = slots.size() - newAvg; -// if (transferNum > 0) { -// List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); -// newSlots.addAll(slotsToMove); -// for (Integer slot : slotsToMove) { -// // record what node previously hold the integer -// previousHolders.put(slot, entry.getKey()); -// slotNodes[slot] = newNode; -// } -// result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); -// slotsToMove.clear(); -// } -// } -// nodeSlotMap.put(newNode, newSlots); -// previousNodeMap.put(newNode, previousHolders); -// return result; - return null; + private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode) { + Map<RaftNode, Set<Integer>> result = new HashMap<>(); + // as a node is added, the average slots for each node decrease + // move the slots to the new node if any previous node have more slots than the new average + int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor; + int raftId = 0; + for(int i = 0 ; i < multiRaftFactor; i++) { + RaftNode raftNode = new RaftNode(newNode, i); + nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>()); + previousNodeMap.putIfAbsent(raftNode, new HashMap<>()); + } + for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) { + List<Integer> slots = entry.getValue(); + int transferNum = slots.size() - newAvg; + if (transferNum > 0) { + RaftNode curNode = new RaftNode(newNode, raftId); + int numToMove = transferNum; + if(raftId != multiRaftFactor - 1) { + numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size()); + } + List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove); + nodeSlotMap.get(curNode).addAll(slotsToMove); + for (Integer slot : slotsToMove) { + // record what node previously hold the integer + previousNodeMap.get(curNode).put(slot, entry.getKey()); + slotNodes[slot] = curNode; + } + result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); + transferNum -= numToMove; + if (transferNum > 0) { + curNode = new RaftNode(newNode, ++raftId); + slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); + nodeSlotMap.get(curNode).addAll(slotsToMove); + for (Integer slot : slotsToMove) { + // record what node previously hold the integer + previousNodeMap.get(curNode).put(slot, entry.getKey()); + slotNodes[slot] = curNode; + } + result.get(entry.getKey()).addAll(slotsToMove); + } + } + } + return result; } @Override @@ -332,20 +352,21 @@ public class SlotPartitionTable implements PartitionTable { for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) { SerializeUtils.serialize(entry.getKey().getNode(), dataOutputStream); dataOutputStream.writeInt(entry.getKey().getRaftId()); -// SerializeUtils.serialize(entry.getValue(), dataOutputStream); + SerializeUtils.serializeIntList(entry.getValue(), dataOutputStream); } -// dataOutputStream.writeInt(previousNodeMap.size()); -// for (Entry<Node, Map<Integer, Node>> nodeMapEntry : previousNodeMap.entrySet()) { -// dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier()); -// -// Map<Integer, Node> prevHolders = nodeMapEntry.getValue(); -// dataOutputStream.writeInt(prevHolders.size()); -// for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) { -// dataOutputStream.writeInt(integerNodeEntry.getKey()); -// dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier()); -// } -// } + dataOutputStream.writeInt(previousNodeMap.size()); + for (Entry<RaftNode, Map<Integer, RaftNode>> nodeMapEntry : previousNodeMap.entrySet()) { + dataOutputStream.writeInt(nodeMapEntry.getKey().getNode().getNodeIdentifier()); + dataOutputStream.writeInt(nodeMapEntry.getKey().getRaftId()); + Map<Integer, RaftNode> prevHolders = nodeMapEntry.getValue(); + dataOutputStream.writeInt(prevHolders.size()); + for (Entry<Integer, RaftNode> integerNodeEntry : prevHolders.entrySet()) { + dataOutputStream.writeInt(integerNodeEntry.getKey()); + dataOutputStream.writeInt(integerNodeEntry.getValue().getNode().getNodeIdentifier()); + dataOutputStream.writeInt(integerNodeEntry.getValue().getRaftId()); + } + } dataOutputStream.writeLong(lastLogIndex); } catch (IOException ignored) { @@ -357,39 +378,38 @@ public class SlotPartitionTable implements PartitionTable { @Override public void deserialize(ByteBuffer buffer) { -// logger.info("Initializing the partition table from buffer"); -// totalSlotNumbers = buffer.getInt(); -// int size = buffer.getInt(); -// Map<Integer, Node> idNodeMap = new HashMap<>(); -// for (int i = 0; i < size; i++) { -// Node node = new Node(); -// List<Integer> slots = new ArrayList<>(); -// SerializeUtils.deserialize(node, buffer); -// int id = buffer.getInt(); -// SerializeUtils.deserialize(slots, buffer); -// RaftNode raftNode = new RaftNode(node, id); -// nodeSlotMap.put(raftNode, slots); -// idNodeMap.put(node.getNodeIdentifier(), node); -// for (Integer slot : slots) { -// slotNodes[slot] = raftNode; -// } -// } - -// int prevNodeMapSize = buffer.getInt(); -// previousNodeMap = new HashMap<>(); -// for (int i = 0; i < prevNodeMapSize; i++) { -// int nodeId = buffer.getInt(); -// Node node = idNodeMap.get(nodeId); -// -// Map<Integer, Node> prevHolders = new HashMap<>(); -// int holderNum = buffer.getInt(); -// for (int i1 = 0; i1 < holderNum; i1++) { -// int slot = buffer.getInt(); -// Node holder = idNodeMap.get(buffer.getInt()); -// prevHolders.put(slot, holder); -// } -// previousNodeMap.put(node, prevHolders); -// } + logger.info("Initializing the partition table from buffer"); + totalSlotNumbers = buffer.getInt(); + int size = buffer.getInt(); + Map<Integer, Node> idNodeMap = new HashMap<>(); + for (int i = 0; i < size; i++) { + Node node = new Node(); + SerializeUtils.deserialize(node, buffer); + RaftNode raftNode = new RaftNode(node, buffer.getInt()); + List<Integer> slots = new ArrayList<>(); + SerializeUtils.deserializeIntList(slots, buffer); + nodeSlotMap.put(raftNode, slots); + idNodeMap.put(node.getNodeIdentifier(), node); + for (Integer slot : slots) { + slotNodes[slot] = raftNode; + } + } + + int prevNodeMapSize = buffer.getInt(); + previousNodeMap = new HashMap<>(); + for (int i = 0; i < prevNodeMapSize; i++) { + int nodeId = buffer.getInt(); + RaftNode node = new RaftNode(idNodeMap.get(nodeId), buffer.getInt()); + + Map<Integer, RaftNode> prevHolders = new HashMap<>(); + int holderNum = buffer.getInt(); + for (int i1 = 0; i1 < holderNum; i1++) { + int slot = buffer.getInt(); + RaftNode holder = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt()); + prevHolders.put(slot, holder); + } + previousNodeMap.put(node, prevHolders); + } lastLogIndex = buffer.getLong(); for (RaftNode raftNode : nodeSlotMap.keySet()) { @@ -451,63 +471,69 @@ public class SlotPartitionTable implements PartitionTable { @Override public NodeRemovalResult removeNode(Node target) { -// synchronized (nodeRing) { -// if (!nodeRing.contains(target)) { -// return null; -// } -// -// SlotNodeRemovalResult result = new SlotNodeRemovalResult(); -// result.setRemovedGroup(getHeaderGroup(target)); -// nodeRing.remove(target); -// -// // if the node belongs to a group that headed by target, this group should be removed -// // and other groups containing target should be updated -// int removedGroupIdx = -1; -// for (int i = 0; i < localGroups.size(); i++) { -// PartitionGroup oldGroup = localGroups.get(i); -// Node header = oldGroup.getHeader(); -// if (header.equals(target)) { -// removedGroupIdx = i; -// } else { -// PartitionGroup newGrp = getHeaderGroup(header); -// localGroups.set(i, newGrp); -// } -// } -// if (removedGroupIdx != -1) { -// localGroups.remove(removedGroupIdx); -// // each node exactly joins replicationNum groups, so when a group is removed, the node -// // should join a new one -// int thisNodeIdx = nodeRing.indexOf(thisNode); -// // this node must be the last node of the new group -// int headerNodeIdx = thisNodeIdx - (replicationNum - 1); -// headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx; -// Node header = nodeRing.get(headerNodeIdx); -// PartitionGroup newGrp = getHeaderGroup(header); -// localGroups.add(newGrp); -// result.setNewGroup(newGrp); -// } -// -// calculateGlobalGroups(); -// -// // the slots movement is only done logically, the new node itself will pull data from the -// // old node -// Map<Node, List<Integer>> nodeListMap = retrieveSlots(target); -// result.setNewSlotOwners(nodeListMap); -// return result; -// } - return null; - } - - private Map<Node, List<Integer>> retrieveSlots(Node target) { - Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>(); -// List<Integer> slots = nodeSlotMap.remove(target); -// for (int i = 0; i < slots.size(); i++) { -// int slot = slots.get(i); -// Node newHolder = nodeRing.get(i % nodeRing.size()); -// slotNodes[slot] = newHolder; -// nodeSlotMap.get(newHolder).add(slot); -// newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); -// } + synchronized (nodeRing) { + if (!nodeRing.contains(target)) { + return null; + } + + SlotNodeRemovalResult result = new SlotNodeRemovalResult(); + for(int raftId = 0; raftId < multiRaftFactor; raftId++) { + result.addRemovedGroup(getHeaderGroup(new RaftNode(target, raftId))); + } + nodeRing.remove(target); + + // if the node belongs to a group that headed by target, this group should be removed + // and other groups containing target should be updated + List<Integer> removedGroupIdxs = new ArrayList<>(); + for (int i = 0; i < localGroups.size(); i++) { + PartitionGroup oldGroup = localGroups.get(i); + Node header = oldGroup.getHeader(); + if (header.equals(target)) { + removedGroupIdxs.add(i); + } else { + PartitionGroup newGrp = getHeaderGroup(new RaftNode(header, oldGroup.getId())); + localGroups.set(i, newGrp); + } + } + for(int i = removedGroupIdxs.size() - 1; i >= 0 ; i--) { + int removedGroupIdx = removedGroupIdxs.get(i); + int raftId = localGroups.get(removedGroupIdx).getId(); + localGroups.remove(removedGroupIdx); + // each node exactly joins replicationNum groups, so when a group is removed, the node + // should join a new one + int thisNodeIdx = nodeRing.indexOf(thisNode); + // this node must be the last node of the new group + int headerNodeIdx = thisNodeIdx - (replicationNum - 1); + headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : headerNodeIdx; + Node header = nodeRing.get(headerNodeIdx); + PartitionGroup newGrp = getHeaderGroup(new RaftNode(header, raftId)); + localGroups.add(newGrp); + result.addNewGroup(newGrp); + } + + calculateGlobalGroups(); + + // the slots movement is only done logically, the new node itself will pull data from the + // old node + Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target); + result.addNewSlotOwners(raftNodeListMap); + return result; + } + } + + private Map<RaftNode, List<Integer>> retrieveSlots(Node target) { + Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>(); + for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) { + RaftNode raftNode = new RaftNode(target, raftId); + List<Integer> slots = nodeSlotMap.remove(raftNode); + for (int i = 0; i < slots.size(); i++) { + int slot = slots.get(i); + RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId); + slotNodes[slot] = newHolder; + nodeSlotMap.get(newHolder).add(slot); + newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); + } + } return newHolderSlotMap; } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index e5843f5..e37aa7e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -22,8 +22,10 @@ package org.apache.iotdb.cluster.server; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.iotdb.cluster.config.ClusterDescriptor; @@ -497,37 +499,38 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param result */ public void addNode(Node node, NodeAdditionResult result) { -// Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); -// synchronized (headerGroupMap) { -// while (entryIterator.hasNext()) { -// Entry<Node, DataGroupMember> entry = entryIterator.next(); -// DataGroupMember dataGroupMember = entry.getValue(); -// // the member may be extruded from the group, remove and stop it if so -// boolean shouldLeave = dataGroupMember.addNode(node, result); -// if (shouldLeave) { -// logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes()); -// entryIterator.remove(); -// removeMember(entry.getKey(), entry.getValue()); -// } -// } -// -// if (result.getNewGroup().contains(thisNode)) { -// logger.info("Adding this node into a new group {}", result.getNewGroup()); -// DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode); -// addDataGroupMember(dataGroupMember); -// dataGroupMember.start(); -// dataGroupMember -// .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node), -// node); -// } -// } - } - - private void removeMember(Node header, DataGroupMember dataGroupMember) { -// dataGroupMember.syncLeader(); -// dataGroupMember.setReadOnly(); -// dataGroupMember.stop(); -// stoppedMemberManager.put(header, dataGroupMember); + Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); + synchronized (headerGroupMap) { + while (entryIterator.hasNext()) { + Entry<RaftNode, DataGroupMember> entry = entryIterator.next(); + DataGroupMember dataGroupMember = entry.getValue(); + // the member may be extruded from the group, remove and stop it if so + boolean shouldLeave = dataGroupMember.addNode(node, result); + if (shouldLeave) { + logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes()); + entryIterator.remove(); + removeMember(entry.getKey(), entry.getValue()); + } + } + + for (PartitionGroup newGroup: result.getNewGroupList()) { + if (newGroup.contains(thisNode)) { + logger.info("Adding this node into a new group {}", newGroup); + DataGroupMember dataGroupMember = dataMemberFactory.create(newGroup, thisNode); + addDataGroupMember(dataGroupMember); + dataGroupMember.start(); + dataGroupMember.pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node, + newGroup.getId()), node); + } + } + } + } + + private void removeMember(RaftNode header, DataGroupMember dataGroupMember) { + dataGroupMember.syncLeader(); + dataGroupMember.setReadOnly(); + dataGroupMember.stop(); + stoppedMemberManager.put(header, dataGroupMember); } /** @@ -552,7 +555,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async List<PartitionGroup> partitionGroups = partitionTable.getLocalGroups(); for (PartitionGroup partitionGroup : partitionGroups) { - DataGroupMember prevMember = headerGroupMap.get(partitionGroup.getHeader()); + DataGroupMember prevMember = headerGroupMap.get(new RaftNode(partitionGroup.getHeader(), partitionGroup.getId())); if (prevMember == null || !prevMember.getAllNodes().equals(partitionGroup)) { logger.info("Building member of data group: {}", partitionGroup); // no previous member or member changed @@ -583,43 +586,45 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param removalResult cluster changes due to the node removal */ public void removeNode(Node node, NodeRemovalResult removalResult) { -// Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); -// synchronized (headerGroupMap) { -// while (entryIterator.hasNext()) { -// Entry<Node, DataGroupMember> entry = entryIterator.next(); -// DataGroupMember dataGroupMember = entry.getValue(); -// if (dataGroupMember.getHeader().equals(node)) { -// // the group is removed as the node is removed, so new writes should be rejected as -// // they belong to the new holder, but the member is kept alive for other nodes to pull -// // snapshots -// entryIterator.remove(); -// removeMember(entry.getKey(), entry.getValue()); -// } else { -// if (node.equals(thisNode)) { -// // this node is removed, it is no more replica of other groups -// List<Integer> nodeSlots = -// ((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader()); -// dataGroupMember.removeLocalData(nodeSlots); -// entryIterator.remove(); -// dataGroupMember.stop(); -// } else { -// // the group should be updated and pull new slots from the removed node -// dataGroupMember.removeNode(node, removalResult); -// } -// } -// } -// PartitionGroup newGroup = removalResult.getNewGroup(); -// if (newGroup != null) { -// logger.info("{} should join a new group {}", thisNode, newGroup); -// try { -// createNewMember(newGroup.getHeader()); -// } catch (NotInSameGroupException e) { -// // ignored -// } catch (CheckConsistencyException ce) { -// logger.error("remove node failed, error={}", ce.getMessage()); -// } -// } -// } + Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); + synchronized (headerGroupMap) { + while (entryIterator.hasNext()) { + Entry<RaftNode, DataGroupMember> entry = entryIterator.next(); + DataGroupMember dataGroupMember = entry.getValue(); + if (dataGroupMember.getHeader().equals(node)) { + // the group is removed as the node is removed, so new writes should be rejected as + // they belong to the new holder, but the member is kept alive for other nodes to pull + // snapshots + entryIterator.remove(); + removeMember(entry.getKey(), entry.getValue()); + } else { + if (node.equals(thisNode)) { + // this node is removed, it is no more replica of other groups + List<Integer> nodeSlots = + ((SlotPartitionTable) partitionTable) + .getNodeSlots(dataGroupMember.getHeader(), dataGroupMember.getRaftGroupId()); + dataGroupMember.removeLocalData(nodeSlots); + entryIterator.remove(); + dataGroupMember.stop(); + } else { + // the group should be updated and pull new slots from the removed node + dataGroupMember.removeNode(node, removalResult); + } + } + } + for (PartitionGroup newGroup : removalResult.getNewGroupList()) { + if (newGroup != null) { + logger.info("{} should join a new group {}", thisNode, newGroup); + try { + createNewMember(new RaftNode(newGroup.getHeader(), newGroup.getId())); + } catch (NotInSameGroupException e) { + // ignored + } catch (CheckConsistencyException ce) { + logger.error("remove node failed, error={}", ce.getMessage()); + } + } + } + } } public void setPartitionTable(PartitionTable partitionTable) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java index e175248..256a45a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java @@ -149,15 +149,17 @@ public class NodeReport { */ public static class DataMemberReport extends RaftMemberReport { Node header; + int raftId; long headerLatency; public DataMemberReport(NodeCharacter character, Node leader, long term, long lastLogTerm, - long lastLogIndex, long commitIndex, long commitTerm, Node header, boolean isReadOnly, + long lastLogIndex, long commitIndex, long commitTerm, Node header, int raftId, boolean isReadOnly, long headerLatency, long lastHeartbeatReceivedTime, long prevLastLogIndex, long maxAppliedLogIndex) { super(character, leader, term, lastLogTerm, lastLogIndex, commitIndex, commitTerm, isReadOnly, lastHeartbeatReceivedTime, prevLastLogIndex, maxAppliedLogIndex); this.header = header; + this.raftId = raftId; this.headerLatency = headerLatency; } @@ -165,6 +167,7 @@ public class NodeReport { public String toString() { return "DataMemberReport{" + "header=" + header + + ", raftId=" + raftId + ", character=" + character + ", Leader=" + leader + ", term=" + term + diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java index d941f84..293b444 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java @@ -34,7 +34,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory; import org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +152,6 @@ public class StoppedMemberManager { private void parseResumed(String[] split) { Node header = ClusterUtils.stringToNode(split[1]); - removedMemberMap.remove(header); + removedMemberMap.remove(new RaftNode(header,0)); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 8a43818..1fb8336 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -283,7 +283,7 @@ public class DataGroupMember extends RaftMember { // mark slots that do not belong to this group any more Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots() - .getOrDefault(getHeader(), Collections.emptySet()); + .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet()); for (Integer lostSlot : lostSlots) { slotManager.setToSending(lostSlot); } @@ -748,11 +748,11 @@ public class DataGroupMember extends RaftMember { } } List<Integer> slotsToPull = ((SlotNodeRemovalResult) removalResult).getNewSlotOwners() - .get(getHeader()); + .get(new RaftNode(getHeader(), getRaftGroupId())); if (slotsToPull != null) { // pull the slots that should be taken over PullSnapshotTaskDescriptor taskDescriptor = new PullSnapshotTaskDescriptor( - removalResult.getRemovedGroup(), + removalResult.getRemovedGroup(getRaftGroupId()), slotsToPull, true); pullFileSnapshot(taskDescriptor, null); } @@ -770,7 +770,7 @@ public class DataGroupMember extends RaftMember { lastReportedLogIndex = logManager.getLastLogIndex(); return new DataMemberReport(character, leader.get(), term.get(), logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(), - logManager.getCommitLogTerm(), getHeader(), readOnly, + logManager.getCommitLogTerm(), getHeader(), getRaftGroupId(), readOnly, QueryCoordinator.getINSTANCE() .getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex, logManager.getMaxHaveAppliedCommitIndex()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index aa9f8c8..35a9f2b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -298,7 +298,8 @@ public abstract class RaftMember { new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build()); if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { - serialToParallelPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100, + serialToParallelPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors() * 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java index a059846..521050e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java @@ -32,7 +32,6 @@ import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest; import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; -import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.RaftService; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.server.NodeCharacter; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java index 35af6e5..bbf3430 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java @@ -38,7 +38,6 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; -import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.rpc.thrift.TSDataService; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java index 0abf7d0..071a681 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import org.apache.iotdb.cluster.partition.PartitionTable; -import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.qp.physical.PhysicalPlan; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java index 9501962..ed37d48 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java @@ -515,16 +515,16 @@ public class SlotPartitionTableTest { List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId); NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0)); assertFalse(localTable.getAllNodes().contains(getNode(0))); - PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(); + PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(0); for (int i = 0; i < 5; i++) { assertTrue(removedGroup.contains(getNode(i))); } - PartitionGroup newGroup = nodeRemovalResult.getNewGroup(); + PartitionGroup newGroup = nodeRemovalResult.getNewGroup(0); for (int i : new int[]{18, 19, 1, 2, 3}) { assertTrue(newGroup.contains(getNode(i))); } // the slots owned by the removed one should be redistributed to other nodes - Map<Node, List<Integer>> newSlotOwners = ((SlotNodeRemovalResult) nodeRemovalResult) + Map<RaftNode, List<Integer>> newSlotOwners = ((SlotNodeRemovalResult) nodeRemovalResult) .getNewSlotOwners(); for (List<Integer> slots : newSlotOwners.values()) { assertTrue(nodeSlots.containsAll(slots)); diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 27af6b1..9847754 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -113,7 +113,7 @@ public class IoTDBConfig { /** * whether to use Snappy compression before sending data through the network */ - private boolean rpcAdvancedCompressionEnable = false; + private boolean rpcAdvancedCompressionEnable = true; /** * Port which the JDBC server listens to. diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java index 57907de..84f00df 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java @@ -27,7 +27,7 @@ import org.apache.thrift.transport.TTransportFactory; public class RpcTransportFactory extends TTransportFactory { // TODO: make it a config - public static boolean USE_SNAPPY = false; + public static boolean USE_SNAPPY = true; public static final RpcTransportFactory INSTANCE; static { INSTANCE = USE_SNAPPY ?
