This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch cluster_multi_raft in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55fc053da4fa93d8959a35404e3fda4bdf7788fa Author: LebronAl <[email protected]> AuthorDate: Fri Dec 11 19:49:04 2020 +0800 init --- .../iotdb/cluster/partition/PartitionGroup.java | 34 +- .../iotdb/cluster/partition/PartitionTable.java | 7 +- .../cluster/partition/slot/SlotPartitionTable.java | 387 ++++++++++--------- .../iotdb/cluster/server/DataClusterServer.java | 426 +++++++++++---------- .../iotdb/cluster/server/StoppedMemberManager.java | 48 +-- .../cluster/server/member/DataGroupMember.java | 23 +- .../cluster/server/member/MetaGroupMember.java | 21 +- .../iotdb/cluster/server/member/RaftMember.java | 7 +- .../cluster/server/service/BaseAsyncService.java | 6 +- .../cluster/server/service/BaseSyncService.java | 6 +- .../cluster/server/service/DataAsyncService.java | 61 +-- .../cluster/server/service/DataSyncService.java | 34 +- thrift/src/main/thrift/cluster.thrift | 80 ++-- 13 files changed, 630 insertions(+), 510 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java index 8bf9f52..dc5cbcc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java @@ -21,45 +21,61 @@ package org.apache.iotdb.cluster.partition; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; import org.apache.iotdb.cluster.rpc.thrift.Node; /** * PartitionGroup contains all the nodes that will form a data group with a certain node, which are - * the REPLICATION_NUM - 1 different physical nodes after this node. - * The first element of the list is called header, which is also the identifier of the data group. + * the REPLICATION_NUM - 1 different physical nodes after this node. The first element of the list + * is called header, which is also the identifier of the data group. 
*/ public class PartitionGroup extends ArrayList<Node> { - private Node thisNode; + private int id; public PartitionGroup() { } - public PartitionGroup(Node... nodes) { + public PartitionGroup(Collection<Node> nodes) { + this.addAll(nodes); + } + + public PartitionGroup(int id, Node... nodes) { this.addAll(Arrays.asList(nodes)); + this.id = id; } public PartitionGroup(PartitionGroup other) { super(other); - this.thisNode = other.thisNode; + this.id = other.getId(); } @Override public boolean equals(Object o) { - return super.equals(o); + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartitionGroup group = (PartitionGroup) o; + return Objects.equals(id, group.getId()) && + super.equals(group); } @Override public int hashCode() { - return super.hashCode(); + return Objects.hash(id, getHeader()); } + public Node getHeader() { return get(0); } - public void setThisNode(Node thisNode) { - this.thisNode = thisNode; + public int getId() { + return id; } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java index 81f0199..a1a5ae6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.utils.Pair; /** * PartitionTable manages the map whose key is the StorageGroupName with a time interval and the @@ -54,7 +55,7 @@ public interface PartitionTable { * @param timestamp * @return */ - Node routeToHeaderByTime(String storageGroupName, long timestamp); + Pair<Node, Integer> routeToHeaderByTime(String 
storageGroupName, long timestamp); /** * Add a new node to update the partition table. @@ -78,10 +79,10 @@ public interface PartitionTable { List<PartitionGroup> getLocalGroups(); /** - * @param header + * @param pair * @return the partition group starting from the header. */ - PartitionGroup getHeaderGroup(Node header); + PartitionGroup getHeaderGroup(Pair<Node, Integer> pair); ByteBuffer serialize(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index ead856c..a1f98ce 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -14,8 +14,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -31,6 +29,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.db.utils.SerializeUtils; +import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +46,8 @@ public class SlotPartitionTable implements PartitionTable { private int replicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum(); + private int raftGroupNum = 2; + //all nodes private List<Node> nodeRing = new ArrayList<>(); //normally, it is equal to ClusterConstant.SLOT_NUM. @@ -54,12 +55,12 @@ public class SlotPartitionTable implements PartitionTable { //The following fields are used for determining which node a data item belongs to. 
// the slots held by each node - private Map<Node, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>(); + private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>(); // each slot is managed by whom - private Node[] slotNodes = new Node[ClusterConstant.SLOT_NUM]; + private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM]; // the nodes that each slot belongs to before a new node is added, used for the new node to // find the data source - private Map<Node, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>(); + private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new ConcurrentHashMap<>(); //the filed is used for determining which nodes need to be a group. // the data groups which this node belongs to. @@ -111,8 +112,11 @@ public class SlotPartitionTable implements PartitionTable { // evenly assign the slots to each node int nodeNum = nodeRing.size(); int slotsPerNode = totalSlotNumbers / nodeNum; + int slotsPerRaftGroup = slotsPerNode / raftGroupNum; for (Node node : nodeRing) { - nodeSlotMap.put(node, new ArrayList<>()); + for (int i = 0; i < raftGroupNum; i++) { + nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>()); + } } for (int i = 0; i < totalSlotNumbers; i++) { @@ -121,11 +125,17 @@ public class SlotPartitionTable implements PartitionTable { // the last node may receive a little more if total slots cannot de divided by node number nodeIdx--; } - nodeSlotMap.get(nodeRing.get(nodeIdx)).add(i); + for (int j = 0; j < nodeIdx; j++) { + int groupIdx = j / slotsPerRaftGroup; + if (groupIdx >= raftGroupNum) { + groupIdx--; + } + nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i); + } } // build the index to find a node by slot - for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) { + for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) { for (Integer slot : entry.getValue()) { slotNodes[slot] = entry.getKey(); } @@ -145,7 +155,9 
@@ public class SlotPartitionTable implements PartitionTable { if (startIndex < 0) { startIndex = startIndex + nodeRing.size(); } - ret.add(getHeaderGroup(nodeRing.get(startIndex))); + for (int j = 0; j < raftGroupNum; j++) { + ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j))); + } } logger.debug("The partition groups of {} are: {}", node, ret); @@ -153,13 +165,13 @@ public class SlotPartitionTable implements PartitionTable { } @Override - public PartitionGroup getHeaderGroup(Node node) { - PartitionGroup ret = new PartitionGroup(); + public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) { + PartitionGroup ret = new PartitionGroup(pair.right); // assuming the nodes are [1,2,3,4,5] - int nodeIndex = nodeRing.indexOf(node); + int nodeIndex = nodeRing.indexOf(pair.left); if (nodeIndex == -1) { - logger.error("Node {} is not in the cluster", node); + logger.error("Node {} is not in the cluster", pair.left); return null; } int endIndex = nodeIndex + replicationNum; @@ -177,8 +189,8 @@ public class SlotPartitionTable implements PartitionTable { @Override public PartitionGroup route(String storageGroupName, long timestamp) { synchronized (nodeRing) { - Node node = routeToHeaderByTime(storageGroupName, timestamp); - return getHeaderGroup(node); + Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, timestamp); + return getHeaderGroup(pair); } } @@ -188,112 +200,115 @@ public class SlotPartitionTable implements PartitionTable { Thread.currentThread().getStackTrace()); return null; } - Node node = slotNodes[slot]; - logger.debug("The slot of {} is held by {}", slot, node); - if (node == null) { + Pair<Node, Integer> pair = slotNodes[slot]; + logger.debug("The slot of {} is held by {}", slot, pair); + if (pair.left == null) { logger.warn("The slot {} is incorrect", slot); return null; } - return getHeaderGroup(node); + return getHeaderGroup(pair); } @Override - public Node routeToHeaderByTime(String storageGroupName, long timestamp) { + 
public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long timestamp) { synchronized (nodeRing) { int slot = getSlotStrategy() .calculateSlotByTime(storageGroupName, timestamp, getTotalSlotNumbers()); - Node node = slotNodes[slot]; + Pair<Node, Integer> pair = slotNodes[slot]; logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, timestamp, - slot, node); - return node; + slot, pair); + return pair; } } @Override public NodeAdditionResult addNode(Node node) { - synchronized (nodeRing) { - if (nodeRing.contains(node)) { - return null; - } - - nodeRing.add(node); - nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); - - List<PartitionGroup> retiredGroups = new ArrayList<>(); - for (int i = 0; i < localGroups.size(); i++) { - PartitionGroup oldGroup = localGroups.get(i); - Node header = oldGroup.getHeader(); - PartitionGroup newGrp = getHeaderGroup(header); - if (newGrp.contains(node) && newGrp.contains(thisNode)) { - // this group changes but still contains the local node - localGroups.set(i, newGrp); - } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) { - // the local node retires from the group - retiredGroups.add(newGrp); - } - } - - // remove retired groups - Iterator<PartitionGroup> groupIterator = localGroups.iterator(); - while (groupIterator.hasNext()) { - PartitionGroup partitionGroup = groupIterator.next(); - for (PartitionGroup retiredGroup : retiredGroups) { - if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) { - groupIterator.remove(); - break; - } - } - } - } - - SlotNodeAdditionResult result = new SlotNodeAdditionResult(); - PartitionGroup newGroup = getHeaderGroup(node); - if (newGroup.contains(thisNode)) { - localGroups.add(newGroup); - } - result.setNewGroup(newGroup); - - calculateGlobalGroups(); - - // the slots movement is only done logically, the new node itself will pull data from the - // old node - result.setLostSlots(moveSlotsToNew(node)); - - return result; +// 
synchronized (nodeRing) { +// if (nodeRing.contains(node)) { +// return null; +// } +// +// nodeRing.add(node); +// nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); +// +// List<PartitionGroup> retiredGroups = new ArrayList<>(); +// for (int i = 0; i < localGroups.size(); i++) { +// PartitionGroup oldGroup = localGroups.get(i); +// Node header = oldGroup.getHeader(); +// PartitionGroup newGrp = getHeaderGroup(header); +// if (newGrp.contains(node) && newGrp.contains(thisNode)) { +// // this group changes but still contains the local node +// localGroups.set(i, newGrp); +// } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) { +// // the local node retires from the group +// retiredGroups.add(newGrp); +// } +// } +// +// // remove retired groups +// Iterator<PartitionGroup> groupIterator = localGroups.iterator(); +// while (groupIterator.hasNext()) { +// PartitionGroup partitionGroup = groupIterator.next(); +// for (PartitionGroup retiredGroup : retiredGroups) { +// if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) { +// groupIterator.remove(); +// break; +// } +// } +// } +// } +// +// SlotNodeAdditionResult result = new SlotNodeAdditionResult(); +// PartitionGroup newGroup = getHeaderGroup(node); +// if (newGroup.contains(thisNode)) { +// localGroups.add(newGroup); +// } +// result.setNewGroup(newGroup); +// +// calculateGlobalGroups(); +// +// // the slots movement is only done logically, the new node itself will pull data from the +// // old node +// result.setLostSlots(moveSlotsToNew(node)); +// +// return result; + return null; } /** * Move last slots from each group whose slot number is bigger than the new average to the new * node. + * * @param newNode * @return a map recording what slots each group lost. 
*/ private Map<Node, Set<Integer>> moveSlotsToNew(Node newNode) { - Map<Node, Set<Integer>> result = new HashMap<>(); - // as a node is added, the average slots for each node decrease - // move the slots to the new node if any previous node have more slots than the new average - List<Integer> newSlots = new ArrayList<>(); - Map<Integer, Node> previousHolders = new HashMap<>(); - int newAvg = totalSlotNumbers / nodeRing.size(); - for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) { - List<Integer> slots = entry.getValue(); - int transferNum = slots.size() - newAvg; - if (transferNum > 0) { - List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); - newSlots.addAll(slotsToMove); - for (Integer slot : slotsToMove) { - // record what node previously hold the integer - previousHolders.put(slot, entry.getKey()); - slotNodes[slot] = newNode; - } - result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); - slotsToMove.clear(); - } - } - nodeSlotMap.put(newNode, newSlots); - previousNodeMap.put(newNode, previousHolders); - return result; +// Map<Node, Set<Integer>> result = new HashMap<>(); +// // as a node is added, the average slots for each node decrease +// // move the slots to the new node if any previous node have more slots than the new average +// List<Integer> newSlots = new ArrayList<>(); +// Map<Integer, Node> previousHolders = new HashMap<>(); +// int newAvg = totalSlotNumbers / nodeRing.size(); +// for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) { +// List<Integer> slots = entry.getValue(); +// int transferNum = slots.size() - newAvg; +// if (transferNum > 0) { +// List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); +// newSlots.addAll(slotsToMove); +// for (Integer slot : slotsToMove) { +// // record what node previously hold the integer +// previousHolders.put(slot, entry.getKey()); +// slotNodes[slot] = newNode; +// } +// 
result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); +// slotsToMove.clear(); +// } +// } +// nodeSlotMap.put(newNode, newSlots); +// previousNodeMap.put(newNode, previousHolders); +// return result; + return null; } @Override @@ -310,22 +325,23 @@ public class SlotPartitionTable implements PartitionTable { try { dataOutputStream.writeInt(totalSlotNumbers); dataOutputStream.writeInt(nodeSlotMap.size()); - for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) { - SerializeUtils.serialize(entry.getKey(), dataOutputStream); + for (Entry<Pair<Node, Integer>, List<Integer>> entry : nodeSlotMap.entrySet()) { + SerializeUtils.serialize(entry.getKey().left, dataOutputStream); + dataOutputStream.writeInt(entry.getKey().right); SerializeUtils.serialize(entry.getValue(), dataOutputStream); } - dataOutputStream.writeInt(previousNodeMap.size()); - for (Entry<Node, Map<Integer, Node>> nodeMapEntry : previousNodeMap.entrySet()) { - dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier()); - - Map<Integer, Node> prevHolders = nodeMapEntry.getValue(); - dataOutputStream.writeInt(prevHolders.size()); - for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) { - dataOutputStream.writeInt(integerNodeEntry.getKey()); - dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier()); - } - } +// dataOutputStream.writeInt(previousNodeMap.size()); +// for (Entry<Node, Map<Integer, Node>> nodeMapEntry : previousNodeMap.entrySet()) { +// dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier()); +// +// Map<Integer, Node> prevHolders = nodeMapEntry.getValue(); +// dataOutputStream.writeInt(prevHolders.size()); +// for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) { +// dataOutputStream.writeInt(integerNodeEntry.getKey()); +// dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier()); +// } +// } dataOutputStream.writeLong(lastLogIndex); } catch (IOException ignored) { @@ 
-345,32 +361,38 @@ public class SlotPartitionTable implements PartitionTable { Node node = new Node(); List<Integer> slots = new ArrayList<>(); SerializeUtils.deserialize(node, buffer); + int id = buffer.getInt(); SerializeUtils.deserialize(slots, buffer); - nodeSlotMap.put(node, slots); + Pair pair = new Pair<>(node, id); + nodeSlotMap.put(pair, slots); idNodeMap.put(node.getNodeIdentifier(), node); for (Integer slot : slots) { - slotNodes[slot] = node; + slotNodes[slot] = pair; } } - int prevNodeMapSize = buffer.getInt(); - previousNodeMap = new HashMap<>(); - for (int i = 0; i < prevNodeMapSize; i++) { - int nodeId = buffer.getInt(); - Node node = idNodeMap.get(nodeId); - - Map<Integer, Node> prevHolders = new HashMap<>(); - int holderNum = buffer.getInt(); - for (int i1 = 0; i1 < holderNum; i1++) { - int slot = buffer.getInt(); - Node holder = idNodeMap.get(buffer.getInt()); - prevHolders.put(slot, holder); - } - previousNodeMap.put(node, prevHolders); - } +// int prevNodeMapSize = buffer.getInt(); +// previousNodeMap = new HashMap<>(); +// for (int i = 0; i < prevNodeMapSize; i++) { +// int nodeId = buffer.getInt(); +// Node node = idNodeMap.get(nodeId); +// +// Map<Integer, Node> prevHolders = new HashMap<>(); +// int holderNum = buffer.getInt(); +// for (int i1 = 0; i1 < holderNum; i1++) { +// int slot = buffer.getInt(); +// Node holder = idNodeMap.get(buffer.getInt()); +// prevHolders.put(slot, holder); +// } +// previousNodeMap.put(node, prevHolders); +// } lastLogIndex = buffer.getLong(); - nodeRing.addAll(nodeSlotMap.keySet()); + for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) { + if (!nodeRing.contains(nodeIntegerPair.left)) { + nodeRing.add(nodeIntegerPair.left); + } + } nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); logger.info("All known nodes: {}", nodeRing); @@ -390,7 +412,7 @@ public class SlotPartitionTable implements PartitionTable { return nodeSlotMap.get(header); } - public Map<Node, List<Integer>> 
getAllNodeSlots() { + public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() { return nodeSlotMap; } @@ -421,62 +443,63 @@ public class SlotPartitionTable implements PartitionTable { @Override public NodeRemovalResult removeNode(Node target) { - synchronized (nodeRing) { - if (!nodeRing.contains(target)) { - return null; - } - - SlotNodeRemovalResult result = new SlotNodeRemovalResult(); - result.setRemovedGroup(getHeaderGroup(target)); - nodeRing.remove(target); - - // if the node belongs to a group that headed by target, this group should be removed - // and other groups containing target should be updated - int removedGroupIdx = -1; - for (int i = 0; i < localGroups.size(); i++) { - PartitionGroup oldGroup = localGroups.get(i); - Node header = oldGroup.getHeader(); - if (header.equals(target)) { - removedGroupIdx = i; - } else { - PartitionGroup newGrp = getHeaderGroup(header); - localGroups.set(i, newGrp); - } - } - if (removedGroupIdx != -1) { - localGroups.remove(removedGroupIdx); - // each node exactly joins replicationNum groups, so when a group is removed, the node - // should join a new one - int thisNodeIdx = nodeRing.indexOf(thisNode); - // this node must be the last node of the new group - int headerNodeIdx = thisNodeIdx - (replicationNum - 1); - headerNodeIdx = headerNodeIdx < 0 ? 
headerNodeIdx + nodeRing.size() : headerNodeIdx; - Node header = nodeRing.get(headerNodeIdx); - PartitionGroup newGrp = getHeaderGroup(header); - localGroups.add(newGrp); - result.setNewGroup(newGrp); - } - - calculateGlobalGroups(); - - // the slots movement is only done logically, the new node itself will pull data from the - // old node - Map<Node, List<Integer>> nodeListMap = retrieveSlots(target); - result.setNewSlotOwners(nodeListMap); - return result; - } +// synchronized (nodeRing) { +// if (!nodeRing.contains(target)) { +// return null; +// } +// +// SlotNodeRemovalResult result = new SlotNodeRemovalResult(); +// result.setRemovedGroup(getHeaderGroup(target)); +// nodeRing.remove(target); +// +// // if the node belongs to a group that headed by target, this group should be removed +// // and other groups containing target should be updated +// int removedGroupIdx = -1; +// for (int i = 0; i < localGroups.size(); i++) { +// PartitionGroup oldGroup = localGroups.get(i); +// Node header = oldGroup.getHeader(); +// if (header.equals(target)) { +// removedGroupIdx = i; +// } else { +// PartitionGroup newGrp = getHeaderGroup(header); +// localGroups.set(i, newGrp); +// } +// } +// if (removedGroupIdx != -1) { +// localGroups.remove(removedGroupIdx); +// // each node exactly joins replicationNum groups, so when a group is removed, the node +// // should join a new one +// int thisNodeIdx = nodeRing.indexOf(thisNode); +// // this node must be the last node of the new group +// int headerNodeIdx = thisNodeIdx - (replicationNum - 1); +// headerNodeIdx = headerNodeIdx < 0 ? 
headerNodeIdx + nodeRing.size() : headerNodeIdx; +// Node header = nodeRing.get(headerNodeIdx); +// PartitionGroup newGrp = getHeaderGroup(header); +// localGroups.add(newGrp); +// result.setNewGroup(newGrp); +// } +// +// calculateGlobalGroups(); +// +// // the slots movement is only done logically, the new node itself will pull data from the +// // old node +// Map<Node, List<Integer>> nodeListMap = retrieveSlots(target); +// result.setNewSlotOwners(nodeListMap); +// return result; +// } + return null; } private Map<Node, List<Integer>> retrieveSlots(Node target) { Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>(); - List<Integer> slots = nodeSlotMap.remove(target); - for (int i = 0; i < slots.size(); i++) { - int slot = slots.get(i); - Node newHolder = nodeRing.get(i % nodeRing.size()); - slotNodes[slot] = newHolder; - nodeSlotMap.get(newHolder).add(slot); - newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); - } +// List<Integer> slots = nodeSlotMap.remove(target); +// for (int i = 0; i < slots.size(); i++) { +// int slot = slots.get(i); +// Node newHolder = nodeRing.get(i % nodeRing.size()); +// slotNodes[slot] = newHolder; +// nodeSlotMap.get(newHolder).add(slot); +// newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); +// } return newHolderSlotMap; } @@ -494,7 +517,9 @@ public class SlotPartitionTable implements PartitionTable { private void calculateGlobalGroups() { globalGroups = new ArrayList<>(); for (Node n : getAllNodes()) { - globalGroups.add(getHeaderGroup(n)); + for (int i = 0; i < raftGroupNum; i++) { + globalGroups.add(getHeaderGroup(new Pair<>(n, i))); + } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 391cf69..76fca3b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -22,10 +22,8 @@ package org.apache.iotdb.cluster.server; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.iotdb.cluster.config.ClusterDescriptor; @@ -65,6 +63,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.cluster.server.service.DataAsyncService; import org.apache.iotdb.cluster.server.service.DataSyncService; import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.async.AsyncMethodCallback; @@ -82,9 +81,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async // key: the header of a data group, value: the member representing this node in this group and // it is currently at service - private Map<Node, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>(); - private Map<Node, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>(); - private Map<Node, DataSyncService> syncServiceMap = new ConcurrentHashMap<>(); + private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new ConcurrentHashMap<>(); + private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new ConcurrentHashMap<>(); + private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new ConcurrentHashMap<>(); // key: the header of a data group, value: the member representing this node in this group but // it is out of service because another node has joined the group and expelled this node, or // the node itself is removed, but it is still stored to provide snapshot for other nodes @@ -117,50 +116,56 @@ public class DataClusterServer extends 
RaftServer implements TSDataService.Async * @param dataGroupMember */ public void addDataGroupMember(DataGroupMember dataGroupMember) { - DataGroupMember removedMember = headerGroupMap.remove(dataGroupMember.getHeader()); + Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(), + dataGroupMember.getRaftGroupId()); + DataGroupMember removedMember = headerGroupMap + .remove(pair); if (removedMember != null) { removedMember.stop(); - asyncServiceMap.remove(dataGroupMember.getHeader()); - syncServiceMap.remove(dataGroupMember.getHeader()); + asyncServiceMap.remove(pair); + syncServiceMap.remove(pair); } - stoppedMemberManager.remove(dataGroupMember.getHeader()); + stoppedMemberManager.remove(pair); - headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember); + headerGroupMap.put(pair, dataGroupMember); } - private <T> DataAsyncService getDataAsyncService(Node header, + private <T> DataAsyncService getDataAsyncService(Node header, Integer id, AsyncMethodCallback<T> resultHandler, Object request) { - return asyncServiceMap.computeIfAbsent(header, h -> { - DataGroupMember dataMember = getDataMember(h, resultHandler, request); + Pair<Node, Integer> pair = new Pair<>(header, id); + return asyncServiceMap.computeIfAbsent(pair, h -> { + DataGroupMember dataMember = getDataMember(pair, resultHandler, request); return dataMember != null ? new DataAsyncService(dataMember) : null; }); } - private DataSyncService getDataSyncService(Node header) { - return syncServiceMap.computeIfAbsent(header, h -> { - DataGroupMember dataMember = getDataMember(h, null, null); + private DataSyncService getDataSyncService(Node header, Integer id) { + Pair<Node, Integer> pair = new Pair<>(header, id); + return syncServiceMap.computeIfAbsent(pair, h -> { + DataGroupMember dataMember = getDataMember(pair, null, null); return dataMember != null ? 
new DataSyncService(dataMember) : null; }); } /** - * @param header the header of the group which the local node is in + * @param pair the header of the group which the local node is in * @param resultHandler can be set to null if the request is an internal request * @param request the toString() of this parameter should explain what the request is and it * is only used in logs for tracing * @return */ - public <T> DataGroupMember getDataMember(Node header, AsyncMethodCallback<T> resultHandler, + public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair, + AsyncMethodCallback<T> resultHandler, Object request) { // if the resultHandler is not null, then the request is a external one and must be with a // header - if (header == null) { + if (pair.left == null) { if (resultHandler != null) { resultHandler.onError(new NoHeaderNodeException()); } return null; } - DataGroupMember member = stoppedMemberManager.get(header); + DataGroupMember member = stoppedMemberManager.get(pair); if (member != null) { return member; } @@ -168,14 +173,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async // avoid creating two members for a header Exception ex = null; synchronized (headerGroupMap) { - member = headerGroupMap.get(header); + member = headerGroupMap.get(pair); if (member != null) { return member; } - logger.info("Received a request \"{}\" from unregistered header {}", request, header); + logger.info("Received a request \"{}\" from unregistered header {}", request, pair); if (partitionTable != null) { try { - member = createNewMember(header); + member = createNewMember(pair); } catch (NotInSameGroupException | CheckConsistencyException e) { ex = e; } @@ -191,37 +196,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } /** - * @param header + * @param pair * @return A DataGroupMember representing this node in the data group of the header. 
* @throws NotInSameGroupException If this node is not in the group of the header. */ - private DataGroupMember createNewMember(Node header) + private DataGroupMember createNewMember(Pair<Node, Integer> pair) throws NotInSameGroupException, CheckConsistencyException { DataGroupMember member; PartitionGroup partitionGroup; - partitionGroup = partitionTable.getHeaderGroup(header); + partitionGroup = partitionTable.getHeaderGroup(pair); if (partitionGroup == null || !partitionGroup.contains(thisNode)) { // if the partition table is old, this node may have not been moved to the new group metaGroupMember.syncLeaderWithConsistencyCheck(true); - partitionGroup = partitionTable.getHeaderGroup(header); + partitionGroup = partitionTable.getHeaderGroup(pair); } if (partitionGroup != null && partitionGroup.contains(thisNode)) { // the two nodes are in the same group, create a new data member member = dataMemberFactory.create(partitionGroup, thisNode); - DataGroupMember prevMember = headerGroupMap.put(header, member); + DataGroupMember prevMember = headerGroupMap.put(pair, member); if (prevMember != null) { prevMember.stop(); } - logger.info("Created a member for header {}", header); + logger.info("Created a member for header {}", pair); member.start(); } else { // the member may have been stopped after syncLeader - member = stoppedMemberManager.get(header); + member = stoppedMemberManager.get(pair); if (member != null) { return member; } logger.info("This node {} does not belong to the group {}, header {}", thisNode, - partitionGroup, header); + partitionGroup, pair); throw new NotInSameGroupException(partitionGroup, thisNode); } return member; @@ -233,8 +238,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void sendHeartbeat(HeartBeatRequest request, AsyncMethodCallback<HeartBeatResponse> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + 
DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.sendHeartbeat(request, resultHandler); } @@ -242,8 +247,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void startElection(ElectionRequest request, AsyncMethodCallback<Long> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.startElection(request, resultHandler); } @@ -251,8 +256,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback<Long> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.appendEntries(request, resultHandler); } @@ -260,8 +265,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void appendEntry(AppendEntryRequest request, AsyncMethodCallback<Long> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.appendEntry(request, resultHandler); } @@ -269,8 +274,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void sendSnapshot(SendSnapshotRequest request, AsyncMethodCallback<Void> resultHandler) { - Node header = request.getHeader(); - DataAsyncService 
service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.sendSnapshot(request, resultHandler); } @@ -279,8 +284,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void pullSnapshot(PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.pullSnapshot(request, resultHandler); } @@ -289,16 +294,17 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void executeNonQueryPlan(ExecutNonQueryReq request, AsyncMethodCallback<TSStatus> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.executeNonQueryPlan(request, resultHandler); } } @Override - public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Request commit index"); + public void requestCommitIndex(Node header, int id, AsyncMethodCallback<Long> resultHandler) { + DataAsyncService service = getDataAsyncService(header, id, resultHandler, + "Request commit index"); if (service != null) { service.requestCommitIndex(header, resultHandler); } @@ -307,7 +313,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void readFile(String filePath, long offset, int length, AsyncMethodCallback<ByteBuffer> 
resultHandler) { - DataAsyncService service = getDataAsyncService(thisNode, resultHandler, + DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler, "Read file:" + filePath); if (service != null) { service.readFile(filePath, offset, length, resultHandler); @@ -317,7 +323,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void querySingleSeries(SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, "Query series:" + request.getPath()); if (service != null) { service.querySingleSeries(request, resultHandler); @@ -325,9 +332,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void fetchSingleSeries(Node header, long readerId, + public void fetchSingleSeries(Node header, int id, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch reader:" + readerId); if (service != null) { service.fetchSingleSeries(header, readerId, resultHandler); @@ -335,18 +342,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void getAllPaths(Node header, List<String> paths, boolean withAlias, + public void getAllPaths(Node header, int id, List<String> paths, boolean withAlias, AsyncMethodCallback<GetAllPathsResult> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Find path:" + paths); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Find path:" + paths); if (service != null) { service.getAllPaths(header, paths, withAlias, resultHandler); } } @Override - public void 
endQuery(Node header, Node thisNode, long queryId, + public void endQuery(Node header, int id, Node thisNode, long queryId, AsyncMethodCallback<Void> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "End query"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "End query"); if (service != null) { service.endQuery(header, thisNode, queryId, resultHandler); } @@ -355,7 +362,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, "Query by timestamp:" + request.getQueryId() + "#" + request.getPath() + " of " + request .getRequester()); if (service != null) { @@ -364,9 +372,9 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void fetchSingleSeriesByTimestamp(Node header, long readerId, long time, + public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch by timestamp:" + readerId); if (service != null) { service.fetchSingleSeriesByTimestamp(header, readerId, time, resultHandler); @@ -376,8 +384,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void pullTimeSeriesSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - Node header = request.getHeader(); - DataAsyncService service = getDataAsyncService(header, resultHandler, request); + DataAsyncService service = 
getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); if (service != null) { service.pullTimeSeriesSchema(request, resultHandler); } @@ -386,37 +394,38 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void pullMeasurementSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, "Pull measurement schema"); service.pullMeasurementSchema(request, resultHandler); } @Override - public void getAllDevices(Node header, List<String> paths, + public void getAllDevices(Node header, int id, List<String> paths, AsyncMethodCallback<Set<String>> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Get all devices"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all devices"); service.getAllDevices(header, paths, resultHandler); } @Override - public void getNodeList(Node header, String path, int nodeLevel, + public void getNodeList(Node header, int id, String path, int nodeLevel, AsyncMethodCallback<List<String>> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Get node list"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get node list"); service.getNodeList(header, path, nodeLevel, resultHandler); } @Override - public void getChildNodePathInNextLevel(Node header, String path, + public void getChildNodePathInNextLevel(Node header, int id, String path, AsyncMethodCallback<Set<String>> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get child node path in next level"); service.getChildNodePathInNextLevel(header, path, 
resultHandler); } @Override - public void getAllMeasurementSchema(Node header, ByteBuffer planBytes, + public void getAllMeasurementSchema(Node header, int id, ByteBuffer planBytes, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Get all measurement schema"); service.getAllMeasurementSchema(header, planBytes, resultHandler); } @@ -424,28 +433,30 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void getAggrResult(GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); service.getAggrResult(request, resultHandler); } @Override - public void getUnregisteredTimeseries(Node header, List<String> timeseriesList, + public void getUnregisteredTimeseries(Node header, int id, List<String> timeseriesList, AsyncMethodCallback<List<String>> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Check if measurements are registered"); service.getUnregisteredTimeseries(header, timeseriesList, resultHandler); } @Override public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); service.getGroupByExecutor(request, resultHandler); } @Override - public void getGroupByResult(Node header, long executorId, long startTime, long endTime, + public void getGroupByResult(Node header, int id, long 
executorId, long startTime, long endTime, AsyncMethodCallback<List<ByteBuffer>> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Fetch group by"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Fetch group by"); service.getGroupByResult(header, executorId, startTime, endTime, resultHandler); } @@ -488,37 +499,37 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param result */ public void addNode(Node node, NodeAdditionResult result) { - Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); - synchronized (headerGroupMap) { - while (entryIterator.hasNext()) { - Entry<Node, DataGroupMember> entry = entryIterator.next(); - DataGroupMember dataGroupMember = entry.getValue(); - // the member may be extruded from the group, remove and stop it if so - boolean shouldLeave = dataGroupMember.addNode(node, result); - if (shouldLeave) { - logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes()); - entryIterator.remove(); - removeMember(entry.getKey(), entry.getValue()); - } - } - - if (result.getNewGroup().contains(thisNode)) { - logger.info("Adding this node into a new group {}", result.getNewGroup()); - DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode); - addDataGroupMember(dataGroupMember); - dataGroupMember.start(); - dataGroupMember - .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node), - node); - } - } +// Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); +// synchronized (headerGroupMap) { +// while (entryIterator.hasNext()) { +// Entry<Node, DataGroupMember> entry = entryIterator.next(); +// DataGroupMember dataGroupMember = entry.getValue(); +// // the member may be extruded from the group, remove and stop it if so +// boolean shouldLeave = dataGroupMember.addNode(node, result); 
+// if (shouldLeave) { +// logger.info("This node does not belong to {} any more", dataGroupMember.getAllNodes()); +// entryIterator.remove(); +// removeMember(entry.getKey(), entry.getValue()); +// } +// } +// +// if (result.getNewGroup().contains(thisNode)) { +// logger.info("Adding this node into a new group {}", result.getNewGroup()); +// DataGroupMember dataGroupMember = dataMemberFactory.create(result.getNewGroup(), thisNode); +// addDataGroupMember(dataGroupMember); +// dataGroupMember.start(); +// dataGroupMember +// .pullNodeAdditionSnapshots(((SlotPartitionTable) partitionTable).getNodeSlots(node), +// node); +// } +// } } private void removeMember(Node header, DataGroupMember dataGroupMember) { - dataGroupMember.syncLeader(); - dataGroupMember.setReadOnly(); - dataGroupMember.stop(); - stoppedMemberManager.put(header, dataGroupMember); +// dataGroupMember.syncLeader(); +// dataGroupMember.setReadOnly(); +// dataGroupMember.stop(); +// stoppedMemberManager.put(header, dataGroupMember); } /** @@ -574,43 +585,43 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param removalResult cluster changes due to the node removal */ public void removeNode(Node node, NodeRemovalResult removalResult) { - Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); - synchronized (headerGroupMap) { - while (entryIterator.hasNext()) { - Entry<Node, DataGroupMember> entry = entryIterator.next(); - DataGroupMember dataGroupMember = entry.getValue(); - if (dataGroupMember.getHeader().equals(node)) { - // the group is removed as the node is removed, so new writes should be rejected as - // they belong to the new holder, but the member is kept alive for other nodes to pull - // snapshots - entryIterator.remove(); - removeMember(entry.getKey(), entry.getValue()); - } else { - if (node.equals(thisNode)) { - // this node is removed, it is no more replica of other groups - List<Integer> nodeSlots = - 
((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader()); - dataGroupMember.removeLocalData(nodeSlots); - entryIterator.remove(); - dataGroupMember.stop(); - } else { - // the group should be updated and pull new slots from the removed node - dataGroupMember.removeNode(node, removalResult); - } - } - } - PartitionGroup newGroup = removalResult.getNewGroup(); - if (newGroup != null) { - logger.info("{} should join a new group {}", thisNode, newGroup); - try { - createNewMember(newGroup.getHeader()); - } catch (NotInSameGroupException e) { - // ignored - } catch (CheckConsistencyException ce) { - logger.error("remove node failed, error={}", ce.getMessage()); - } - } - } +// Iterator<Entry<Node, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); +// synchronized (headerGroupMap) { +// while (entryIterator.hasNext()) { +// Entry<Node, DataGroupMember> entry = entryIterator.next(); +// DataGroupMember dataGroupMember = entry.getValue(); +// if (dataGroupMember.getHeader().equals(node)) { +// // the group is removed as the node is removed, so new writes should be rejected as +// // they belong to the new holder, but the member is kept alive for other nodes to pull +// // snapshots +// entryIterator.remove(); +// removeMember(entry.getKey(), entry.getValue()); +// } else { +// if (node.equals(thisNode)) { +// // this node is removed, it is no more replica of other groups +// List<Integer> nodeSlots = +// ((SlotPartitionTable) partitionTable).getNodeSlots(dataGroupMember.getHeader()); +// dataGroupMember.removeLocalData(nodeSlots); +// entryIterator.remove(); +// dataGroupMember.stop(); +// } else { +// // the group should be updated and pull new slots from the removed node +// dataGroupMember.removeNode(node, removalResult); +// } +// } +// } +// PartitionGroup newGroup = removalResult.getNewGroup(); +// if (newGroup != null) { +// logger.info("{} should join a new group {}", thisNode, newGroup); +// try { +// 
createNewMember(newGroup.getHeader()); +// } catch (NotInSameGroupException e) { +// // ignored +// } catch (CheckConsistencyException ce) { +// logger.error("remove node failed, error={}", ce.getMessage()); +// } +// } +// } } public void setPartitionTable(PartitionTable partitionTable) { @@ -642,7 +653,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async @Override public void previousFill(PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, request); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, request); service.previousFill(request, resultHandler); } @@ -653,208 +665,226 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void matchTerm(long index, long term, Node header, + public void matchTerm(long index, long term, Node header, int id, AsyncMethodCallback<Boolean> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Match term"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Match term"); service.matchTerm(index, term, header, resultHandler); } @Override public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(request.getHeader(), resultHandler, "last"); + DataAsyncService service = getDataAsyncService(request.getHeader(), request.getRaftId(), + resultHandler, "last"); service.last(request, resultHandler); } @Override - public void getPathCount(Node header, List<String> pathsToQuery, int level, + public void getPathCount(Node header, int id, List<String> pathsToQuery, int level, AsyncMethodCallback<Integer> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "count path"); + DataAsyncService service = 
getDataAsyncService(header, id, resultHandler, "count path"); service.getPathCount(header, pathsToQuery, level, resultHandler); } @Override - public void onSnapshotApplied(Node header, List<Integer> slots, + public void onSnapshotApplied(Node header, int id, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) { - DataAsyncService service = getDataAsyncService(header, resultHandler, "Snapshot applied"); + DataAsyncService service = getDataAsyncService(header, id, resultHandler, "Snapshot applied"); service.onSnapshotApplied(header, slots, resultHandler); } @Override public long querySingleSeries(SingleSeriesQueryRequest request) throws TException { - return getDataSyncService(request.getHeader()).querySingleSeries(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).querySingleSeries(request); } @Override - public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TException { - return getDataSyncService(header).fetchSingleSeries(header, readerId); + public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException { + return getDataSyncService(header, raftId).fetchSingleSeries(header, readerId); } @Override public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) throws TException { - return getDataSyncService(request.getHeader()).querySingleSeriesByTimestamp(request); + return getDataSyncService(request.getHeader(), request.getRaftId()) + .querySingleSeriesByTimestamp(request); } @Override - public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp) + public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, + long timestamp) throws TException { - return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, readerId, timestamp); + return getDataSyncService(header, raftId) + .fetchSingleSeriesByTimestamp(header, readerId, timestamp); } @Override - public void endQuery(Node header, Node thisNode, 
long queryId) throws TException { - getDataSyncService(header).endQuery(header, thisNode, queryId); + public void endQuery(Node header, int raftId, Node thisNode, long queryId) throws TException { + getDataSyncService(header, raftId).endQuery(header, thisNode, queryId); } @Override - public GetAllPathsResult getAllPaths(Node header, List<String> path, boolean withAlias) + public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> path, + boolean withAlias) throws TException { - return getDataSyncService(header).getAllPaths(header, path, withAlias); + return getDataSyncService(header, raftId).getAllPaths(header, path, withAlias); } @Override - public Set<String> getAllDevices(Node header, List<String> path) throws TException { - return getDataSyncService(header).getAllDevices(header, path); + public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException { + return getDataSyncService(header, raftId).getAllDevices(header, path); } @Override - public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException { - return getDataSyncService(header).getNodeList(header, path, nodeLevel); + public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel) + throws TException { + return getDataSyncService(header, raftId).getNodeList(header, path, nodeLevel); } @Override - public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException { - return getDataSyncService(header).getChildNodePathInNextLevel(header, path); + public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path) + throws TException { + return getDataSyncService(header, raftId).getChildNodePathInNextLevel(header, path); } @Override - public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer planBinary) throws TException { - return getDataSyncService(header).getAllMeasurementSchema(header, planBinary); + public ByteBuffer getAllMeasurementSchema(Node header, int 
raftId, ByteBuffer planBinary) + throws TException { + return getDataSyncService(header, raftId).getAllMeasurementSchema(header, planBinary); } @Override public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws TException { - return getDataSyncService(request.getHeader()).getAggrResult(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).getAggrResult(request); } @Override - public List<String> getUnregisteredTimeseries(Node header, List<String> timeseriesList) + public List<String> getUnregisteredTimeseries(Node header, int raftId, + List<String> timeseriesList) throws TException { - return getDataSyncService(header).getUnregisteredTimeseries(header, timeseriesList); + return getDataSyncService(header, raftId).getUnregisteredTimeseries(header, timeseriesList); } @Override public PullSnapshotResp pullSnapshot(PullSnapshotRequest request) throws TException { - return getDataSyncService(request.getHeader()).pullSnapshot(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).pullSnapshot(request); } @Override public long getGroupByExecutor(GroupByRequest request) throws TException { - return getDataSyncService(request.header).getGroupByExecutor(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).getGroupByExecutor(request); } @Override - public List<ByteBuffer> getGroupByResult(Node header, long executorId, long startTime, + public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime) throws TException { - return getDataSyncService(header).getGroupByResult(header, executorId, startTime, endTime); + return getDataSyncService(header, raftId) + .getGroupByResult(header, executorId, startTime, endTime); } @Override public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws TException { - return getDataSyncService(request.getHeader()).pullTimeSeriesSchema(request); + return 
getDataSyncService(request.getHeader(), request.getRaftId()) + .pullTimeSeriesSchema(request); } @Override public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) throws TException { - return getDataSyncService(request.getHeader()).pullMeasurementSchema(request); + return getDataSyncService(request.getHeader(), request.getRaftId()) + .pullMeasurementSchema(request); } @Override public ByteBuffer previousFill(PreviousFillRequest request) throws TException { - return getDataSyncService(request.getHeader()).previousFill(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).previousFill(request); } @Override public ByteBuffer last(LastQueryRequest request) throws TException { - return getDataSyncService(request.getHeader()).last(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).last(request); } @Override - public int getPathCount(Node header, List<String> pathsToQuery, int level) throws TException { - return getDataSyncService(header).getPathCount(header, pathsToQuery, level); + public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level) + throws TException { + return getDataSyncService(header, raftId).getPathCount(header, pathsToQuery, level); } @Override - public boolean onSnapshotApplied(Node header, List<Integer> slots) { - return getDataSyncService(header).onSnapshotApplied(header, slots); + public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) { + return getDataSyncService(header, raftId).onSnapshotApplied(header, slots); } @Override public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) { - return getDataSyncService(request.getHeader()).sendHeartbeat(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).sendHeartbeat(request); } @Override public long startElection(ElectionRequest request) { - return getDataSyncService(request.getHeader()).startElection(request); + return 
getDataSyncService(request.getHeader(), request.getRaftId()).startElection(request); } @Override public long appendEntries(AppendEntriesRequest request) throws TException { - return getDataSyncService(request.getHeader()).appendEntries(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).appendEntries(request); } @Override public long appendEntry(AppendEntryRequest request) throws TException { - return getDataSyncService(request.getHeader()).appendEntry(request); + return getDataSyncService(request.getHeader(), request.getRaftId()).appendEntry(request); } @Override public void sendSnapshot(SendSnapshotRequest request) throws TException { - getDataSyncService(request.getHeader()).sendSnapshot(request); + getDataSyncService(request.getHeader(), request.getRaftId()).sendSnapshot(request); } @Override public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException { - return getDataSyncService(request.getHeader()).executeNonQueryPlan(request); + return getDataSyncService(request.getHeader(), request.getRaftId()) + .executeNonQueryPlan(request); } @Override - public long requestCommitIndex(Node header) throws TException { - return getDataSyncService(header).requestCommitIndex(header); + public long requestCommitIndex(Node header, int raftId) throws TException { + return getDataSyncService(header, raftId).requestCommitIndex(header); } @Override public ByteBuffer readFile(String filePath, long offset, int length) throws TException { - return getDataSyncService(thisNode).readFile(filePath, offset, length); + return getDataSyncService(thisNode, 0).readFile(filePath, offset, length); } @Override - public boolean matchTerm(long index, long term, Node header) { - return getDataSyncService(header).matchTerm(index, term, header); + public boolean matchTerm(long index, long term, Node header, int raftId) { + return getDataSyncService(header, raftId).matchTerm(index, term, header); } @Override - public ByteBuffer 
peekNextNotNullValue(Node header, long executorId, long startTime, long endTime) + public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, + long endTime) throws TException { - return getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime); + return getDataSyncService(header, raftId) + .peekNextNotNullValue(header, executorId, startTime, endTime); } @Override - public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime, + public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, + long endTime, AsyncMethodCallback<ByteBuffer> resultHandler) throws TException { resultHandler.onComplete( - getDataSyncService(header).peekNextNotNullValue(header, executorId, startTime, endTime)); + getDataSyncService(header, raftId) + .peekNextNotNullValue(header, executorId, startTime, endTime)); } @Override public void removeHardLink(String hardLinkPath) throws TException { - getDataSyncService(thisNode).removeHardLink(hardLinkPath); + getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath); } @Override public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) { - getDataAsyncService(thisNode, resultHandler, hardLinkPath).removeHardLink(hardLinkPath, + getDataAsyncService(thisNode, 0, resultHandler, hardLinkPath).removeHardLink(hardLinkPath, resultHandler); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java index 6fd2059..b203aaf 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java @@ -33,16 +33,17 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory; import 
org.apache.iotdb.cluster.utils.ClusterUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * When a node is added or removed, several partition groups are affected and nodes may exit some - * groups. For example, the local node is #5 and it is in a data group of [1, 3, 5], then node #3 - * is added, so the group becomes [1, 3, 4] and the local node must leave the group. However, #5 - * may have data that #4 needs to pull, so the Member of #5 in this group is stopped but not - * removed yet and when system recovers, we need to resume the groups so that they can keep - * providing snapshots for data transfers. + * groups. For example, the local node is #5 and it is in a data group of [1, 3, 5], then node #3 is + * added, so the group becomes [1, 3, 4] and the local node must leave the group. However, #5 may + * have data that #4 needs to pull, so the Member of #5 in this group is stopped but not removed yet + * and when system recovers, we need to resume the groups so that they can keep providing snapshots + * for data transfers. */ public class StoppedMemberManager { @@ -53,7 +54,7 @@ public class StoppedMemberManager { private static final String REMOVED = "0"; private static final String RESUMED = "1"; - private Map<Node, DataGroupMember> removedMemberMap = new HashMap<>(); + private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new HashMap<>(); private DataGroupMember.Factory memberFactory; private Node thisNode; @@ -65,13 +66,14 @@ public class StoppedMemberManager { } /** - * When a DataGroupMember is removed, add it here and record this removal, so in next start-up - * we can recover it as a data source for data transfers. - * @param header + * When a DataGroupMember is removed, add it here and record this removal, so in next start-up we + * can recover it as a data source for data transfers. 
+ * + * @param pair * @param dataGroupMember */ - public synchronized void put(Node header, DataGroupMember dataGroupMember) { - removedMemberMap.put(header, dataGroupMember); + public synchronized void put(Pair<Node, Integer> pair, DataGroupMember dataGroupMember) { + removedMemberMap.put(pair, dataGroupMember); try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) { StringBuilder builder = new StringBuilder(REMOVED); for (Node node : dataGroupMember.getAllNodes()) { @@ -80,27 +82,28 @@ public class StoppedMemberManager { writer.write(builder.toString()); writer.newLine(); } catch (IOException e) { - logger.error("Cannot record removed member of header {}", header, e); + logger.error("Cannot record removed member of header {}", pair, e); } } /** - * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up - * we will not recover it here. - * @param header + * When a DataGroupMember is resumed, add it here and record this removal, so in next start-up we + * will not recover it here. 
+ * + * @param pair */ - public synchronized void remove(Node header) { - removedMemberMap.remove(header); + public synchronized void remove(Pair<Node, Integer> pair) { + removedMemberMap.remove(pair); try (BufferedWriter writer = new BufferedWriter(new FileWriter(stoppedMembersFileName, true))) { - writer.write(RESUMED + ";" + header.toString()); + writer.write(RESUMED + ";" + pair.toString()); writer.newLine(); } catch (IOException e) { - logger.error("Cannot record resumed member of header {}", header, e); + logger.error("Cannot record resumed member of header {}", pair, e); } } - public synchronized DataGroupMember get(Node header) { - return removedMemberMap.get(header); + public synchronized DataGroupMember get(Pair<Node, Integer> pair) { + return removedMemberMap.get(pair); } private void recover() { @@ -143,7 +146,8 @@ public class StoppedMemberManager { } DataGroupMember member = memberFactory.create(partitionGroup, thisNode); member.setReadOnly(); - removedMemberMap.put(partitionGroup.getHeader(), member); + //TODO CORRECT + removedMemberMap.put(new Pair(partitionGroup.getHeader(), 0), member); } private void parseResumed(String[] split) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index de200cf..62f2b71 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -143,8 +143,8 @@ public class DataGroupMember extends RaftMember { private LocalQueryExecutor localQueryExecutor; /** - * When a new partition table is installed, all data members will be checked if unchanged. If - * not, such members will be removed. + * When a new partition table is installed, all data members will be checked if unchanged. If not, + * such members will be removed. 
*/ private boolean unchanged; @@ -237,6 +237,10 @@ public class DataGroupMember extends RaftMember { return allNodes.get(0); } + public Integer getRaftGroupId() { + return allNodes.getId(); + } + public ClusterQueryManager getQueryManager() { return queryManager; } @@ -385,7 +389,8 @@ public class DataGroupMember extends RaftMember { public void receiveSnapshot(SendSnapshotRequest request) throws SnapshotInstallationException { logger.info("{}: received a snapshot from {} with size {}", name, request.getHeader(), request.getSnapshotBytes().length); - PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>(FileSnapshot.Factory.INSTANCE); + PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>( + FileSnapshot.Factory.INSTANCE); snapshot.deserialize(ByteBuffer.wrap(request.getSnapshotBytes())); if (logger.isDebugEnabled()) { @@ -485,7 +490,8 @@ public class DataGroupMember extends RaftMember { Node node = entry.getKey(); List<Integer> nodeSlots = entry.getValue(); PullSnapshotTaskDescriptor taskDescriptor = - new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable().getHeaderGroup(node), + new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable() + .getHeaderGroup(new Pair<>(node, getRaftGroupId())), nodeSlots, false); pullFileSnapshot(taskDescriptor, null); } @@ -522,8 +528,9 @@ public class DataGroupMember extends RaftMember { descriptor.getSlots().get(0), descriptor.getSlots().size() - 1); } - pullSnapshotService.submit(new PullSnapshotTask<>(descriptor, this, FileSnapshot.Factory.INSTANCE, - snapshotSave)); + pullSnapshotService + .submit(new PullSnapshotTask<>(descriptor, this, FileSnapshot.Factory.INSTANCE, + snapshotSave)); } /** @@ -619,9 +626,9 @@ public class DataGroupMember extends RaftMember { List<Pair<Long, Boolean>> tmpPairList = entry.getValue(); for (Pair<Long, Boolean> pair : tmpPairList) { long partitionId = pair.left; - Node header = 
metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName, + Pair<Node, Integer> routePair = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName, partitionId * StorageEngine.getTimePartitionInterval()); - DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(header); + DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(routePair); if (localDataMember.getHeader().equals(this.getHeader())) { localListPair.add(new Pair<>(partitionId, pair.right)); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 863314f..9f2fbb6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -147,6 +147,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TTransportException; @@ -273,7 +274,7 @@ public class MetaGroupMember extends RaftMember { new SyncClientPool(new SyncMetaClient.FactorySync(factory)), new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false), new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory))); - allNodes = new ArrayList<>(); + allNodes = new PartitionGroup(); initPeerMap(); dataClientProvider = new DataClientProvider(factory); @@ -307,9 +308,9 @@ public class MetaGroupMember extends RaftMember { * @return true if the member is a leader and the partition is closed, false otherwise */ public void closePartition(String storageGroupName, long partitionId, boolean isSeq) { - Node header = 
partitionTable.routeToHeaderByTime(storageGroupName, + Pair<Node, Integer> pair = partitionTable.routeToHeaderByTime(storageGroupName, partitionId * StorageEngine.getTimePartitionInterval()); - DataGroupMember localDataMember = getLocalDataMember(header); + DataGroupMember localDataMember = getLocalDataMember(pair); if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) { return; } @@ -678,7 +679,7 @@ public class MetaGroupMember extends RaftMember { } private void updateNodeList(Collection<Node> nodes) { - allNodes = new ArrayList<>(nodes); + allNodes = new PartitionGroup(nodes); initPeerMap(); logger.info("All nodes in the partition table: {}", allNodes); initIdNodeMap(); @@ -1730,7 +1731,7 @@ public class MetaGroupMember extends RaftMember { if (partitionGroup.contains(thisNode)) { // the query should be handled by a group the local node is in, handle it with in the group logger.debug("Execute {} in a local group of {}", plan, partitionGroup.getHeader()); - status = getLocalDataMember(partitionGroup.getHeader()) + status = getLocalDataMember(partitionGroup.getHeader(), partitionGroup.getId()) .executeNonQueryPlan(plan); } else { // forward the query to the group that should handle it @@ -2113,8 +2114,8 @@ public class MetaGroupMember extends RaftMember { } @Override - public void setAllNodes(List<Node> allNodes) { - super.setAllNodes(allNodes); + public void setAllNodes(PartitionGroup allNodes) { + super.setAllNodes(new PartitionGroup(allNodes)); initPeerMap(); idNodeMap = new HashMap<>(); for (Node node : allNodes) { @@ -2136,10 +2137,10 @@ public class MetaGroupMember extends RaftMember { /** * Get a local DataGroupMember that is in the group of "header" for an internal request. 
* - * @param header the header of the group which the local node is in + * @param pair the header of the group which the local node is in */ - public DataGroupMember getLocalDataMember(Node header) { - return dataClusterServer.getDataMember(header, null, "Internal call"); + public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) { + return dataClusterServer.getDataMember(pair, null, "Internal call"); } public DataClientProvider getClientProvider() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 25201e9..f396e9c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -63,6 +63,7 @@ import org.apache.iotdb.cluster.log.LogParser; import org.apache.iotdb.cluster.log.catchup.CatchUpTask; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; import org.apache.iotdb.cluster.log.manage.RaftLogManager; +import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; @@ -146,7 +147,7 @@ public abstract class RaftMember { /** * the nodes that belong to the same raft group as thisNode. */ - protected List<Node> allNodes; + protected PartitionGroup allNodes; ClusterConfig config = ClusterDescriptor.getInstance().getConfig(); /** * the name of the member, to distinguish several members in the logs. 
@@ -657,7 +658,7 @@ public abstract class RaftMember { return allNodes; } - public void setAllNodes(List<Node> allNodes) { + public void setAllNodes(PartitionGroup allNodes) { this.allNodes = allNodes; } @@ -1006,7 +1007,7 @@ public abstract class RaftMember { return commitIdResult.get(); } synchronized (commitIdResult) { - client.requestCommitIndex(getHeader(), new GenericHandler<>(leader.get(), commitIdResult)); + client.requestCommitIndex(getHeader(), allNodes.getId(), new GenericHandler<>(leader.get(), commitIdResult)); commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs()); } return commitIdResult.get(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java index 113c862..521050e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java @@ -83,7 +83,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { } @Override - public void requestCommitIndex(Node header, AsyncMethodCallback<Long> resultHandler) { + public void requestCommitIndex(Node header, int raftId, AsyncMethodCallback<Long> resultHandler) { long commitIndex = member.getCommitIndex(); if (commitIndex != Long.MIN_VALUE) { resultHandler.onComplete(commitIndex); @@ -97,7 +97,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { return; } try { - client.requestCommitIndex(header, resultHandler); + client.requestCommitIndex(header, raftId, resultHandler); } catch (TException e) { resultHandler.onError(e); } @@ -125,7 +125,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { } @Override - public void matchTerm(long index, long term, Node header, + public void matchTerm(long index, long term, Node header, int raftId, AsyncMethodCallback<Boolean> resultHandler) {
resultHandler.onComplete(member.matchLog(index, term)); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java index 5a07130..bffc345 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java @@ -90,7 +90,7 @@ public abstract class BaseSyncService implements RaftService.Iface { } @Override - public long requestCommitIndex(Node header) + public long requestCommitIndex(Node header, int raftId) throws TException { long commitIndex = member.getCommitIndex(); if (commitIndex != Long.MIN_VALUE) { @@ -104,7 +104,7 @@ public abstract class BaseSyncService implements RaftService.Iface { } long commitIndex1 = 0; try { - commitIndex1 = client.requestCommitIndex(header); + commitIndex1 = client.requestCommitIndex(header, raftId); } catch (TException e) { client.getInputProtocol().getTransport().close(); throw e; @@ -141,7 +141,7 @@ public abstract class BaseSyncService implements RaftService.Iface { } @Override - public boolean matchTerm(long index, long term, Node header) { + public boolean matchTerm(long index, long term, Node header, int raftId) { return member.matchLog(index, term); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java index 3e3accf..486274e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java @@ -88,13 +88,15 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. 
} } - private void forwardPullSnapshot(PullSnapshotRequest request, AsyncMethodCallback<PullSnapshotResp> resultHandler) { + private void forwardPullSnapshot(PullSnapshotRequest request, + AsyncMethodCallback<PullSnapshotResp> resultHandler) { // if this node has been set readOnly, then it must have been synchronized with the leader // otherwise forward the request to the leader if (dataGroupMember.getLeader() != null) { logger.debug("{} forwarding a pull snapshot request to the leader {}", name, dataGroupMember.getLeader()); - AsyncDataClient client = (AsyncDataClient) dataGroupMember.getAsyncClient(dataGroupMember.getLeader()); + AsyncDataClient client = (AsyncDataClient) dataGroupMember + .getAsyncClient(dataGroupMember.getLeader()); try { client.pullSnapshot(request, resultHandler); } catch (TException e) { @@ -109,7 +111,8 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. public void pullTimeSeriesSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request)); + resultHandler + .onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request)); } catch (CheckConsistencyException e) { // if this node cannot synchronize with the leader with in a given time, forward the // request to the leader @@ -137,7 +140,8 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. 
public void pullMeasurementSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request)); + resultHandler + .onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request)); } catch (CheckConsistencyException e) { // if this node cannot synchronize with the leader with in a given time, forward the // request to the leader @@ -170,14 +174,15 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request)); + resultHandler.onComplete( + dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request)); } catch (Exception e) { resultHandler.onError(e); } } @Override - public void endQuery(Node header, Node requester, long queryId, + public void endQuery(Node header, int raftId, Node requester, long queryId, AsyncMethodCallback<Void> resultHandler) { try { dataGroupMember.getQueryManager().endQuery(requester, queryId); @@ -188,7 +193,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void fetchSingleSeries(Node header, long readerId, + public void fetchSingleSeries(Node header, int raftId, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) { try { resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId)); @@ -198,17 +203,18 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. 
} @Override - public void fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp, + public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long timestamp, AsyncMethodCallback<ByteBuffer> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeriesByTimestamp(readerId, timestamp)); + resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor() + .fetchSingleSeriesByTimestamp(readerId, timestamp)); } catch (ReaderNotFoundException | IOException e) { resultHandler.onError(e); } } @Override - public void getAllPaths(Node header, List<String> paths, boolean withAlias, + public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias, AsyncMethodCallback<GetAllPathsResult> resultHandler) { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); @@ -219,7 +225,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void getAllDevices(Node header, List<String> path, + public void getAllDevices(Node header, int raftId, List<String> path, AsyncMethodCallback<Set<String>> resultHandler) { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); @@ -230,7 +236,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void getNodeList(Node header, String path, int nodeLevel, + public void getNodeList(Node header, int raftId, String path, int nodeLevel, AsyncMethodCallback<List<String>> resultHandler) { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); @@ -241,7 +247,7 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. 
} @Override - public void getChildNodePathInNextLevel(Node header, String path, + public void getChildNodePathInNextLevel(Node header, int raftId, String path, AsyncMethodCallback<Set<String>> resultHandler) { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); @@ -252,10 +258,11 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void getAllMeasurementSchema(Node header, ByteBuffer planBinary, + public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary, AsyncMethodCallback<ByteBuffer> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary)); + resultHandler + .onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary)); } catch (CheckConsistencyException | IOException | MetadataException e) { resultHandler.onError(e); } @@ -272,10 +279,11 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void getUnregisteredTimeseries(Node header, List<String> timeseriesList, + public void getUnregisteredTimeseries(Node header, int raftId, List<String> timeseriesList, AsyncMethodCallback<List<String>> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList)); + resultHandler.onComplete( + dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList)); } catch (CheckConsistencyException e) { resultHandler.onError(e); } @@ -291,10 +299,12 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. 
} @Override - public void getGroupByResult(Node header, long executorId, long startTime, long endTime, + public void getGroupByResult(Node header, int raftId, long executorId, long startTime, + long endTime, AsyncMethodCallback<List<ByteBuffer>> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId, startTime, endTime)); + resultHandler.onComplete( + dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId, startTime, endTime)); } catch (ReaderNotFoundException | IOException | QueryProcessException e) { resultHandler.onError(e); } @@ -320,26 +330,29 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService. } @Override - public void getPathCount(Node header, List<String> pathsToQuery, int level, + public void getPathCount(Node header, int raftId, List<String> pathsToQuery, int level, AsyncMethodCallback<Integer> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level)); + resultHandler + .onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level)); } catch (CheckConsistencyException | MetadataException e) { resultHandler.onError(e); } } @Override - public void onSnapshotApplied(Node header, List<Integer> slots, + public void onSnapshotApplied(Node header, int raftId, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) { resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots)); } @Override - public void peekNextNotNullValue(Node header, long executorId, long startTime, long endTime, + public void peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, + long endTime, AsyncMethodCallback<ByteBuffer> resultHandler) { try { - resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().peekNextNotNullValue(executorId, startTime, endTime)); + resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor() + 
.peekNextNotNullValue(executorId, startTime, endTime)); } catch (ReaderNotFoundException | IOException e) { resultHandler.onError(e); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java index 38be676..d974bf3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java @@ -187,7 +187,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public void endQuery(Node header, Node requester, long queryId) throws TException { + public void endQuery(Node header, int raftId, Node requester, long queryId) throws TException { try { dataGroupMember.getQueryManager().endQuery(requester, queryId); } catch (StorageEngineException e) { @@ -196,7 +196,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public ByteBuffer fetchSingleSeries(Node header, long readerId) throws TException { + public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) throws TException { try { return dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId); } catch (ReaderNotFoundException | IOException e) { @@ -205,7 +205,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, long timestamp) + public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, + long timestamp) throws TException { try { return dataGroupMember.getLocalQueryExecutor() @@ -216,7 +217,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public GetAllPathsResult getAllPaths(Node header, List<String> paths, boolean withAlias) + public GetAllPathsResult getAllPaths(Node 
header, int raftId, List<String> paths, + boolean withAlias) throws TException { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); @@ -227,7 +229,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public Set<String> getAllDevices(Node header, List<String> path) throws TException { + public Set<String> getAllDevices(Node header, int raftId, List<String> path) throws TException { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); return ((CMManager) IoTDB.metaManager).getAllDevices(path); @@ -237,7 +239,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public List<String> getNodeList(Node header, String path, int nodeLevel) throws TException { + public List<String> getNodeList(Node header, int raftId, String path, int nodeLevel) + throws TException { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); return ((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel); @@ -247,7 +250,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public Set<String> getChildNodePathInNextLevel(Node header, String path) throws TException { + public Set<String> getChildNodePathInNextLevel(Node header, int raftId, String path) + throws TException { try { dataGroupMember.syncLeaderWithConsistencyCheck(false); return ((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path); @@ -257,7 +261,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer planBinary) throws TException { + public ByteBuffer getAllMeasurementSchema(Node header, int raftId, ByteBuffer planBinary) + throws TException { try { return dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary); } catch (CheckConsistencyException | IOException | MetadataException e) { @@ -275,7 +280,8 @@ public class 
DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public List<String> getUnregisteredTimeseries(Node header, List<String> timeseriesList) + public List<String> getUnregisteredTimeseries(Node header, int raftId, + List<String> timeseriesList) throws TException { try { return dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList); @@ -294,7 +300,7 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public List<ByteBuffer> getGroupByResult(Node header, long executorId, long startTime, + public List<ByteBuffer> getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime) throws TException { try { @@ -324,7 +330,8 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public int getPathCount(Node header, List<String> pathsToQuery, int level) throws TException { + public int getPathCount(Node header, int raftId, List<String> pathsToQuery, int level) + throws TException { try { return dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level); } catch (CheckConsistencyException | MetadataException e) { @@ -333,12 +340,13 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If } @Override - public boolean onSnapshotApplied(Node header, List<Integer> slots) { + public boolean onSnapshotApplied(Node header, int raftId, List<Integer> slots) { return dataGroupMember.onSnapshotInstalled(slots); } @Override - public ByteBuffer peekNextNotNullValue(Node header, long executorId, long startTime, long endTime) + public ByteBuffer peekNextNotNullValue(Node header, int raftId, long executorId, long startTime, + long endTime) throws TException { try { return dataGroupMember.getLocalQueryExecutor() diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift index 6c86a1f..56f1dd4 100644 --- a/thrift/src/main/thrift/cluster.thrift +++ 
b/thrift/src/main/thrift/cluster.thrift @@ -41,6 +41,7 @@ struct HeartBeatRequest { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 8: optional Node header + 9: optional int raftId } // follower -> leader @@ -56,6 +57,7 @@ struct HeartBeatResponse { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 7: optional Node header + 8: optional int raftId } // node -> node @@ -68,8 +70,9 @@ struct ElectionRequest { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 5: optional Node header - 6: optional long dataLogLastIndex - 7: optional long dataLogLastTerm + 6: optional int raftId + 7: optional long dataLogLastIndex + 8: optional long dataLogLastTerm } // leader -> follower @@ -84,6 +87,7 @@ struct AppendEntryRequest { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 7: optional Node header + 8: optional int raftId } // leader -> follower @@ -98,6 +102,7 @@ struct AppendEntriesRequest { // because a data server may play many data groups members, this is used to identify which // member should process the request or response. Only used in data group communication. 
7: optional Node header + 8: optional int raftId } struct AddNodeResponse { @@ -138,16 +143,18 @@ struct SendSnapshotRequest { 1: required binary snapshotBytes // for data group 2: optional Node header + 3: optional int raftId } struct PullSnapshotRequest { 1: required list<int> requiredSlots // for data group 2: optional Node header + 3: optional int raftId // set to true if the previous holder has been removed from the cluster. // This will make the previous holder read-only so that different new // replicas can pull the same snapshot. - 3: required bool requireReadOnly + 4: required bool requireReadOnly } struct PullSnapshotResp { @@ -157,11 +164,13 @@ struct PullSnapshotResp { struct ExecutNonQueryReq { 1: required binary planBytes 2: optional Node header + 3: optional int raftId } struct PullSchemaRequest { 1: required list<string> prefixPaths 2: optional Node header + 3: optional int raftId } struct PullSchemaResp { @@ -175,11 +184,12 @@ struct SingleSeriesQueryRequest { 4: required long queryId 5: required Node requester 6: required Node header - 7: required int dataTypeOrdinal - 8: required set<string> deviceMeasurements - 9: required bool ascending - 10: required int fetchSize - 11: required int deduplicatedPathNum + 7: required int raftId + 8: required int dataTypeOrdinal + 9: required set<string> deviceMeasurements + 10: required bool ascending + 11: required int fetchSize + 12: required int deduplicatedPathNum } struct PreviousFillRequest { @@ -189,8 +199,9 @@ struct PreviousFillRequest { 4: required long queryId 5: required Node requester 6: required Node header - 7: required int dataTypeOrdinal - 8: required set<string> deviceMeasurements + 7: required int raftId + 8: required int dataTypeOrdinal + 9: required set<string> deviceMeasurements } // the spec and load of a node, for query coordinating @@ -204,10 +215,11 @@ struct GetAggrResultRequest { 3: required int dataTypeOrdinal 4: optional binary timeFilterBytes 5: required Node header - 6: required 
long queryId - 7: required Node requestor - 8: required set<string> deviceMeasurements - 9: required bool ascending + 6: required int raftId + 7: required long queryId + 8: required Node requestor + 9: required set<string> deviceMeasurements + 10: required bool ascending } struct GroupByRequest { @@ -217,9 +229,10 @@ struct GroupByRequest { 4: required long queryId 5: required list<int> aggregationTypeOrdinals 6: required Node header - 7: required Node requestor - 8: required set<string> deviceMeasurements - 9: required bool ascending + 7: required int raftId + 8: required Node requestor + 9: required set<string> deviceMeasurements + 10: required bool ascending } struct LastQueryRequest { @@ -229,7 +242,8 @@ struct LastQueryRequest { 4: required map<string, set<string>> deviceMeasurements 5: optional binary filterBytes 6: required Node header - 7: required Node requestor + 7: required int raftId + 8: required Node requestor } struct GetAllPathsResult { @@ -293,7 +307,7 @@ service RaftService { * Ask the leader for its commit index, used to check whether the node has caught up with the * leader. **/ - long requestCommitIndex(1:Node header) + long requestCommitIndex(1:Node header, 2:int raftId) /** @@ -306,7 +320,7 @@ service RaftService { /** * Test if a log of "index" and "term" exists. **/ - bool matchTerm(1:long index, 2:long term, 3:Node header) + bool matchTerm(1:long index, 2:long term, 3:Node header, 4:int raftId) /** * When a follower finds that it already has a file in a snapshot locally, it calls this @@ -331,7 +345,7 @@ service TSDataService extends RaftService { * @return a ByteBuffer containing the serialized time-value pairs or an empty buffer if there * are not more results. **/ - binary fetchSingleSeries(1:Node header, 2:long readerId) + binary fetchSingleSeries(1:Node header, 2:int raftId, 3:long readerId) /** * Query a time series and generate an IReaderByTimestamp. 
@@ -345,32 +359,32 @@ service TSDataService extends RaftService { * @return a ByteBuffer containing the serialized value or an empty buffer if there * are not more results. **/ - binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long timestamp) + binary fetchSingleSeriesByTimestamp(1:Node header, 2:int raftId, 3:long readerId, 4:long timestamp) /** * Find the local query established for the remote query and release all its resource. **/ - void endQuery(1:Node header, 2:Node thisNode, 3:long queryId) + void endQuery(1:Node header, 2:int raftId, 3:Node thisNode, 4:long queryId) /** * Given path patterns (paths with wildcard), return all paths they match. **/ - GetAllPathsResult getAllPaths(1:Node header, 2:list<string> path, 3:bool withAlias) + GetAllPathsResult getAllPaths(1:Node header, 2:int raftId, 3:list<string> path, 4:bool withAlias) /** * Given path patterns (paths with wildcard), return all devices they match. **/ - set<string> getAllDevices(1:Node header, 2:list<string> path) + set<string> getAllDevices(1:Node header, 2:int raftId, 3:list<string> path) - list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel) + list<string> getNodeList(1:Node header, 2:int raftId, 3:string path, 4:int nodeLevel) - set<string> getChildNodePathInNextLevel(1: Node header, 2: string path) + set<string> getChildNodePathInNextLevel(1: Node header, 2:int raftId, 3: string path) - binary getAllMeasurementSchema(1: Node header, 2: binary planBinary) + binary getAllMeasurementSchema(1: Node header, 2:int raftId, 3: binary planBinary) list<binary> getAggrResult(1:GetAggrResultRequest request) - list<string> getUnregisteredTimeseries(1: Node header, 2: list<string> timeseriesList) + list<string> getUnregisteredTimeseries(1: Node header, 2:int raftId, 3: list<string> timeseriesList) PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request) @@ -385,7 +399,7 @@ service TSDataService extends RaftService { * @return the serialized AggregationResults, 
each is the result of one of the previously * required aggregations, and their orders are the same. **/ - list<binary> getGroupByResult(1:Node header, 2:long executorId, 3:long startTime, 4:long endTime) + list<binary> getGroupByResult(1:Node header, 2:int raftId, 3:long executorId, 4:long startTime, 5:long endTime) /** @@ -410,15 +424,15 @@ service TSDataService extends RaftService { **/ binary last(1: LastQueryRequest request) - int getPathCount(1: Node header 2: list<string> pathsToQuery 3: int level) + int getPathCount(1: Node header, 2:int raftId, 3: list<string> pathsToQuery, 4: int level) /** * During slot transfer, when a member has pulled snapshot from a group, the member will use this * method to inform the group that one replica of such slots has been pulled. **/ - bool onSnapshotApplied(1: Node header 2: list<int> slots) + bool onSnapshotApplied(1: Node header, 2:int raftId, 3: list<int> slots) - binary peekNextNotNullValue(1: Node header, 2: long executorId, 3: long startTime, 4: long + binary peekNextNotNullValue(1: Node header, 2:int raftId, 3: long executorId, 4: long startTime, 5: long endTime) }
