This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch cluster_multi_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55fc053da4fa93d8959a35404e3fda4bdf7788fa
Author: LebronAl <[email protected]>
AuthorDate: Fri Dec 11 19:49:04 2020 +0800

    init
---
 .../iotdb/cluster/partition/PartitionGroup.java    |  34 +-
 .../iotdb/cluster/partition/PartitionTable.java    |   7 +-
 .../cluster/partition/slot/SlotPartitionTable.java | 387 ++++++++++---------
 .../iotdb/cluster/server/DataClusterServer.java    | 426 +++++++++++----------
 .../iotdb/cluster/server/StoppedMemberManager.java |  48 +--
 .../cluster/server/member/DataGroupMember.java     |  23 +-
 .../cluster/server/member/MetaGroupMember.java     |  21 +-
 .../iotdb/cluster/server/member/RaftMember.java    |   7 +-
 .../cluster/server/service/BaseAsyncService.java   |   6 +-
 .../cluster/server/service/BaseSyncService.java    |   6 +-
 .../cluster/server/service/DataAsyncService.java   |  61 +--
 .../cluster/server/service/DataSyncService.java    |  34 +-
 thrift/src/main/thrift/cluster.thrift              |  80 ++--
 13 files changed, 630 insertions(+), 510 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index 8bf9f52..dc5cbcc 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -21,45 +21,61 @@ package org.apache.iotdb.cluster.partition;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 
 /**
  * PartitionGroup contains all the nodes that will form a data group with a 
certain node, which are
- * the REPLICATION_NUM - 1 different physical nodes after this node.
- * The first element of the list is called header, which is also the 
identifier of the data group.
+ * the REPLICATION_NUM - 1 different physical nodes after this node. The first 
element of the list
+ * is called header, which is also the identifier of the data group.
  */
 public class PartitionGroup extends ArrayList<Node> {
 
-  private Node thisNode;
+  private int id;
 
   public PartitionGroup() {
   }
 
-  public PartitionGroup(Node... nodes) {
+  public PartitionGroup(Collection<Node> nodes) {
+    this.addAll(nodes);
+  }
+
+  public PartitionGroup(int id, Node... nodes) {
     this.addAll(Arrays.asList(nodes));
+    this.id = id;
   }
 
   public PartitionGroup(PartitionGroup other) {
     super(other);
-    this.thisNode = other.thisNode;
+    this.id = other.getId();
   }
 
   @Override
   public boolean equals(Object o) {
-    return super.equals(o);
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PartitionGroup group = (PartitionGroup) o;
+    return Objects.equals(id, group.getId()) &&
+        super.equals(group);
   }
 
   @Override
   public int hashCode() {
-    return super.hashCode();
+    return Objects.hash(id, getHeader());
   }
 
+
   public Node getHeader() {
     return get(0);
   }
 
-  public void setThisNode(Node thisNode) {
-    this.thisNode = thisNode;
+  public int getId() {
+    return id;
   }
 
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
index 81f0199..a1a5ae6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 /**
  * PartitionTable manages the map whose key is the StorageGroupName with a 
time interval and the
@@ -54,7 +55,7 @@ public interface PartitionTable {
    * @param timestamp
    * @return
    */
-  Node routeToHeaderByTime(String storageGroupName, long timestamp);
+  Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long 
timestamp);
 
   /**
    * Add a new node to update the partition table.
@@ -78,10 +79,10 @@ public interface PartitionTable {
   List<PartitionGroup> getLocalGroups();
 
   /**
-   * @param header
+   * @param pair
    * @return the partition group starting from the header.
    */
-  PartitionGroup getHeaderGroup(Node header);
+  PartitionGroup getHeaderGroup(Pair<Node, Integer> pair);
 
   ByteBuffer serialize();
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index ead856c..a1f98ce 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -14,8 +14,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -31,6 +29,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,8 @@ public class SlotPartitionTable implements PartitionTable {
   private int replicationNum =
       ClusterDescriptor.getInstance().getConfig().getReplicationNum();
 
+  private int raftGroupNum = 2;
+
   //all nodes
   private List<Node> nodeRing = new ArrayList<>();
   //normally, it is equal to ClusterConstant.SLOT_NUM.
@@ -54,12 +55,12 @@ public class SlotPartitionTable implements PartitionTable {
 
   //The following fields are used for determining which node a data item 
belongs to.
   // the slots held by each node
-  private Map<Node, List<Integer>> nodeSlotMap = new ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, List<Integer>> nodeSlotMap = new 
ConcurrentHashMap<>();
   // each slot is managed by whom
-  private Node[] slotNodes = new Node[ClusterConstant.SLOT_NUM];
+  private Pair<Node, Integer>[] slotNodes = new Pair[ClusterConstant.SLOT_NUM];
   // the nodes that each slot belongs to before a new node is added, used for 
the new node to
   // find the data source
-  private Map<Node, Map<Integer, Node>> previousNodeMap = new 
ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, Map<Integer, Node>> previousNodeMap = new 
ConcurrentHashMap<>();
 
  //the field is used for determining which nodes need to form a group.
   // the data groups which this node belongs to.
@@ -111,8 +112,11 @@ public class SlotPartitionTable implements PartitionTable {
     // evenly assign the slots to each node
     int nodeNum = nodeRing.size();
     int slotsPerNode = totalSlotNumbers / nodeNum;
+    int slotsPerRaftGroup = slotsPerNode / raftGroupNum;
     for (Node node : nodeRing) {
-      nodeSlotMap.put(node, new ArrayList<>());
+      for (int i = 0; i < raftGroupNum; i++) {
+        nodeSlotMap.put(new Pair<>(node, i), new ArrayList<>());
+      }
     }
 
     for (int i = 0; i < totalSlotNumbers; i++) {
@@ -121,11 +125,17 @@ public class SlotPartitionTable implements PartitionTable 
{
        // the last node may receive a little more if total slots cannot be divided by node number
         nodeIdx--;
       }
-      nodeSlotMap.get(nodeRing.get(nodeIdx)).add(i);
+      for (int j = 0; j < nodeIdx; j++) {
+        int groupIdx = j / slotsPerRaftGroup;
+        if (groupIdx >= raftGroupNum) {
+          groupIdx--;
+        }
+        nodeSlotMap.get(new Pair<>(nodeRing.get(nodeIdx), groupIdx)).add(i);
+      }
     }
 
     // build the index to find a node by slot
-    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
+    for (Entry<Pair<Node, Integer>, List<Integer>> entry : 
nodeSlotMap.entrySet()) {
       for (Integer slot : entry.getValue()) {
         slotNodes[slot] = entry.getKey();
       }
@@ -145,7 +155,9 @@ public class SlotPartitionTable implements PartitionTable {
       if (startIndex < 0) {
         startIndex = startIndex + nodeRing.size();
       }
-      ret.add(getHeaderGroup(nodeRing.get(startIndex)));
+      for (int j = 0; j < raftGroupNum; j++) {
+        ret.add(getHeaderGroup(new Pair<>(nodeRing.get(startIndex), j)));
+      }
     }
 
     logger.debug("The partition groups of {} are: {}", node, ret);
@@ -153,13 +165,13 @@ public class SlotPartitionTable implements PartitionTable 
{
   }
 
   @Override
-  public PartitionGroup getHeaderGroup(Node node) {
-    PartitionGroup ret = new PartitionGroup();
+  public PartitionGroup getHeaderGroup(Pair<Node, Integer> pair) {
+    PartitionGroup ret = new PartitionGroup(pair.right);
 
     // assuming the nodes are [1,2,3,4,5]
-    int nodeIndex = nodeRing.indexOf(node);
+    int nodeIndex = nodeRing.indexOf(pair.left);
     if (nodeIndex == -1) {
-      logger.error("Node {} is not in the cluster", node);
+      logger.error("Node {} is not in the cluster", pair.left);
       return null;
     }
     int endIndex = nodeIndex + replicationNum;
@@ -177,8 +189,8 @@ public class SlotPartitionTable implements PartitionTable {
   @Override
   public PartitionGroup route(String storageGroupName, long timestamp) {
     synchronized (nodeRing) {
-      Node node = routeToHeaderByTime(storageGroupName, timestamp);
-      return getHeaderGroup(node);
+      Pair<Node, Integer> pair = routeToHeaderByTime(storageGroupName, 
timestamp);
+      return getHeaderGroup(pair);
     }
   }
 
@@ -188,112 +200,115 @@ public class SlotPartitionTable implements 
PartitionTable {
           Thread.currentThread().getStackTrace());
       return null;
     }
-    Node node = slotNodes[slot];
-    logger.debug("The slot of {} is held by {}", slot, node);
-    if (node == null) {
+    Pair<Node, Integer> pair = slotNodes[slot];
+    logger.debug("The slot of {} is held by {}", slot, pair);
+    if (pair.left == null) {
       logger.warn("The slot {} is incorrect", slot);
       return null;
     }
-    return getHeaderGroup(node);
+    return getHeaderGroup(pair);
   }
 
   @Override
-  public Node routeToHeaderByTime(String storageGroupName, long timestamp) {
+  public Pair<Node, Integer> routeToHeaderByTime(String storageGroupName, long 
timestamp) {
     synchronized (nodeRing) {
       int slot = getSlotStrategy()
           .calculateSlotByTime(storageGroupName, timestamp, 
getTotalSlotNumbers());
-      Node node = slotNodes[slot];
+      Pair<Node, Integer> pair = slotNodes[slot];
       logger.trace("The slot of {}@{} is {}, held by {}", storageGroupName, 
timestamp,
-          slot, node);
-      return node;
+          slot, pair);
+      return pair;
     }
   }
 
   @Override
   public NodeAdditionResult addNode(Node node) {
-    synchronized (nodeRing) {
-      if (nodeRing.contains(node)) {
-        return null;
-      }
-
-      nodeRing.add(node);
-      nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
-
-      List<PartitionGroup> retiredGroups = new ArrayList<>();
-      for (int i = 0; i < localGroups.size(); i++) {
-        PartitionGroup oldGroup = localGroups.get(i);
-        Node header = oldGroup.getHeader();
-        PartitionGroup newGrp = getHeaderGroup(header);
-        if (newGrp.contains(node) && newGrp.contains(thisNode)) {
-          // this group changes but still contains the local node
-          localGroups.set(i, newGrp);
-        } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) {
-          // the local node retires from the group
-          retiredGroups.add(newGrp);
-        }
-      }
-
-      // remove retired groups
-      Iterator<PartitionGroup> groupIterator = localGroups.iterator();
-      while (groupIterator.hasNext()) {
-        PartitionGroup partitionGroup = groupIterator.next();
-        for (PartitionGroup retiredGroup : retiredGroups) {
-          if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) {
-            groupIterator.remove();
-            break;
-          }
-        }
-      }
-    }
-
-    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
-    PartitionGroup newGroup = getHeaderGroup(node);
-    if (newGroup.contains(thisNode)) {
-      localGroups.add(newGroup);
-    }
-    result.setNewGroup(newGroup);
-
-    calculateGlobalGroups();
-
-    // the slots movement is only done logically, the new node itself will 
pull data from the
-    // old node
-    result.setLostSlots(moveSlotsToNew(node));
-
-    return result;
+//    synchronized (nodeRing) {
+//      if (nodeRing.contains(node)) {
+//        return null;
+//      }
+//
+//      nodeRing.add(node);
+//      nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
+//
+//      List<PartitionGroup> retiredGroups = new ArrayList<>();
+//      for (int i = 0; i < localGroups.size(); i++) {
+//        PartitionGroup oldGroup = localGroups.get(i);
+//        Node header = oldGroup.getHeader();
+//        PartitionGroup newGrp = getHeaderGroup(header);
+//        if (newGrp.contains(node) && newGrp.contains(thisNode)) {
+//          // this group changes but still contains the local node
+//          localGroups.set(i, newGrp);
+//        } else if (newGrp.contains(node) && !newGrp.contains(thisNode)) {
+//          // the local node retires from the group
+//          retiredGroups.add(newGrp);
+//        }
+//      }
+//
+//      // remove retired groups
+//      Iterator<PartitionGroup> groupIterator = localGroups.iterator();
+//      while (groupIterator.hasNext()) {
+//        PartitionGroup partitionGroup = groupIterator.next();
+//        for (PartitionGroup retiredGroup : retiredGroups) {
+//          if (retiredGroup.getHeader().equals(partitionGroup.getHeader())) {
+//            groupIterator.remove();
+//            break;
+//          }
+//        }
+//      }
+//    }
+//
+//    SlotNodeAdditionResult result = new SlotNodeAdditionResult();
+//    PartitionGroup newGroup = getHeaderGroup(node);
+//    if (newGroup.contains(thisNode)) {
+//      localGroups.add(newGroup);
+//    }
+//    result.setNewGroup(newGroup);
+//
+//    calculateGlobalGroups();
+//
+//    // the slots movement is only done logically, the new node itself will 
pull data from the
+//    // old node
+//    result.setLostSlots(moveSlotsToNew(node));
+//
+//    return result;
+    return null;
   }
 
 
   /**
    * Move last slots from each group whose slot number is bigger than the new 
average to the new
    * node.
+   *
    * @param newNode
    * @return a map recording what slots each group lost.
    */
   private Map<Node, Set<Integer>> moveSlotsToNew(Node newNode) {
-    Map<Node, Set<Integer>> result = new HashMap<>();
-    // as a node is added, the average slots for each node decrease
-    // move the slots to the new node if any previous node have more slots 
than the new average
-    List<Integer> newSlots = new ArrayList<>();
-    Map<Integer, Node> previousHolders = new HashMap<>();
-    int newAvg = totalSlotNumbers / nodeRing.size();
-    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
-      List<Integer> slots = entry.getValue();
-      int transferNum = slots.size() - newAvg;
-      if (transferNum > 0) {
-        List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, 
slots.size());
-        newSlots.addAll(slotsToMove);
-        for (Integer slot : slotsToMove) {
-          // record what node previously hold the integer
-          previousHolders.put(slot, entry.getKey());
-          slotNodes[slot] = newNode;
-        }
-        result.computeIfAbsent(entry.getKey(), n -> new 
HashSet<>()).addAll(slotsToMove);
-        slotsToMove.clear();
-      }
-    }
-    nodeSlotMap.put(newNode, newSlots);
-    previousNodeMap.put(newNode, previousHolders);
-    return result;
+//    Map<Node, Set<Integer>> result = new HashMap<>();
+//    // as a node is added, the average slots for each node decrease
+//    // move the slots to the new node if any previous node have more slots 
than the new average
+//    List<Integer> newSlots = new ArrayList<>();
+//    Map<Integer, Node> previousHolders = new HashMap<>();
+//    int newAvg = totalSlotNumbers / nodeRing.size();
+//    for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
+//      List<Integer> slots = entry.getValue();
+//      int transferNum = slots.size() - newAvg;
+//      if (transferNum > 0) {
+//        List<Integer> slotsToMove = slots.subList(slots.size() - 
transferNum, slots.size());
+//        newSlots.addAll(slotsToMove);
+//        for (Integer slot : slotsToMove) {
+//          // record what node previously hold the integer
+//          previousHolders.put(slot, entry.getKey());
+//          slotNodes[slot] = newNode;
+//        }
+//        result.computeIfAbsent(entry.getKey(), n -> new 
HashSet<>()).addAll(slotsToMove);
+//        slotsToMove.clear();
+//      }
+//    }
+//    nodeSlotMap.put(newNode, newSlots);
+//    previousNodeMap.put(newNode, previousHolders);
+//    return result;
+    return null;
   }
 
   @Override
@@ -310,22 +325,23 @@ public class SlotPartitionTable implements PartitionTable 
{
     try {
       dataOutputStream.writeInt(totalSlotNumbers);
       dataOutputStream.writeInt(nodeSlotMap.size());
-      for (Entry<Node, List<Integer>> entry : nodeSlotMap.entrySet()) {
-        SerializeUtils.serialize(entry.getKey(), dataOutputStream);
+      for (Entry<Pair<Node, Integer>, List<Integer>> entry : 
nodeSlotMap.entrySet()) {
+        SerializeUtils.serialize(entry.getKey().left, dataOutputStream);
+        dataOutputStream.writeInt(entry.getKey().right);
         SerializeUtils.serialize(entry.getValue(), dataOutputStream);
       }
 
-      dataOutputStream.writeInt(previousNodeMap.size());
-      for (Entry<Node, Map<Integer, Node>> nodeMapEntry : 
previousNodeMap.entrySet()) {
-        dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier());
-
-        Map<Integer, Node> prevHolders = nodeMapEntry.getValue();
-        dataOutputStream.writeInt(prevHolders.size());
-        for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) {
-          dataOutputStream.writeInt(integerNodeEntry.getKey());
-          
dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier());
-        }
-      }
+//      dataOutputStream.writeInt(previousNodeMap.size());
+//      for (Entry<Node, Map<Integer, Node>> nodeMapEntry : 
previousNodeMap.entrySet()) {
+//        dataOutputStream.writeInt(nodeMapEntry.getKey().getNodeIdentifier());
+//
+//        Map<Integer, Node> prevHolders = nodeMapEntry.getValue();
+//        dataOutputStream.writeInt(prevHolders.size());
+//        for (Entry<Integer, Node> integerNodeEntry : prevHolders.entrySet()) 
{
+//          dataOutputStream.writeInt(integerNodeEntry.getKey());
+//          
dataOutputStream.writeInt(integerNodeEntry.getValue().getNodeIdentifier());
+//        }
+//      }
 
       dataOutputStream.writeLong(lastLogIndex);
     } catch (IOException ignored) {
@@ -345,32 +361,38 @@ public class SlotPartitionTable implements PartitionTable 
{
       Node node = new Node();
       List<Integer> slots = new ArrayList<>();
       SerializeUtils.deserialize(node, buffer);
+      int id = buffer.getInt();
       SerializeUtils.deserialize(slots, buffer);
-      nodeSlotMap.put(node, slots);
+      Pair pair = new Pair<>(node, id);
+      nodeSlotMap.put(pair, slots);
       idNodeMap.put(node.getNodeIdentifier(), node);
       for (Integer slot : slots) {
-        slotNodes[slot] = node;
+        slotNodes[slot] = pair;
       }
     }
 
-    int prevNodeMapSize = buffer.getInt();
-    previousNodeMap = new HashMap<>();
-    for (int i = 0; i < prevNodeMapSize; i++) {
-      int nodeId = buffer.getInt();
-      Node node = idNodeMap.get(nodeId);
-
-      Map<Integer, Node> prevHolders = new HashMap<>();
-      int holderNum = buffer.getInt();
-      for (int i1 = 0; i1 < holderNum; i1++) {
-        int slot = buffer.getInt();
-        Node holder = idNodeMap.get(buffer.getInt());
-        prevHolders.put(slot, holder);
-      }
-      previousNodeMap.put(node, prevHolders);
-    }
+//    int prevNodeMapSize = buffer.getInt();
+//    previousNodeMap = new HashMap<>();
+//    for (int i = 0; i < prevNodeMapSize; i++) {
+//      int nodeId = buffer.getInt();
+//      Node node = idNodeMap.get(nodeId);
+//
+//      Map<Integer, Node> prevHolders = new HashMap<>();
+//      int holderNum = buffer.getInt();
+//      for (int i1 = 0; i1 < holderNum; i1++) {
+//        int slot = buffer.getInt();
+//        Node holder = idNodeMap.get(buffer.getInt());
+//        prevHolders.put(slot, holder);
+//      }
+//      previousNodeMap.put(node, prevHolders);
+//    }
     lastLogIndex = buffer.getLong();
 
-    nodeRing.addAll(nodeSlotMap.keySet());
+    for (Pair<Node, Integer> nodeIntegerPair : nodeSlotMap.keySet()) {
+      if (!nodeRing.contains(nodeIntegerPair.left)) {
+        nodeRing.add(nodeIntegerPair.left);
+      }
+    }
     nodeRing.sort(Comparator.comparingInt(Node::getNodeIdentifier));
     logger.info("All known nodes: {}", nodeRing);
 
@@ -390,7 +412,7 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeSlotMap.get(header);
   }
 
-  public Map<Node, List<Integer>> getAllNodeSlots() {
+  public Map<Pair<Node, Integer>, List<Integer>> getAllNodeSlots() {
     return nodeSlotMap;
   }
 
@@ -421,62 +443,63 @@ public class SlotPartitionTable implements PartitionTable 
{
 
   @Override
   public NodeRemovalResult removeNode(Node target) {
-    synchronized (nodeRing) {
-      if (!nodeRing.contains(target)) {
-        return null;
-      }
-
-      SlotNodeRemovalResult result = new SlotNodeRemovalResult();
-      result.setRemovedGroup(getHeaderGroup(target));
-      nodeRing.remove(target);
-
-      // if the node belongs to a group that headed by target, this group 
should be removed
-      // and other groups containing target should be updated
-      int removedGroupIdx = -1;
-      for (int i = 0; i < localGroups.size(); i++) {
-        PartitionGroup oldGroup = localGroups.get(i);
-        Node header = oldGroup.getHeader();
-        if (header.equals(target)) {
-          removedGroupIdx = i;
-        } else {
-          PartitionGroup newGrp = getHeaderGroup(header);
-          localGroups.set(i, newGrp);
-        }
-      }
-      if (removedGroupIdx != -1) {
-        localGroups.remove(removedGroupIdx);
-        // each node exactly joins replicationNum groups, so when a group is 
removed, the node
-        // should join a new one
-        int thisNodeIdx = nodeRing.indexOf(thisNode);
-        // this node must be the last node of the new group
-        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
-        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() : 
headerNodeIdx;
-        Node header = nodeRing.get(headerNodeIdx);
-        PartitionGroup newGrp = getHeaderGroup(header);
-        localGroups.add(newGrp);
-        result.setNewGroup(newGrp);
-      }
-
-      calculateGlobalGroups();
-
-      // the slots movement is only done logically, the new node itself will 
pull data from the
-      // old node
-      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
-      result.setNewSlotOwners(nodeListMap);
-      return result;
-    }
+//    synchronized (nodeRing) {
+//      if (!nodeRing.contains(target)) {
+//        return null;
+//      }
+//
+//      SlotNodeRemovalResult result = new SlotNodeRemovalResult();
+//      result.setRemovedGroup(getHeaderGroup(target));
+//      nodeRing.remove(target);
+//
+//      // if the node belongs to a group that headed by target, this group 
should be removed
+//      // and other groups containing target should be updated
+//      int removedGroupIdx = -1;
+//      for (int i = 0; i < localGroups.size(); i++) {
+//        PartitionGroup oldGroup = localGroups.get(i);
+//        Node header = oldGroup.getHeader();
+//        if (header.equals(target)) {
+//          removedGroupIdx = i;
+//        } else {
+//          PartitionGroup newGrp = getHeaderGroup(header);
+//          localGroups.set(i, newGrp);
+//        }
+//      }
+//      if (removedGroupIdx != -1) {
+//        localGroups.remove(removedGroupIdx);
+//        // each node exactly joins replicationNum groups, so when a group is 
removed, the node
+//        // should join a new one
+//        int thisNodeIdx = nodeRing.indexOf(thisNode);
+//        // this node must be the last node of the new group
+//        int headerNodeIdx = thisNodeIdx - (replicationNum - 1);
+//        headerNodeIdx = headerNodeIdx < 0 ? headerNodeIdx + nodeRing.size() 
: headerNodeIdx;
+//        Node header = nodeRing.get(headerNodeIdx);
+//        PartitionGroup newGrp = getHeaderGroup(header);
+//        localGroups.add(newGrp);
+//        result.setNewGroup(newGrp);
+//      }
+//
+//      calculateGlobalGroups();
+//
+//      // the slots movement is only done logically, the new node itself will 
pull data from the
+//      // old node
+//      Map<Node, List<Integer>> nodeListMap = retrieveSlots(target);
+//      result.setNewSlotOwners(nodeListMap);
+//      return result;
+//    }
+    return null;
   }
 
   private Map<Node, List<Integer>> retrieveSlots(Node target) {
     Map<Node, List<Integer>> newHolderSlotMap = new HashMap<>();
-    List<Integer> slots = nodeSlotMap.remove(target);
-    for (int i = 0; i < slots.size(); i++) {
-      int slot = slots.get(i);
-      Node newHolder = nodeRing.get(i % nodeRing.size());
-      slotNodes[slot] = newHolder;
-      nodeSlotMap.get(newHolder).add(slot);
-      newHolderSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
-    }
+//    List<Integer> slots = nodeSlotMap.remove(target);
+//    for (int i = 0; i < slots.size(); i++) {
+//      int slot = slots.get(i);
+//      Node newHolder = nodeRing.get(i % nodeRing.size());
+//      slotNodes[slot] = newHolder;
+//      nodeSlotMap.get(newHolder).add(slot);
+//      newHolderSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
+//    }
     return newHolderSlotMap;
   }
 
@@ -494,7 +517,9 @@ public class SlotPartitionTable implements PartitionTable {
   private void calculateGlobalGroups() {
     globalGroups = new ArrayList<>();
     for (Node n : getAllNodes()) {
-      globalGroups.add(getHeaderGroup(n));
+      for (int i = 0; i < raftGroupNum; i++) {
+        globalGroups.add(getHeaderGroup(new Pair<>(n, i)));
+      }
     }
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..76fca3b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -22,10 +22,8 @@ package org.apache.iotdb.cluster.server;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -65,6 +63,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.DataSyncService;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -82,9 +81,9 @@ public class DataClusterServer extends RaftServer implements 
TSDataService.Async
 
   // key: the header of a data group, value: the member representing this node 
in this group and
   // it is currently at service
-  private Map<Node, DataGroupMember> headerGroupMap = new 
ConcurrentHashMap<>();
-  private Map<Node, DataAsyncService> asyncServiceMap = new 
ConcurrentHashMap<>();
-  private Map<Node, DataSyncService> syncServiceMap = new 
ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataGroupMember> headerGroupMap = new 
ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataAsyncService> asyncServiceMap = new 
ConcurrentHashMap<>();
+  private Map<Pair<Node, Integer>, DataSyncService> syncServiceMap = new 
ConcurrentHashMap<>();
   // key: the header of a data group, value: the member representing this node 
in this group but
   // it is out of service because another node has joined the group and 
expelled this node, or
   // the node itself is removed, but it is still stored to provide snapshot 
for other nodes
@@ -117,50 +116,56 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    * @param dataGroupMember
    */
   public void addDataGroupMember(DataGroupMember dataGroupMember) {
-    DataGroupMember removedMember = 
headerGroupMap.remove(dataGroupMember.getHeader());
+    Pair<Node, Integer> pair = new Pair<>(dataGroupMember.getHeader(),
+        dataGroupMember.getRaftGroupId());
+    DataGroupMember removedMember = headerGroupMap
+        .remove(pair);
     if (removedMember != null) {
       removedMember.stop();
-      asyncServiceMap.remove(dataGroupMember.getHeader());
-      syncServiceMap.remove(dataGroupMember.getHeader());
+      asyncServiceMap.remove(pair);
+      syncServiceMap.remove(pair);
     }
-    stoppedMemberManager.remove(dataGroupMember.getHeader());
+    stoppedMemberManager.remove(pair);
 
-    headerGroupMap.put(dataGroupMember.getHeader(), dataGroupMember);
+    headerGroupMap.put(pair, dataGroupMember);
   }
 
-  private <T> DataAsyncService getDataAsyncService(Node header,
+  private <T> DataAsyncService getDataAsyncService(Node header, Integer id,
       AsyncMethodCallback<T> resultHandler, Object request) {
-    return asyncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, resultHandler, request);
+    Pair<Node, Integer> pair = new Pair<>(header, id);
+    return asyncServiceMap.computeIfAbsent(pair, h -> {
+      DataGroupMember dataMember = getDataMember(pair, resultHandler, request);
       return dataMember != null ? new DataAsyncService(dataMember) : null;
     });
   }
 
-  private DataSyncService getDataSyncService(Node header) {
-    return syncServiceMap.computeIfAbsent(header, h -> {
-      DataGroupMember dataMember = getDataMember(h, null, null);
+  private DataSyncService getDataSyncService(Node header, Integer id) {
+    Pair<Node, Integer> pair = new Pair<>(header, id);
+    return syncServiceMap.computeIfAbsent(pair, h -> {
+      DataGroupMember dataMember = getDataMember(pair, null, null);
       return dataMember != null ? new DataSyncService(dataMember) : null;
     });
   }
 
   /**
-   * @param header        the header of the group which the local node is in
+   * @param pair          the header of the group which the local node is in
    * @param resultHandler can be set to null if the request is an internal 
request
    * @param request       the toString() of this parameter should explain what 
the request is and it
    *                      is only used in logs for tracing
    * @return
    */
-  public <T> DataGroupMember getDataMember(Node header, AsyncMethodCallback<T> 
resultHandler,
+  public <T> DataGroupMember getDataMember(Pair<Node, Integer> pair,
+      AsyncMethodCallback<T> resultHandler,
       Object request) {
     // if the resultHandler is not null, then the request is a external one 
and must be with a
     // header
-    if (header == null) {
+    if (pair.left == null) {
       if (resultHandler != null) {
         resultHandler.onError(new NoHeaderNodeException());
       }
       return null;
     }
-    DataGroupMember member = stoppedMemberManager.get(header);
+    DataGroupMember member = stoppedMemberManager.get(pair);
     if (member != null) {
       return member;
     }
@@ -168,14 +173,14 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
     // avoid creating two members for a header
     Exception ex = null;
     synchronized (headerGroupMap) {
-      member = headerGroupMap.get(header);
+      member = headerGroupMap.get(pair);
       if (member != null) {
         return member;
       }
-      logger.info("Received a request \"{}\" from unregistered header {}", 
request, header);
+      logger.info("Received a request \"{}\" from unregistered header {}", 
request, pair);
       if (partitionTable != null) {
         try {
-          member = createNewMember(header);
+          member = createNewMember(pair);
         } catch (NotInSameGroupException | CheckConsistencyException e) {
           ex = e;
         }
@@ -191,37 +196,37 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   /**
-   * @param header
+   * @param pair the (header node, raft ID) pair identifying the data group to create a member for
    * @return A DataGroupMember representing this node in the data group of the 
header.
    * @throws NotInSameGroupException If this node is not in the group of the 
header.
    */
-  private DataGroupMember createNewMember(Node header)
+  private DataGroupMember createNewMember(Pair<Node, Integer> pair)
       throws NotInSameGroupException, CheckConsistencyException {
     DataGroupMember member;
     PartitionGroup partitionGroup;
-    partitionGroup = partitionTable.getHeaderGroup(header);
+    partitionGroup = partitionTable.getHeaderGroup(pair);
     if (partitionGroup == null || !partitionGroup.contains(thisNode)) {
       // if the partition table is old, this node may have not been moved to 
the new group
       metaGroupMember.syncLeaderWithConsistencyCheck(true);
-      partitionGroup = partitionTable.getHeaderGroup(header);
+      partitionGroup = partitionTable.getHeaderGroup(pair);
     }
     if (partitionGroup != null && partitionGroup.contains(thisNode)) {
       // the two nodes are in the same group, create a new data member
       member = dataMemberFactory.create(partitionGroup, thisNode);
-      DataGroupMember prevMember = headerGroupMap.put(header, member);
+      DataGroupMember prevMember = headerGroupMap.put(pair, member);
       if (prevMember != null) {
         prevMember.stop();
       }
-      logger.info("Created a member for header {}", header);
+      logger.info("Created a member for header {}", pair);
       member.start();
     } else {
       // the member may have been stopped after syncLeader
-      member = stoppedMemberManager.get(header);
+      member = stoppedMemberManager.get(pair);
       if (member != null) {
         return member;
       }
       logger.info("This node {} does not belong to the group {}, header {}", 
thisNode,
-          partitionGroup, header);
+          partitionGroup, pair);
       throw new NotInSameGroupException(partitionGroup, thisNode);
     }
     return member;
@@ -233,8 +238,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void sendHeartbeat(HeartBeatRequest request,
       AsyncMethodCallback<HeartBeatResponse> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.sendHeartbeat(request, resultHandler);
     }
@@ -242,8 +247,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   @Override
   public void startElection(ElectionRequest request, AsyncMethodCallback<Long> 
resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.startElection(request, resultHandler);
     }
@@ -251,8 +256,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   @Override
   public void appendEntries(AppendEntriesRequest request, 
AsyncMethodCallback<Long> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.appendEntries(request, resultHandler);
     }
@@ -260,8 +265,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   @Override
   public void appendEntry(AppendEntryRequest request, 
AsyncMethodCallback<Long> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.appendEntry(request, resultHandler);
     }
@@ -269,8 +274,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request, 
AsyncMethodCallback<Void> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.sendSnapshot(request, resultHandler);
     }
@@ -279,8 +284,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void pullSnapshot(PullSnapshotRequest request,
       AsyncMethodCallback<PullSnapshotResp> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.pullSnapshot(request, resultHandler);
     }
@@ -289,16 +294,17 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void executeNonQueryPlan(ExecutNonQueryReq request,
       AsyncMethodCallback<TSStatus> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.executeNonQueryPlan(request, resultHandler);
     }
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> 
resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Request commit index");
+  public void requestCommitIndex(Node header, int id, 
AsyncMethodCallback<Long> resultHandler) {
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
+        "Request commit index");
     if (service != null) {
       service.requestCommitIndex(header, resultHandler);
     }
@@ -307,7 +313,7 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void readFile(String filePath, long offset, int length,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(thisNode, resultHandler,
+    DataAsyncService service = getDataAsyncService(thisNode, 0, resultHandler,
         "Read file:" + filePath);
     if (service != null) {
       service.readFile(filePath, offset, length, resultHandler);
@@ -317,7 +323,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler,
         "Query series:" + request.getPath());
     if (service != null) {
       service.querySingleSeries(request, resultHandler);
@@ -325,9 +332,9 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int id, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Fetch reader:" + readerId);
     if (service != null) {
       service.fetchSingleSeries(header, readerId, resultHandler);
@@ -335,18 +342,18 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int id, List<String> paths, boolean 
withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Find path:" + paths);
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Find path:" + paths);
     if (service != null) {
       service.getAllPaths(header, paths, withAlias, resultHandler);
     }
   }
 
   @Override
-  public void endQuery(Node header, Node thisNode, long queryId,
+  public void endQuery(Node header, int id, Node thisNode, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "End 
query");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"End query");
     if (service != null) {
       service.endQuery(header, thisNode, queryId, resultHandler);
     }
@@ -355,7 +362,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler,
         "Query by timestamp:" + request.getQueryId() + "#" + request.getPath() 
+ " of " + request
             .getRequester());
     if (service != null) {
@@ -364,9 +372,9 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long 
time,
+  public void fetchSingleSeriesByTimestamp(Node header, int id, long readerId, 
long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Fetch by timestamp:" + readerId);
     if (service != null) {
       service.fetchSingleSeriesByTimestamp(header, readerId, time, 
resultHandler);
@@ -376,8 +384,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    Node header = request.getHeader();
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     if (service != null) {
       service.pullTimeSeriesSchema(request, resultHandler);
     }
@@ -386,37 +394,38 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler,
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler,
         "Pull measurement schema");
     service.pullMeasurementSchema(request, resultHandler);
   }
 
   @Override
-  public void getAllDevices(Node header, List<String> paths,
+  public void getAllDevices(Node header, int id, List<String> paths,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Get 
all devices");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Get all devices");
     service.getAllDevices(header, paths, resultHandler);
   }
 
   @Override
-  public void getNodeList(Node header, String path, int nodeLevel,
+  public void getNodeList(Node header, int id, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, "Get 
node list");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Get node list");
     service.getNodeList(header, path, nodeLevel, resultHandler);
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, String path,
+  public void getChildNodePathInNextLevel(Node header, int id, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Get child node path in next level");
     service.getChildNodePathInNextLevel(header, path, resultHandler);
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, ByteBuffer planBytes,
+  public void getAllMeasurementSchema(Node header, int id, ByteBuffer 
planBytes,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Get all measurement schema");
     service.getAllMeasurementSchema(header, planBytes, resultHandler);
   }
@@ -424,28 +433,30 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     service.getAggrResult(request, resultHandler);
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, List<String> 
timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int id, List<String> 
timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler,
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler,
         "Check if measurements are registered");
     service.getUnregisteredTimeseries(header, timeseriesList, resultHandler);
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, 
AsyncMethodCallback<Long> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     service.getGroupByExecutor(request, resultHandler);
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, 
long endTime,
+  public void getGroupByResult(Node header, int id, long executorId, long 
startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Fetch group by");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Fetch group by");
     service.getGroupByResult(header, executorId, startTime, endTime, 
resultHandler);
   }
 
@@ -488,37 +499,37 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    * @param result
    */
   public void addNode(Node node, NodeAdditionResult result) {
-    Iterator<Entry<Node, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
-    synchronized (headerGroupMap) {
-      while (entryIterator.hasNext()) {
-        Entry<Node, DataGroupMember> entry = entryIterator.next();
-        DataGroupMember dataGroupMember = entry.getValue();
-        // the member may be extruded from the group, remove and stop it if so
-        boolean shouldLeave = dataGroupMember.addNode(node, result);
-        if (shouldLeave) {
-          logger.info("This node does not belong to {} any more", 
dataGroupMember.getAllNodes());
-          entryIterator.remove();
-          removeMember(entry.getKey(), entry.getValue());
-        }
-      }
-
-      if (result.getNewGroup().contains(thisNode)) {
-        logger.info("Adding this node into a new group {}", 
result.getNewGroup());
-        DataGroupMember dataGroupMember = 
dataMemberFactory.create(result.getNewGroup(), thisNode);
-        addDataGroupMember(dataGroupMember);
-        dataGroupMember.start();
-        dataGroupMember
-            .pullNodeAdditionSnapshots(((SlotPartitionTable) 
partitionTable).getNodeSlots(node),
-                node);
-      }
-    }
+//    Iterator<Entry<Node, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
+//    synchronized (headerGroupMap) {
+//      while (entryIterator.hasNext()) {
+//        Entry<Node, DataGroupMember> entry = entryIterator.next();
+//        DataGroupMember dataGroupMember = entry.getValue();
+//        // the member may be extruded from the group, remove and stop it if 
so
+//        boolean shouldLeave = dataGroupMember.addNode(node, result);
+//        if (shouldLeave) {
+//          logger.info("This node does not belong to {} any more", 
dataGroupMember.getAllNodes());
+//          entryIterator.remove();
+//          removeMember(entry.getKey(), entry.getValue());
+//        }
+//      }
+//
+//      if (result.getNewGroup().contains(thisNode)) {
+//        logger.info("Adding this node into a new group {}", 
result.getNewGroup());
+//        DataGroupMember dataGroupMember = 
dataMemberFactory.create(result.getNewGroup(), thisNode);
+//        addDataGroupMember(dataGroupMember);
+//        dataGroupMember.start();
+//        dataGroupMember
+//            .pullNodeAdditionSnapshots(((SlotPartitionTable) 
partitionTable).getNodeSlots(node),
+//                node);
+//      }
+//    }
   }
 
   private void removeMember(Node header, DataGroupMember dataGroupMember) {
-    dataGroupMember.syncLeader();
-    dataGroupMember.setReadOnly();
-    dataGroupMember.stop();
-    stoppedMemberManager.put(header, dataGroupMember);
+//    dataGroupMember.syncLeader();
+//    dataGroupMember.setReadOnly();
+//    dataGroupMember.stop();
+//    stoppedMemberManager.put(header, dataGroupMember);
   }
 
   /**
@@ -574,43 +585,43 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
    * @param removalResult cluster changes due to the node removal
    */
   public void removeNode(Node node, NodeRemovalResult removalResult) {
-    Iterator<Entry<Node, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
-    synchronized (headerGroupMap) {
-      while (entryIterator.hasNext()) {
-        Entry<Node, DataGroupMember> entry = entryIterator.next();
-        DataGroupMember dataGroupMember = entry.getValue();
-        if (dataGroupMember.getHeader().equals(node)) {
-          // the group is removed as the node is removed, so new writes should 
be rejected as
-          // they belong to the new holder, but the member is kept alive for 
other nodes to pull
-          // snapshots
-          entryIterator.remove();
-          removeMember(entry.getKey(), entry.getValue());
-        } else {
-          if (node.equals(thisNode)) {
-            // this node is removed, it is no more replica of other groups
-            List<Integer> nodeSlots =
-                ((SlotPartitionTable) 
partitionTable).getNodeSlots(dataGroupMember.getHeader());
-            dataGroupMember.removeLocalData(nodeSlots);
-            entryIterator.remove();
-            dataGroupMember.stop();
-          } else {
-            // the group should be updated and pull new slots from the removed 
node
-            dataGroupMember.removeNode(node, removalResult);
-          }
-        }
-      }
-      PartitionGroup newGroup = removalResult.getNewGroup();
-      if (newGroup != null) {
-        logger.info("{} should join a new group {}", thisNode, newGroup);
-        try {
-          createNewMember(newGroup.getHeader());
-        } catch (NotInSameGroupException e) {
-          // ignored
-        } catch (CheckConsistencyException ce) {
-          logger.error("remove node failed, error={}", ce.getMessage());
-        }
-      }
-    }
+//    Iterator<Entry<Node, DataGroupMember>> entryIterator = 
headerGroupMap.entrySet().iterator();
+//    synchronized (headerGroupMap) {
+//      while (entryIterator.hasNext()) {
+//        Entry<Node, DataGroupMember> entry = entryIterator.next();
+//        DataGroupMember dataGroupMember = entry.getValue();
+//        if (dataGroupMember.getHeader().equals(node)) {
+//          // the group is removed as the node is removed, so new writes 
should be rejected as
+//          // they belong to the new holder, but the member is kept alive for 
other nodes to pull
+//          // snapshots
+//          entryIterator.remove();
+//          removeMember(entry.getKey(), entry.getValue());
+//        } else {
+//          if (node.equals(thisNode)) {
+//            // this node is removed, it is no more replica of other groups
+//            List<Integer> nodeSlots =
+//                ((SlotPartitionTable) 
partitionTable).getNodeSlots(dataGroupMember.getHeader());
+//            dataGroupMember.removeLocalData(nodeSlots);
+//            entryIterator.remove();
+//            dataGroupMember.stop();
+//          } else {
+//            // the group should be updated and pull new slots from the 
removed node
+//            dataGroupMember.removeNode(node, removalResult);
+//          }
+//        }
+//      }
+//      PartitionGroup newGroup = removalResult.getNewGroup();
+//      if (newGroup != null) {
+//        logger.info("{} should join a new group {}", thisNode, newGroup);
+//        try {
+//          createNewMember(newGroup.getHeader());
+//        } catch (NotInSameGroupException e) {
+//          // ignored
+//        } catch (CheckConsistencyException ce) {
+//          logger.error("remove node failed, error={}", ce.getMessage());
+//        }
+//      }
+//    }
   }
 
   public void setPartitionTable(PartitionTable partitionTable) {
@@ -642,7 +653,8 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler, request);
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, request);
     service.previousFill(request, resultHandler);
   }
 
@@ -653,208 +665,226 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int id,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Match term");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Match term");
     service.matchTerm(index, term, header, resultHandler);
   }
 
   @Override
   public void last(LastQueryRequest request, AsyncMethodCallback<ByteBuffer> 
resultHandler) {
-    DataAsyncService service = getDataAsyncService(request.getHeader(), 
resultHandler, "last");
+    DataAsyncService service = getDataAsyncService(request.getHeader(), 
request.getRaftId(),
+        resultHandler, "last");
     service.last(request, resultHandler);
   }
 
   @Override
-  public void getPathCount(Node header, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int id, List<String> pathsToQuery, int 
level,
       AsyncMethodCallback<Integer> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"count path");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"count path");
     service.getPathCount(header, pathsToQuery, level, resultHandler);
   }
 
   @Override
-  public void onSnapshotApplied(Node header, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int id, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
-    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Snapshot applied");
+    DataAsyncService service = getDataAsyncService(header, id, resultHandler, 
"Snapshot applied");
     service.onSnapshotApplied(header, slots, resultHandler);
   }
 
   @Override
   public long querySingleSeries(SingleSeriesQueryRequest request) throws 
TException {
-    return getDataSyncService(request.getHeader()).querySingleSeries(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).querySingleSeries(request);
   }
 
   @Override
-  public ByteBuffer fetchSingleSeries(Node header, long readerId) throws 
TException {
-    return getDataSyncService(header).fetchSingleSeries(header, readerId);
+  public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) 
throws TException {
+    return getDataSyncService(header, raftId).fetchSingleSeries(header, 
readerId);
   }
 
   @Override
   public long querySingleSeriesByTimestamp(SingleSeriesQueryRequest request) 
throws TException {
-    return 
getDataSyncService(request.getHeader()).querySingleSeriesByTimestamp(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .querySingleSeriesByTimestamp(request);
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, 
long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long 
readerId,
+      long timestamp)
       throws TException {
-    return getDataSyncService(header).fetchSingleSeriesByTimestamp(header, 
readerId, timestamp);
+    return getDataSyncService(header, raftId)
+        .fetchSingleSeriesByTimestamp(header, readerId, timestamp);
   }
 
   @Override
-  public void endQuery(Node header, Node thisNode, long queryId) throws 
TException {
-    getDataSyncService(header).endQuery(header, thisNode, queryId);
+  public void endQuery(Node header, int raftId, Node thisNode, long queryId) 
throws TException {
+    getDataSyncService(header, raftId).endQuery(header, thisNode, queryId);
   }
 
   @Override
-  public GetAllPathsResult getAllPaths(Node header, List<String> path, boolean 
withAlias)
+  public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> 
path,
+      boolean withAlias)
       throws TException {
-    return getDataSyncService(header).getAllPaths(header, path, withAlias);
+    return getDataSyncService(header, raftId).getAllPaths(header, path, 
withAlias);
   }
 
   @Override
-  public Set<String> getAllDevices(Node header, List<String> path) throws 
TException {
-    return getDataSyncService(header).getAllDevices(header, path);
+  public Set<String> getAllDevices(Node header, int raftId, List<String> path) 
throws TException {
+    return getDataSyncService(header, raftId).getAllDevices(header, path);
   }
 
   @Override
-  public List<String> getNodeList(Node header, String path, int nodeLevel) 
throws TException {
-    return getDataSyncService(header).getNodeList(header, path, nodeLevel);
+  public List<String> getNodeList(Node header, int raftId, String path, int 
nodeLevel)
+      throws TException {
+    return getDataSyncService(header, raftId).getNodeList(header, path, 
nodeLevel);
   }
 
   @Override
-  public Set<String> getChildNodePathInNextLevel(Node header, String path) 
throws TException {
-    return getDataSyncService(header).getChildNodePathInNextLevel(header, 
path);
+  public Set<String> getChildNodePathInNextLevel(Node header, int raftId, 
String path)
+      throws TException {
+    return getDataSyncService(header, 
raftId).getChildNodePathInNextLevel(header, path);
   }
 
   @Override
-  public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer 
planBinary) throws TException {
-    return getDataSyncService(header).getAllMeasurementSchema(header, 
planBinary);
+  public ByteBuffer getAllMeasurementSchema(Node header, int raftId, 
ByteBuffer planBinary)
+      throws TException {
+    return getDataSyncService(header, raftId).getAllMeasurementSchema(header, 
planBinary);
   }
 
   @Override
   public List<ByteBuffer> getAggrResult(GetAggrResultRequest request) throws 
TException {
-    return getDataSyncService(request.getHeader()).getAggrResult(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).getAggrResult(request);
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, List<String> 
timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId,
+      List<String> timeseriesList)
       throws TException {
-    return getDataSyncService(header).getUnregisteredTimeseries(header, 
timeseriesList);
+    return getDataSyncService(header, 
raftId).getUnregisteredTimeseries(header, timeseriesList);
   }
 
   @Override
   public PullSnapshotResp pullSnapshot(PullSnapshotRequest request) throws 
TException {
-    return getDataSyncService(request.getHeader()).pullSnapshot(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).pullSnapshot(request);
   }
 
   @Override
   public long getGroupByExecutor(GroupByRequest request) throws TException {
-    return getDataSyncService(request.header).getGroupByExecutor(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).getGroupByExecutor(request);
   }
 
   @Override
-  public List<ByteBuffer> getGroupByResult(Node header, long executorId, long 
startTime,
+  public List<ByteBuffer> getGroupByResult(Node header, int raftId, long 
executorId, long startTime,
       long endTime) throws TException {
-    return getDataSyncService(header).getGroupByResult(header, executorId, 
startTime, endTime);
+    return getDataSyncService(header, raftId)
+        .getGroupByResult(header, executorId, startTime, endTime);
   }
 
   @Override
   public PullSchemaResp pullTimeSeriesSchema(PullSchemaRequest request) throws 
TException {
-    return 
getDataSyncService(request.getHeader()).pullTimeSeriesSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .pullTimeSeriesSchema(request);
   }
 
   @Override
   public PullSchemaResp pullMeasurementSchema(PullSchemaRequest request) 
throws TException {
-    return 
getDataSyncService(request.getHeader()).pullMeasurementSchema(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .pullMeasurementSchema(request);
   }
 
   @Override
   public ByteBuffer previousFill(PreviousFillRequest request) throws 
TException {
-    return getDataSyncService(request.getHeader()).previousFill(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).previousFill(request);
   }
 
   @Override
   public ByteBuffer last(LastQueryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).last(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).last(request);
   }
 
   @Override
-  public int getPathCount(Node header, List<String> pathsToQuery, int level) 
throws TException {
-    return getDataSyncService(header).getPathCount(header, pathsToQuery, 
level);
+  public int getPathCount(Node header, int raftId, List<String> pathsToQuery, 
int level)
+      throws TException {
+    return getDataSyncService(header, raftId).getPathCount(header, 
pathsToQuery, level);
   }
 
   @Override
-  public boolean onSnapshotApplied(Node header, List<Integer> slots) {
-    return getDataSyncService(header).onSnapshotApplied(header, slots);
+  public boolean onSnapshotApplied(Node header, int raftId, List<Integer> 
slots) {
+    return getDataSyncService(header, raftId).onSnapshotApplied(header, slots);
   }
 
   @Override
   public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
-    return getDataSyncService(request.getHeader()).sendHeartbeat(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).sendHeartbeat(request);
   }
 
   @Override
   public long startElection(ElectionRequest request) {
-    return getDataSyncService(request.getHeader()).startElection(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).startElection(request);
   }
 
   @Override
   public long appendEntries(AppendEntriesRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).appendEntries(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).appendEntries(request);
   }
 
   @Override
   public long appendEntry(AppendEntryRequest request) throws TException {
-    return getDataSyncService(request.getHeader()).appendEntry(request);
+    return getDataSyncService(request.getHeader(), 
request.getRaftId()).appendEntry(request);
   }
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request) throws TException {
-    getDataSyncService(request.getHeader()).sendSnapshot(request);
+    getDataSyncService(request.getHeader(), 
request.getRaftId()).sendSnapshot(request);
   }
 
   @Override
   public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws 
TException {
-    return 
getDataSyncService(request.getHeader()).executeNonQueryPlan(request);
+    return getDataSyncService(request.getHeader(), request.getRaftId())
+        .executeNonQueryPlan(request);
   }
 
   @Override
-  public long requestCommitIndex(Node header) throws TException {
-    return getDataSyncService(header).requestCommitIndex(header);
+  public long requestCommitIndex(Node header, int raftId) throws TException {
+    return getDataSyncService(header, raftId).requestCommitIndex(header);
   }
 
   @Override
   public ByteBuffer readFile(String filePath, long offset, int length) throws 
TException {
-    return getDataSyncService(thisNode).readFile(filePath, offset, length);
+    return getDataSyncService(thisNode, 0).readFile(filePath, offset, length);
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
-    return getDataSyncService(header).matchTerm(index, term, header);
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
+    return getDataSyncService(header, raftId).matchTerm(index, term, header);
   }
 
   @Override
-  public ByteBuffer peekNextNotNullValue(Node header, long executorId, long 
startTime, long endTime)
+  public ByteBuffer peekNextNotNullValue(Node header, int raftId, long 
executorId, long startTime,
+      long endTime)
       throws TException {
-    return getDataSyncService(header).peekNextNotNullValue(header, executorId, 
startTime, endTime);
+    return getDataSyncService(header, raftId)
+        .peekNextNotNullValue(header, executorId, startTime, endTime);
   }
 
   @Override
-  public void peekNextNotNullValue(Node header, long executorId, long 
startTime, long endTime,
+  public void peekNextNotNullValue(Node header, int raftId, long executorId, 
long startTime,
+      long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) throws TException {
     resultHandler.onComplete(
-        getDataSyncService(header).peekNextNotNullValue(header, executorId, 
startTime, endTime));
+        getDataSyncService(header, raftId)
+            .peekNextNotNullValue(header, executorId, startTime, endTime));
   }
 
   @Override
   public void removeHardLink(String hardLinkPath) throws TException {
-    getDataSyncService(thisNode).removeHardLink(hardLinkPath);
+    getDataSyncService(thisNode, 0).removeHardLink(hardLinkPath);
   }
 
   @Override
   public void removeHardLink(String hardLinkPath,
       AsyncMethodCallback<Void> resultHandler) {
-    getDataAsyncService(thisNode, resultHandler, 
hardLinkPath).removeHardLink(hardLinkPath,
+    getDataAsyncService(thisNode, 0, resultHandler, 
hardLinkPath).removeHardLink(hardLinkPath,
         resultHandler);
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index 6fd2059..b203aaf 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -33,16 +33,17 @@ import 
org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * When a node is added or removed, several partition groups are affected and 
nodes may exit some
- * groups. For example, the local node is #5 and it is in a data group of [1, 
3, 5], then node #3
- * is added, so the group becomes [1, 3, 4] and the local node must leave the 
group. However, #5
- * may have data that #4 needs to pull, so the Member of #5 in this group is 
stopped but not
- * removed yet and when system recovers, we need to resume the groups so that 
they can keep
- * providing snapshots for data transfers.
+ * groups. For example, the local node is #5 and it is in a data group of [1, 
3, 5], then node #4 is
+ * added, so the group becomes [1, 3, 4] and the local node must leave the 
group. However, #5 may
+ * have data that #4 needs to pull, so the Member of #5 in this group is 
stopped but not removed yet
+ * and when system recovers, we need to resume the groups so that they can 
keep providing snapshots
+ * for data transfers.
  */
 public class StoppedMemberManager {
 
@@ -53,7 +54,7 @@ public class StoppedMemberManager {
   private static final String REMOVED = "0";
   private static final String RESUMED = "1";
 
-  private Map<Node, DataGroupMember> removedMemberMap = new HashMap<>();
+  private Map<Pair<Node, Integer>, DataGroupMember> removedMemberMap = new 
HashMap<>();
   private DataGroupMember.Factory memberFactory;
   private Node thisNode;
 
@@ -65,13 +66,14 @@ public class StoppedMemberManager {
   }
 
   /**
-   * When a DataGroupMember is removed, add it here and record this removal, 
so in next start-up
-   * we can recover it as a data source for data transfers.
-   * @param header
+   * When a DataGroupMember is removed, add it here and record this removal, 
so in next start-up we
+   * can recover it as a data source for data transfers.
+   *
+   * @param pair
    * @param dataGroupMember
    */
-  public synchronized void put(Node header, DataGroupMember dataGroupMember) {
-    removedMemberMap.put(header, dataGroupMember);
+  public synchronized void put(Pair<Node, Integer> pair, DataGroupMember 
dataGroupMember) {
+    removedMemberMap.put(pair, dataGroupMember);
     try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(stoppedMembersFileName, true))) {
       StringBuilder builder = new StringBuilder(REMOVED);
       for (Node node : dataGroupMember.getAllNodes()) {
@@ -80,27 +82,28 @@ public class StoppedMemberManager {
       writer.write(builder.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record removed member of header {}", header, e);
+      logger.error("Cannot record removed member of header {}", pair, e);
     }
   }
 
   /**
-   * When a DataGroupMember is resumed, add it here and record this removal, 
so in next start-up
-   * we will not recover it here.
-   * @param header
+   * When a DataGroupMember is resumed, add it here and record this removal, 
so in next start-up we
+   * will not recover it here.
+   *
+   * @param pair
    */
-  public synchronized void remove(Node header) {
-    removedMemberMap.remove(header);
+  public synchronized void remove(Pair<Node, Integer> pair) {
+    removedMemberMap.remove(pair);
     try (BufferedWriter writer = new BufferedWriter(new 
FileWriter(stoppedMembersFileName, true))) {
-      writer.write(RESUMED + ";" + header.toString());
+      writer.write(RESUMED + ";" + pair.toString());
       writer.newLine();
     } catch (IOException e) {
-      logger.error("Cannot record resumed member of header {}", header, e);
+      logger.error("Cannot record resumed member of header {}", pair, e);
     }
   }
 
-  public synchronized DataGroupMember get(Node header) {
-    return removedMemberMap.get(header);
+  public synchronized DataGroupMember get(Pair<Node, Integer> pair) {
+    return removedMemberMap.get(pair);
   }
 
   private void recover() {
@@ -143,7 +146,8 @@ public class StoppedMemberManager {
     }
     DataGroupMember member = memberFactory.create(partitionGroup, thisNode);
     member.setReadOnly();
-    removedMemberMap.put(partitionGroup.getHeader(), member);
+    //TODO: persist and recover the real raft group id instead of defaulting to 0
+    removedMemberMap.put(new Pair<>(partitionGroup.getHeader(), 0), member);
   }
 
   private void parseResumed(String[] split) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..62f2b71 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -143,8 +143,8 @@ public class DataGroupMember extends RaftMember {
   private LocalQueryExecutor localQueryExecutor;
 
   /**
-   * When a new partition table is installed, all data members will be checked 
if unchanged. If
-   * not, such members will be removed.
+   * When a new partition table is installed, all data members will be checked 
if unchanged. If not,
+   * such members will be removed.
    */
   private boolean unchanged;
 
@@ -237,6 +237,10 @@ public class DataGroupMember extends RaftMember {
     return allNodes.get(0);
   }
 
+  public Integer getRaftGroupId() {
+    return allNodes.getId();
+  }
+
   public ClusterQueryManager getQueryManager() {
     return queryManager;
   }
@@ -385,7 +389,8 @@ public class DataGroupMember extends RaftMember {
   public void receiveSnapshot(SendSnapshotRequest request) throws 
SnapshotInstallationException {
     logger.info("{}: received a snapshot from {} with size {}", name, 
request.getHeader(),
         request.getSnapshotBytes().length);
-    PartitionedSnapshot<FileSnapshot> snapshot = new 
PartitionedSnapshot<>(FileSnapshot.Factory.INSTANCE);
+    PartitionedSnapshot<FileSnapshot> snapshot = new PartitionedSnapshot<>(
+        FileSnapshot.Factory.INSTANCE);
 
     snapshot.deserialize(ByteBuffer.wrap(request.getSnapshotBytes()));
     if (logger.isDebugEnabled()) {
@@ -485,7 +490,8 @@ public class DataGroupMember extends RaftMember {
         Node node = entry.getKey();
         List<Integer> nodeSlots = entry.getValue();
         PullSnapshotTaskDescriptor taskDescriptor =
-            new 
PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable().getHeaderGroup(node),
+            new PullSnapshotTaskDescriptor(metaGroupMember.getPartitionTable()
+                .getHeaderGroup(new Pair<>(node, getRaftGroupId())),
                 nodeSlots, false);
         pullFileSnapshot(taskDescriptor, null);
       }
@@ -522,8 +528,9 @@ public class DataGroupMember extends RaftMember {
           descriptor.getSlots().get(0), descriptor.getSlots().size() - 1);
     }
 
-    pullSnapshotService.submit(new PullSnapshotTask<>(descriptor, this, 
FileSnapshot.Factory.INSTANCE,
-        snapshotSave));
+    pullSnapshotService
+        .submit(new PullSnapshotTask<>(descriptor, this, 
FileSnapshot.Factory.INSTANCE,
+            snapshotSave));
   }
 
   /**
@@ -619,9 +626,9 @@ public class DataGroupMember extends RaftMember {
       List<Pair<Long, Boolean>> tmpPairList = entry.getValue();
       for (Pair<Long, Boolean> pair : tmpPairList) {
         long partitionId = pair.left;
-        Node header = 
metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
+        Pair<Node, Integer> routePair = 
metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName,
             partitionId * StorageEngine.getTimePartitionInterval());
-        DataGroupMember localDataMember = 
metaGroupMember.getLocalDataMember(header);
+        DataGroupMember localDataMember = 
metaGroupMember.getLocalDataMember(routePair);
         if (localDataMember.getHeader().equals(this.getHeader())) {
           localListPair.add(new Pair<>(partitionId, pair.right));
         }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 863314f..9f2fbb6 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -147,6 +147,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -273,7 +274,7 @@ public class MetaGroupMember extends RaftMember {
         new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
         new AsyncClientPool(new 
AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
         new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
-    allNodes = new ArrayList<>();
+    allNodes = new PartitionGroup();
     initPeerMap();
 
     dataClientProvider = new DataClientProvider(factory);
@@ -307,9 +308,9 @@ public class MetaGroupMember extends RaftMember {
    * @return true if the member is a leader and the partition is closed, false 
otherwise
    */
   public void closePartition(String storageGroupName, long partitionId, 
boolean isSeq) {
-    Node header = partitionTable.routeToHeaderByTime(storageGroupName,
+    Pair<Node, Integer> pair = 
partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
-    DataGroupMember localDataMember = getLocalDataMember(header);
+    DataGroupMember localDataMember = getLocalDataMember(pair);
     if (localDataMember == null || localDataMember.getCharacter() != 
NodeCharacter.LEADER) {
       return;
     }
@@ -678,7 +679,7 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private void updateNodeList(Collection<Node> nodes) {
-    allNodes = new ArrayList<>(nodes);
+    allNodes = new PartitionGroup(nodes);
     initPeerMap();
     logger.info("All nodes in the partition table: {}", allNodes);
     initIdNodeMap();
@@ -1730,7 +1731,7 @@ public class MetaGroupMember extends RaftMember {
       if (partitionGroup.contains(thisNode)) {
         // the query should be handled by a group the local node is in, handle 
it with in the group
         logger.debug("Execute {} in a local group of {}", plan, 
partitionGroup.getHeader());
-        status = getLocalDataMember(partitionGroup.getHeader())
+        status = getLocalDataMember(partitionGroup.getHeader(), 
partitionGroup.getId())
             .executeNonQueryPlan(plan);
       } else {
         // forward the query to the group that should handle it
@@ -2113,8 +2114,8 @@ public class MetaGroupMember extends RaftMember {
   }
 
   @Override
-  public void setAllNodes(List<Node> allNodes) {
-    super.setAllNodes(allNodes);
+  public void setAllNodes(PartitionGroup allNodes) {
+    super.setAllNodes(new PartitionGroup(allNodes));
     initPeerMap();
     idNodeMap = new HashMap<>();
     for (Node node : allNodes) {
@@ -2136,10 +2137,10 @@ public class MetaGroupMember extends RaftMember {
   /**
    * Get a local DataGroupMember that is in the group of "header" for an 
internal request.
    *
-   * @param header the header of the group which the local node is in
+   * @param pair the header of the group which the local node is in
    */
-  public DataGroupMember getLocalDataMember(Node header) {
-    return dataClusterServer.getDataMember(header, null, "Internal call");
+  public DataGroupMember getLocalDataMember(Pair<Node, Integer> pair) {
+    return dataClusterServer.getDataMember(pair, null, "Internal call");
   }
 
   public DataClientProvider getClientProvider() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 25201e9..f396e9c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.cluster.log.LogParser;
 import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
@@ -146,7 +147,7 @@ public abstract class RaftMember {
   /**
    * the nodes that belong to the same raft group as thisNode.
    */
-  protected List<Node> allNodes;
+  protected PartitionGroup allNodes;
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
   /**
    * the name of the member, to distinguish several members in the logs.
@@ -657,7 +658,7 @@ public abstract class RaftMember {
     return allNodes;
   }
 
-  public void setAllNodes(List<Node> allNodes) {
+  public void setAllNodes(PartitionGroup allNodes) {
     this.allNodes = allNodes;
   }
 
@@ -1006,7 +1007,7 @@ public abstract class RaftMember {
       return commitIdResult.get();
     }
     synchronized (commitIdResult) {
-      client.requestCommitIndex(getHeader(), new 
GenericHandler<>(leader.get(), commitIdResult));
+      client.requestCommitIndex(getHeader(), allNodes.getId(),
+          new GenericHandler<>(leader.get(), commitIdResult));
       commitIdResult.wait(RaftServer.getSyncLeaderMaxWaitMs());
     }
     return commitIdResult.get();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 113c862..521050e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -83,7 +83,7 @@ public abstract class BaseAsyncService implements 
RaftService.AsyncIface {
   }
 
   @Override
-  public void requestCommitIndex(Node header, AsyncMethodCallback<Long> 
resultHandler) {
+  public void requestCommitIndex(Node header, int raftId, 
AsyncMethodCallback<Long> resultHandler) {
     long commitIndex = member.getCommitIndex();
     if (commitIndex != Long.MIN_VALUE) {
       resultHandler.onComplete(commitIndex);
@@ -97,7 +97,7 @@ public abstract class BaseAsyncService implements 
RaftService.AsyncIface {
       return;
     }
     try {
-      client.requestCommitIndex(header, resultHandler);
+      client.requestCommitIndex(header, raftId, resultHandler);
     } catch (TException e) {
       resultHandler.onError(e);
     }
@@ -125,7 +125,7 @@ public abstract class BaseAsyncService implements 
RaftService.AsyncIface {
   }
 
   @Override
-  public void matchTerm(long index, long term, Node header,
+  public void matchTerm(long index, long term, Node header, int raftId,
       AsyncMethodCallback<Boolean> resultHandler) {
     resultHandler.onComplete(member.matchLog(index, term));
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 5a07130..bffc345 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -90,7 +90,7 @@ public abstract class BaseSyncService implements 
RaftService.Iface {
   }
 
   @Override
-  public long requestCommitIndex(Node header)
+  public long requestCommitIndex(Node header, int raftId)
       throws TException {
     long commitIndex = member.getCommitIndex();
     if (commitIndex != Long.MIN_VALUE) {
@@ -104,7 +104,7 @@ public abstract class BaseSyncService implements 
RaftService.Iface {
     }
     long commitIndex1 = 0;
     try {
-      commitIndex1 = client.requestCommitIndex(header);
+      commitIndex1 = client.requestCommitIndex(header, raftId);
     } catch (TException e) {
       client.getInputProtocol().getTransport().close();
       throw e;
@@ -141,7 +141,7 @@ public abstract class BaseSyncService implements 
RaftService.Iface {
   }
 
   @Override
-  public boolean matchTerm(long index, long term, Node header) {
+  public boolean matchTerm(long index, long term, Node header, int raftId) {
     return member.matchLog(index, term);
   }
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index 3e3accf..486274e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -88,13 +88,15 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
     }
   }
 
-  private void forwardPullSnapshot(PullSnapshotRequest request, 
AsyncMethodCallback<PullSnapshotResp> resultHandler) {
+  private void forwardPullSnapshot(PullSnapshotRequest request,
+      AsyncMethodCallback<PullSnapshotResp> resultHandler) {
     // if this node has been set readOnly, then it must have been synchronized 
with the leader
     // otherwise forward the request to the leader
     if (dataGroupMember.getLeader() != null) {
       logger.debug("{} forwarding a pull snapshot request to the leader {}", 
name,
           dataGroupMember.getLeader());
-      AsyncDataClient client = (AsyncDataClient) 
dataGroupMember.getAsyncClient(dataGroupMember.getLeader());
+      AsyncDataClient client = (AsyncDataClient) dataGroupMember
+          .getAsyncClient(dataGroupMember.getLeader());
       try {
         client.pullSnapshot(request, resultHandler);
       } catch (TException e) {
@@ -109,7 +111,8 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request));
+      resultHandler
+          
.onComplete(dataGroupMember.getLocalQueryExecutor().queryTimeSeriesSchema(request));
     } catch (CheckConsistencyException e) {
       // if this node cannot synchronize with the leader with in a given time, 
forward the
       // request to the leader
@@ -137,7 +140,8 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request));
+      resultHandler
+          
.onComplete(dataGroupMember.getLocalQueryExecutor().queryMeasurementSchema(request));
     } catch (CheckConsistencyException e) {
       // if this node cannot synchronize with the leader with in a given time, 
forward the
       // request to the leader
@@ -170,14 +174,15 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request));
+      resultHandler.onComplete(
+          
dataGroupMember.getLocalQueryExecutor().querySingleSeriesByTimestamp(request));
     } catch (Exception e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void endQuery(Node header, Node requester, long queryId,
+  public void endQuery(Node header, int raftId, Node requester, long queryId,
       AsyncMethodCallback<Void> resultHandler) {
     try {
       dataGroupMember.getQueryManager().endQuery(requester, queryId);
@@ -188,7 +193,7 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeries(Node header, long readerId,
+  public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
       
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId));
@@ -198,17 +203,18 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void fetchSingleSeriesByTimestamp(Node header, long readerId, long 
timestamp,
+  public void fetchSingleSeriesByTimestamp(Node header, int raftId, long 
readerId, long timestamp,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().fetchSingleSeriesByTimestamp(readerId,
 timestamp));
+      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor()
+          .fetchSingleSeriesByTimestamp(readerId, timestamp));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void getAllPaths(Node header, List<String> paths, boolean withAlias,
+  public void getAllPaths(Node header, int raftId, List<String> paths, boolean 
withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -219,7 +225,7 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getAllDevices(Node header, List<String> path,
+  public void getAllDevices(Node header, int raftId, List<String> path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -230,7 +236,7 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getNodeList(Node header, String path, int nodeLevel,
+  public void getNodeList(Node header, int raftId, String path, int nodeLevel,
       AsyncMethodCallback<List<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -241,7 +247,7 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getChildNodePathInNextLevel(Node header, String path,
+  public void getChildNodePathInNextLevel(Node header, int raftId, String path,
       AsyncMethodCallback<Set<String>> resultHandler) {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -252,10 +258,11 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getAllMeasurementSchema(Node header, ByteBuffer planBinary,
+  public void getAllMeasurementSchema(Node header, int raftId, ByteBuffer 
planBinary,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary));
+      resultHandler
+          
.onComplete(dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary));
     } catch (CheckConsistencyException | IOException | MetadataException e) {
       resultHandler.onError(e);
     }
@@ -272,10 +279,11 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getUnregisteredTimeseries(Node header, List<String> 
timeseriesList,
+  public void getUnregisteredTimeseries(Node header, int raftId, List<String> 
timeseriesList,
       AsyncMethodCallback<List<String>> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList));
+      resultHandler.onComplete(
+          
dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList));
     } catch (CheckConsistencyException e) {
       resultHandler.onError(e);
     }
@@ -291,10 +299,12 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getGroupByResult(Node header, long executorId, long startTime, 
long endTime,
+  public void getGroupByResult(Node header, int raftId, long executorId, long 
startTime,
+      long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId,
 startTime, endTime));
+      resultHandler.onComplete(
+          dataGroupMember.getLocalQueryExecutor().getGroupByResult(executorId, 
startTime, endTime));
     } catch (ReaderNotFoundException | IOException | QueryProcessException e) {
       resultHandler.onError(e);
     }
@@ -320,26 +330,29 @@ public class DataAsyncService extends BaseAsyncService 
implements TSDataService.
   }
 
   @Override
-  public void getPathCount(Node header, List<String> pathsToQuery, int level,
+  public void getPathCount(Node header, int raftId, List<String> pathsToQuery, 
int level,
       AsyncMethodCallback<Integer> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery,
 level));
+      resultHandler
+          
.onComplete(dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, 
level));
     } catch (CheckConsistencyException | MetadataException e) {
       resultHandler.onError(e);
     }
   }
 
   @Override
-  public void onSnapshotApplied(Node header, List<Integer> slots,
+  public void onSnapshotApplied(Node header, int raftId, List<Integer> slots,
       AsyncMethodCallback<Boolean> resultHandler) {
     resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots));
   }
 
   @Override
-  public void peekNextNotNullValue(Node header, long executorId, long 
startTime, long endTime,
+  public void peekNextNotNullValue(Node header, int raftId, long executorId, 
long startTime,
+      long endTime,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
     try {
-      
resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor().peekNextNotNullValue(executorId,
 startTime, endTime));
+      resultHandler.onComplete(dataGroupMember.getLocalQueryExecutor()
+          .peekNextNotNullValue(executorId, startTime, endTime));
     } catch (ReaderNotFoundException | IOException e) {
       resultHandler.onError(e);
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 38be676..d974bf3 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -187,7 +187,7 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public void endQuery(Node header, Node requester, long queryId) throws 
TException {
+  public void endQuery(Node header, int raftId, Node requester, long queryId) 
throws TException {
     try {
       dataGroupMember.getQueryManager().endQuery(requester, queryId);
     } catch (StorageEngineException e) {
@@ -196,7 +196,7 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeries(Node header, long readerId) throws 
TException {
+  public ByteBuffer fetchSingleSeries(Node header, int raftId, long readerId) 
throws TException {
     try {
       return 
dataGroupMember.getLocalQueryExecutor().fetchSingleSeries(readerId);
     } catch (ReaderNotFoundException | IOException e) {
@@ -205,7 +205,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, long readerId, 
long timestamp)
+  public ByteBuffer fetchSingleSeriesByTimestamp(Node header, int raftId, long 
readerId,
+      long timestamp)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor()
@@ -216,7 +217,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public GetAllPathsResult getAllPaths(Node header, List<String> paths, 
boolean withAlias)
+  public GetAllPathsResult getAllPaths(Node header, int raftId, List<String> 
paths,
+      boolean withAlias)
       throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
@@ -227,7 +229,7 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public Set<String> getAllDevices(Node header, List<String> path) throws 
TException {
+  public Set<String> getAllDevices(Node header, int raftId, List<String> path) 
throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getAllDevices(path);
@@ -237,7 +239,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public List<String> getNodeList(Node header, String path, int nodeLevel) 
throws TException {
+  public List<String> getNodeList(Node header, int raftId, String path, int 
nodeLevel)
+      throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getNodeList(path, nodeLevel);
@@ -247,7 +250,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public Set<String> getChildNodePathInNextLevel(Node header, String path) 
throws TException {
+  public Set<String> getChildNodePathInNextLevel(Node header, int raftId, 
String path)
+      throws TException {
     try {
       dataGroupMember.syncLeaderWithConsistencyCheck(false);
       return ((CMManager) IoTDB.metaManager).getChildNodePathInNextLevel(path);
@@ -257,7 +261,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public ByteBuffer getAllMeasurementSchema(Node header, ByteBuffer 
planBinary) throws TException {
+  public ByteBuffer getAllMeasurementSchema(Node header, int raftId, 
ByteBuffer planBinary)
+      throws TException {
     try {
       return 
dataGroupMember.getLocalQueryExecutor().getAllMeasurementSchema(planBinary);
     } catch (CheckConsistencyException | IOException | MetadataException e) {
@@ -275,7 +280,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public List<String> getUnregisteredTimeseries(Node header, List<String> 
timeseriesList)
+  public List<String> getUnregisteredTimeseries(Node header, int raftId,
+      List<String> timeseriesList)
       throws TException {
     try {
       return 
dataGroupMember.getLocalQueryExecutor().getUnregisteredTimeseries(timeseriesList);
@@ -294,7 +300,7 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public List<ByteBuffer> getGroupByResult(Node header, long executorId, long 
startTime,
+  public List<ByteBuffer> getGroupByResult(Node header, int raftId, long 
executorId, long startTime,
       long endTime)
       throws TException {
     try {
@@ -324,7 +330,8 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public int getPathCount(Node header, List<String> pathsToQuery, int level) 
throws TException {
+  public int getPathCount(Node header, int raftId, List<String> pathsToQuery, 
int level)
+      throws TException {
     try {
       return 
dataGroupMember.getLocalQueryExecutor().getPathCount(pathsToQuery, level);
     } catch (CheckConsistencyException | MetadataException e) {
@@ -333,12 +340,13 @@ public class DataSyncService extends BaseSyncService 
implements TSDataService.If
   }
 
   @Override
-  public boolean onSnapshotApplied(Node header, List<Integer> slots) {
+  public boolean onSnapshotApplied(Node header, int raftId, List<Integer> 
slots) {
     return dataGroupMember.onSnapshotInstalled(slots);
   }
 
   @Override
-  public ByteBuffer peekNextNotNullValue(Node header, long executorId, long 
startTime, long endTime)
+  public ByteBuffer peekNextNotNullValue(Node header, int raftId, long 
executorId, long startTime,
+      long endTime)
       throws TException {
     try {
       return dataGroupMember.getLocalQueryExecutor()
diff --git a/thrift/src/main/thrift/cluster.thrift 
b/thrift/src/main/thrift/cluster.thrift
index 6c86a1f..56f1dd4 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -41,6 +41,7 @@ struct HeartBeatRequest {
   // because a data server may act as a member of many data groups, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   8: optional Node header
+  9: optional int raftId
 }
 
 // follower -> leader
@@ -56,6 +57,7 @@ struct HeartBeatResponse {
   // because a data server may act as a member of many data groups, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 // node -> node
@@ -68,8 +70,9 @@ struct ElectionRequest {
   // because a data server may act as a member of many data groups, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   5: optional Node header
-  6: optional long dataLogLastIndex
-  7: optional long dataLogLastTerm
+  6: optional int raftId
+  7: optional long dataLogLastIndex
+  8: optional long dataLogLastTerm
 }
 
 // leader -> follower
@@ -84,6 +87,7 @@ struct AppendEntryRequest {
   // because a data server may act as a member of many data groups, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 // leader -> follower
@@ -98,6 +102,7 @@ struct AppendEntriesRequest {
   // because a data server may act as a member of many data groups, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional int raftId
 }
 
 struct AddNodeResponse {
@@ -138,16 +143,18 @@ struct SendSnapshotRequest {
   1: required binary snapshotBytes
   // for data group
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSnapshotRequest {
   1: required list<int> requiredSlots
   // for data group
   2: optional Node header
+  3: optional int raftId
   // set to true if the previous holder has been removed from the cluster.
   // This will make the previous holder read-only so that different new
   // replicas can pull the same snapshot.
-  3: required bool requireReadOnly
+  4: required bool requireReadOnly
 }
 
 struct PullSnapshotResp {
@@ -157,11 +164,13 @@ struct PullSnapshotResp {
 struct ExecutNonQueryReq {
   1: required binary planBytes
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSchemaRequest {
   1: required list<string> prefixPaths
   2: optional Node header
+  3: optional int raftId
 }
 
 struct PullSchemaResp {
@@ -175,11 +184,12 @@ struct SingleSeriesQueryRequest {
   4: required long queryId
   5: required Node requester
   6: required Node header
-  7: required int dataTypeOrdinal
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
-  10: required int fetchSize
-  11: required int deduplicatedPathNum
+  7: required int raftId
+  8: required int dataTypeOrdinal
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
+  11: required int fetchSize
+  12: required int deduplicatedPathNum
 }
 
 struct PreviousFillRequest {
@@ -189,8 +199,9 @@ struct PreviousFillRequest {
   4: required long queryId
   5: required Node requester
   6: required Node header
-  7: required int dataTypeOrdinal
-  8: required set<string> deviceMeasurements
+  7: required int raftId
+  8: required int dataTypeOrdinal
+  9: required set<string> deviceMeasurements
 }
 
 // the spec and load of a node, for query coordinating
@@ -204,10 +215,11 @@ struct GetAggrResultRequest {
   3: required int dataTypeOrdinal
   4: optional binary timeFilterBytes
   5: required Node header
-  6: required long queryId
-  7: required Node requestor
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
+  6: required int raftId
+  7: required long queryId
+  8: required Node requestor
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
 }
 
 struct GroupByRequest {
@@ -217,9 +229,10 @@ struct GroupByRequest {
   4: required long queryId
   5: required list<int> aggregationTypeOrdinals
   6: required Node header
-  7: required Node requestor
-  8: required set<string> deviceMeasurements
-  9: required bool ascending
+  7: required int raftId
+  8: required Node requestor
+  9: required set<string> deviceMeasurements
+  10: required bool ascending
 }
 
 struct LastQueryRequest {
@@ -229,7 +242,8 @@ struct LastQueryRequest {
   4: required map<string, set<string>> deviceMeasurements
   5: optional binary filterBytes
   6: required Node header
-  7: required Node requestor
+  7: required int raftId
+  8: required Node requestor
 }
 
 struct GetAllPathsResult {
@@ -293,7 +307,7 @@ service RaftService {
   * Ask the leader for its commit index, used to check whether the node has 
caught up with the
   * leader.
   **/
-  long requestCommitIndex(1:Node header)
+  long requestCommitIndex(1:Node header, 2:int raftId)
 
 
   /**
@@ -306,7 +320,7 @@ service RaftService {
   /**
   * Test if a log of "index" and "term" exists.
   **/
-  bool matchTerm(1:long index, 2:long term, 3:Node header)
+  bool matchTerm(1:long index, 2:long term, 3:Node header, 4:int raftId)
 
   /**
   * When a follower finds that it already has a file in a snapshot locally, it 
calls this
@@ -331,7 +345,7 @@ service TSDataService extends RaftService {
  * @return a ByteBuffer containing the serialized time-value pairs or an empty buffer if there
  * are no more results.
   **/
-  binary fetchSingleSeries(1:Node header, 2:long readerId)
+  binary fetchSingleSeries(1:Node header, 2:int raftId, 3:long readerId)
 
    /**
    * Query a time series and generate an IReaderByTimestamp.
@@ -345,32 +359,32 @@ service TSDataService extends RaftService {
   * @return a ByteBuffer containing the serialized value or an empty buffer if there
   * are no more results.
    **/
-   binary fetchSingleSeriesByTimestamp(1:Node header, 2:long readerId, 3:long 
timestamp)
+   binary fetchSingleSeriesByTimestamp(1:Node header, 2:int raftId, 3:long 
readerId, 4:long timestamp)
 
   /**
   * Find the local query established for the remote query and release all its 
resource.
   **/
-  void endQuery(1:Node header, 2:Node thisNode, 3:long queryId)
+  void endQuery(1:Node header, 2:int raftId, 3:Node thisNode, 4:long queryId)
 
   /**
   * Given path patterns (paths with wildcard), return all paths they match.
   **/
-  GetAllPathsResult getAllPaths(1:Node header, 2:list<string> path, 3:bool 
withAlias)
+  GetAllPathsResult getAllPaths(1:Node header, 2:int raftId, 3:list<string> 
path, 4:bool withAlias)
 
   /**
    * Given path patterns (paths with wildcard), return all devices they match.
    **/
-  set<string> getAllDevices(1:Node header, 2:list<string> path)
+  set<string> getAllDevices(1:Node header, 2:int raftId, 3:list<string> path)
 
-  list<string> getNodeList(1:Node header, 2:string path, 3:int nodeLevel)
+  list<string> getNodeList(1:Node header, 2:int raftId, 3:string path, 4:int 
nodeLevel)
 
-  set<string> getChildNodePathInNextLevel(1: Node header, 2: string path)
+  set<string> getChildNodePathInNextLevel(1: Node header, 2:int raftId, 3: 
string path)
 
-  binary getAllMeasurementSchema(1: Node header, 2: binary planBinary)
+  binary getAllMeasurementSchema(1: Node header, 2:int raftId, 3: binary 
planBinary)
 
   list<binary> getAggrResult(1:GetAggrResultRequest request)
 
-  list<string> getUnregisteredTimeseries(1: Node header, 2: list<string> 
timeseriesList)
+  list<string> getUnregisteredTimeseries(1: Node header, 2:int raftId, 3: 
list<string> timeseriesList)
 
   PullSnapshotResp pullSnapshot(1:PullSnapshotRequest request)
 
@@ -385,7 +399,7 @@ service TSDataService extends RaftService {
   * @return the serialized AggregationResults, each is the result of one of 
the previously
   * required aggregations, and their orders are the same.
   **/
-  list<binary> getGroupByResult(1:Node header, 2:long executorId, 3:long 
startTime, 4:long endTime)
+  list<binary> getGroupByResult(1:Node header, 2:int raftId, 3:long 
executorId, 4:long startTime, 5:long endTime)
 
 
   /**
@@ -410,15 +424,15 @@ service TSDataService extends RaftService {
   **/
   binary last(1: LastQueryRequest request)
 
-  int getPathCount(1: Node header 2: list<string> pathsToQuery 3: int level)
+  int getPathCount(1: Node header, 2:int raftId, 3: list<string> pathsToQuery, 
4: int level)
 
   /**
   * During slot transfer, when a member has pulled a snapshot from a group, the member will use this
   * method to inform the group that one replica of such slots has been pulled.
   **/
-  bool onSnapshotApplied(1: Node header 2: list<int> slots)
+  bool onSnapshotApplied(1: Node header, 2:int raftId, 3: list<int> slots)
 
-  binary peekNextNotNullValue(1: Node header, 2: long executorId, 3: long 
startTime, 4: long
+  binary peekNextNotNullValue(1: Node header, 2:int raftId, 3: long 
executorId, 4: long startTime, 5: long
   endTime)
 
 }

Reply via email to