This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 327eb7e80d7165931b4fa41d4bb1c25eebd38f65
Author: lta <[email protected]>
AuthorDate: Thu Jan 14 15:40:58 2021 +0800

    1. fix ut tests
    2. The two-stage relative order problem of double logs is solved.
---
 .../apache/iotdb/cluster/config/ClusterConfig.java |   2 +-
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |   2 +-
 .../iotdb/cluster/log/logtypes/AddNodeLog.java     |  14 ++-
 .../iotdb/cluster/log/logtypes/RemoveNodeLog.java  |  14 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   4 +
 .../iotdb/cluster/partition/NodeRemovalResult.java |   1 +
 .../partition/balancer/DefaultSlotBalancer.java    | 118 ++++++++++++++++++
 .../cluster/partition/balancer/SlotBalancer.java   |  43 +++++++
 .../iotdb/cluster/partition/slot/SlotManager.java  |  17 ++-
 .../cluster/partition/slot/SlotPartitionTable.java | 115 +++++++-----------
 .../iotdb/cluster/partition/slot/SlotStrategy.java |  18 ++-
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   2 +-
 .../iotdb/cluster/server/DataClusterServer.java    |   4 +-
 .../cluster/server/member/DataGroupMember.java     |  15 ++-
 .../cluster/server/member/MetaGroupMember.java     | 132 +++------------------
 .../iotdb/cluster/server/member/RaftMember.java    |  22 ++--
 .../iotdb/cluster/common/TestAsyncDataClient.java  |  27 +++--
 .../org/apache/iotdb/cluster/common/TestUtils.java |   2 +
 .../cluster/server/member/DataGroupMemberTest.java |  30 ++++-
 .../iotdb/cluster/server/member/MemberTest.java    |  34 ++++--
 .../cluster/server/member/MetaGroupMemberTest.java |  55 +++++++--
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |   1 +
 22 files changed, 421 insertions(+), 251 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 05745fd..621149d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -46,7 +46,7 @@ public class ClusterConfig {
   private int replicationNum = 2;
 
   @ClusterConsistent
-  private int multiRaftFactor = 2;
+  private int multiRaftFactor = 1;
 
   @ClusterConsistent
   private String clusterName = "default";
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 94437ae..aaa03a4 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -82,7 +82,7 @@ public class MetaLogApplier extends BaseApplier {
     LogPlan plan = new LogPlan(log.serialize());
     TSStatus status = member.executeNonQueryPlan(plan);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new ChangeMembershipException(String.format("apply %s failed", 
log));
+      throw new ChangeMembershipException(String.format("apply %s failed with 
status {%s}", log, status));
     }
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
index 824c3f2..380ba08 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
@@ -37,6 +37,8 @@ public class AddNodeLog extends Log {
 
   private Node newNode;
 
+  private long metaLogIndex;
+
   public AddNodeLog(ByteBuffer partitionTable, Node newNode) {
     this.partitionTable = partitionTable;
     this.newNode = newNode;
@@ -45,6 +47,14 @@ public class AddNodeLog extends Log {
   public AddNodeLog() {
   }
 
+  public long getMetaLogIndex() {
+    return metaLogIndex;
+  }
+
+  public void setMetaLogIndex(long metaLogIndex) {
+    this.metaLogIndex = metaLogIndex;
+  }
+
   public void setPartitionTable(ByteBuffer partitionTable) {
     this.partitionTable = partitionTable;
   }
@@ -68,10 +78,11 @@ public class AddNodeLog extends Log {
       dataOutputStream.writeByte(Types.ADD_NODE.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getMetaLogIndex());
 
       SerializeUtils.serialize(newNode, dataOutputStream);
 
-      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.writeInt(partitionTable.array().length);
       dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
@@ -87,6 +98,7 @@ public class AddNodeLog extends Log {
     // ipLength(int), inBytes(byte[]), port(int), identifier(int), 
dataPort(int)
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setMetaLogIndex(buffer.getLong());
 
     newNode = new Node();
     SerializeUtils.deserialize(newNode, buffer);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
index 800b77d..22af482 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java
@@ -34,6 +34,8 @@ public class RemoveNodeLog extends Log {
 
   private Node removedNode;
 
+  private long metaLogIndex;
+
   public RemoveNodeLog(ByteBuffer partitionTable,
       Node removedNode) {
     this.partitionTable = partitionTable;
@@ -43,6 +45,14 @@ public class RemoveNodeLog extends Log {
   public RemoveNodeLog() {
   }
 
+  public long getMetaLogIndex() {
+    return metaLogIndex;
+  }
+
+  public void setMetaLogIndex(long metaLogIndex) {
+    this.metaLogIndex = metaLogIndex;
+  }
+
   public ByteBuffer getPartitionTable() {
     return partitionTable;
   }
@@ -58,10 +68,11 @@ public class RemoveNodeLog extends Log {
       dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
       dataOutputStream.writeLong(getCurrLogTerm());
+      dataOutputStream.writeLong(getMetaLogIndex());
 
       SerializeUtils.serialize(removedNode, dataOutputStream);
 
-      dataOutputStream.write(partitionTable.array().length);
+      dataOutputStream.writeInt(partitionTable.array().length);
       dataOutputStream.write(partitionTable.array());
     } catch (IOException e) {
       // ignored
@@ -73,6 +84,7 @@ public class RemoveNodeLog extends Log {
   public void deserialize(ByteBuffer buffer) {
     setCurrLogIndex(buffer.getLong());
     setCurrLogTerm(buffer.getLong());
+    setMetaLogIndex(buffer.getLong());
 
     removedNode = new Node();
     SerializeUtils.deserialize(removedNode, buffer);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bd65c26..c57ee06 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -946,4 +946,8 @@ public abstract class RaftLogManager {
   public long getBlockAppliedCommitIndex() {
     return blockAppliedCommitIndex;
   }
+
+  public RaftLogManager(LogApplier logApplier) {
+    this.logApplier = logApplier;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
index 4193ffd..86ff9a2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java
@@ -93,4 +93,5 @@ public class NodeRemovalResult {
       newGroupList.add(group);
     }
   }
+
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
new file mode 100644
index 0000000..eb1825f
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.partition.balancer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+
+/**
+ * This balancer aims to avg slots to all raft groups.
+ */
+public class DefaultSlotBalancer implements SlotBalancer {
+
+  private int multiRaftFactor =
+      ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor();
+  private SlotPartitionTable table;
+
+  public DefaultSlotBalancer(SlotPartitionTable partitionTable) {
+    this.table = partitionTable;
+  }
+
+  /**
+   * Move last slots from each group whose slot number is bigger than the new 
average to the new node.
+   */
+  @Override
+  public void moveSlotsToNew(Node newNode, List<Node> oldRing) {
+    Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots();
+    Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = 
table.getPreviousNodeMap();
+    RaftNode[] slotNodes = table.getSlotNodes();
+
+    // as a node is added, the average slots for each node decrease
+    // move the slots to the new node if any previous node have more slots 
than the new average
+    int newAvg = table.getTotalSlotNumbers() / table.getAllNodes().size() / 
multiRaftFactor;
+    Map<RaftNode, List<Integer>> newNodeSlotMap = new HashMap<>();
+    int raftId = 0;
+    for (int i = 0; i < multiRaftFactor; i++) {
+      RaftNode raftNode = new RaftNode(newNode, i);
+      newNodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
+      previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
+    }
+    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
+      List<Integer> slots = entry.getValue();
+      int transferNum = slots.size() - newAvg;
+      if (transferNum > 0) {
+        RaftNode curNode = new RaftNode(newNode, raftId);
+        int numToMove = transferNum;
+        if (raftId != multiRaftFactor - 1) {
+          numToMove = Math.min(numToMove, newAvg - 
newNodeSlotMap.get(curNode).size());
+        }
+        List<Integer> slotsToMove = slots
+            .subList(slots.size() - transferNum, slots.size() - transferNum + 
numToMove);
+        newNodeSlotMap.get(curNode).addAll(slotsToMove);
+        for (Integer slot : slotsToMove) {
+          // record what node previously hold the integer
+          previousNodeMap.get(curNode).put(slot, 
table.getHeaderGroup(entry.getKey(), oldRing));
+          slotNodes[slot] = curNode;
+        }
+        transferNum -= numToMove;
+        if (transferNum > 0) {
+          curNode = new RaftNode(newNode, ++raftId);
+          slotsToMove = slots.subList(slots.size() - transferNum, 
slots.size());
+          newNodeSlotMap.get(curNode).addAll(slotsToMove);
+          for (Integer slot : slotsToMove) {
+            // record what node previously hold the integer
+            previousNodeMap.get(curNode).put(slot, 
table.getHeaderGroup(entry.getKey(), oldRing));
+            slotNodes[slot] = curNode;
+          }
+        }
+      }
+    }
+    nodeSlotMap.putAll(newNodeSlotMap);
+  }
+
+  @Override
+  public Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
+    Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots();
+    RaftNode[] slotNodes = table.getSlotNodes();
+    List<Node> nodeRing = table.getAllNodes();
+
+    Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
+    for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
+      RaftNode raftNode = new RaftNode(target, raftId);
+      List<Integer> slots = nodeSlotMap.remove(raftNode);
+      for (int i = 0; i < slots.size(); i++) {
+        int slot = slots.get(i);
+        RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), 
raftId);
+        slotNodes[slot] = newHolder;
+        nodeSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
+        newHolderSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
+      }
+    }
+    return newHolderSlotMap;
+  }
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java
new file mode 100644
index 0000000..95f806b
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.partition.balancer;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+
+/**
+ * When add/remove node, the slots need to be redistributed.
+ */
+public interface SlotBalancer {
+
+  /**
+   * When add a new node, new raft groups will take over some hash slots from 
another raft groups.
+   */
+  void moveSlotsToNew(Node newNode, List<Node> oldRing);
+
+  /**
+   * When remove a old node, all hash slots of the removed groups will 
assigned to other raft groups.
+   * @param target the node to be removed
+   */
+  Map<RaftNode, List<Integer>> retrieveSlots(Node target);
+
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
index f0fc11c..7165145 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java
@@ -1,5 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with this work 
for additional information regarding copyright ownership.  The ASF licenses 
this file to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the License.  You may obtain a copy 
of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless 
required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iotdb.cluster.partition.slot;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
index f441e4a..255fb22 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java
@@ -1,5 +1,20 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with this work 
for additional information regarding copyright ownership.  The ASF licenses 
this file to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the License.  You may obtain a copy 
of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless 
required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.iotdb.cluster.partition.slot;
@@ -27,6 +42,8 @@ import org.apache.iotdb.cluster.partition.NodeAdditionResult;
 import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
+import org.apache.iotdb.cluster.partition.balancer.DefaultSlotBalancer;
+import org.apache.iotdb.cluster.partition.balancer.SlotBalancer;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
@@ -78,6 +95,8 @@ public class SlotPartitionTable implements PartitionTable {
   // last log index that modifies the partition table
   private long lastLogIndex = -1;
 
+  private SlotBalancer slotBalancer = new DefaultSlotBalancer(this);
+
   /**
    * only used for deserialize.
    *
@@ -105,6 +124,14 @@ public class SlotPartitionTable implements PartitionTable {
     SlotPartitionTable.slotStrategy = slotStrategy;
   }
 
+  public SlotBalancer getLoadBalancer() {
+    return slotBalancer;
+  }
+
+  public void setLoadBalancer(SlotBalancer slotBalancer) {
+    this.slotBalancer = slotBalancer;
+  }
+
   private void init(Collection<Node> nodes) {
     logger.info("Initializing a new partition table");
     nodeRing.addAll(nodes);
@@ -167,7 +194,7 @@ public class SlotPartitionTable implements PartitionTable {
     return ret;
   }
 
-  private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> 
nodeRing) {
+  public PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) 
{
     PartitionGroup ret = new PartitionGroup(raftNode.getRaftId());
 
     // assuming the nodes are [1,2,3,4,5]
@@ -284,7 +311,7 @@ public class SlotPartitionTable implements PartitionTable {
 
     // the slots movement is only done logically, the new node itself will 
pull data from the
     // old node
-    moveSlotsToNew(node, oldRing);
+    slotBalancer.moveSlotsToNew(node, oldRing);
 
   }
 
@@ -304,55 +331,6 @@ public class SlotPartitionTable implements PartitionTable {
     return result;
   }
 
-
-  /**
-   * Move last slots from each group whose slot number is bigger than the new 
average to the new
-   * node.
-   *
-   * @param newNode
-   */
-  private void moveSlotsToNew(Node newNode, List<Node> oldRing) {
-    // as a node is added, the average slots for each node decrease
-    // move the slots to the new node if any previous node have more slots 
than the new average
-    int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor;
-    int raftId = 0;
-    for (int i = 0; i < multiRaftFactor; i++) {
-      RaftNode raftNode = new RaftNode(newNode, i);
-      nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>());
-      previousNodeMap.putIfAbsent(raftNode, new HashMap<>());
-    }
-    for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) {
-      List<Integer> slots = entry.getValue();
-      int transferNum = slots.size() - newAvg;
-      if (transferNum > 0) {
-        RaftNode curNode = new RaftNode(newNode, raftId);
-        int numToMove = transferNum;
-        if (raftId != multiRaftFactor - 1) {
-          numToMove = Math.min(numToMove, newAvg - 
nodeSlotMap.get(curNode).size());
-        }
-        List<Integer> slotsToMove = slots
-            .subList(slots.size() - transferNum, slots.size() - transferNum + 
numToMove);
-        nodeSlotMap.get(curNode).addAll(slotsToMove);
-        for (Integer slot : slotsToMove) {
-          // record what node previously hold the integer
-          previousNodeMap.get(curNode).put(slot, 
getHeaderGroup(entry.getKey(), oldRing));
-          slotNodes[slot] = curNode;
-        }
-        transferNum -= numToMove;
-        if (transferNum > 0) {
-          curNode = new RaftNode(newNode, ++raftId);
-          slotsToMove = slots.subList(slots.size() - transferNum, 
slots.size());
-          nodeSlotMap.get(curNode).addAll(slotsToMove);
-          for (Integer slot : slotsToMove) {
-            // record what node previously hold the integer
-            previousNodeMap.get(curNode).put(slot, 
getHeaderGroup(entry.getKey(), oldRing));
-            slotNodes[slot] = curNode;
-          }
-        }
-      }
-    }
-  }
-
   @Override
   public List<PartitionGroup> getLocalGroups() {
     return localGroups;
@@ -381,8 +359,8 @@ public class SlotPartitionTable implements PartitionTable {
         Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue();
         dataOutputStream.writeInt(prevHolders.size());
         for (Entry<Integer, PartitionGroup> integerNodeEntry : 
prevHolders.entrySet()) {
-          dataOutputStream.writeInt(integerNodeEntry.getKey());
           integerNodeEntry.getValue().serialize(dataOutputStream);
+          dataOutputStream.writeInt(integerNodeEntry.getKey());
         }
       }
 
@@ -398,7 +376,7 @@ public class SlotPartitionTable implements PartitionTable {
     long newLastLogIndex = buffer.getLong();
 
     // judge whether the partition table of byte buffer is out of date
-    if (lastLogIndex >= newLastLogIndex) {
+    if (lastLogIndex != -1 && lastLogIndex >= newLastLogIndex) {
       return lastLogIndex <= newLastLogIndex;
     }
     lastLogIndex = newLastLogIndex;
@@ -455,6 +433,10 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRing;
   }
 
+  public Map<RaftNode, Map<Integer, PartitionGroup>> getPreviousNodeMap() {
+    return previousNodeMap;
+  }
+
   public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) {
     return previousNodeMap.get(raftNode);
   }
@@ -488,7 +470,8 @@ public class SlotPartitionTable implements PartitionTable {
         Objects.equals(nodeRing, that.nodeRing) &&
         Objects.equals(nodeSlotMap, that.nodeSlotMap) &&
         Arrays.equals(slotNodes, that.slotNodes) &&
-        Objects.equals(previousNodeMap, that.previousNodeMap);
+        Objects.equals(previousNodeMap, that.previousNodeMap) &&
+        lastLogIndex == that.lastLogIndex;
   }
 
   @Override
@@ -548,7 +531,7 @@ public class SlotPartitionTable implements PartitionTable {
 
       // the slots movement is only done logically, the new node itself will 
pull data from the
       // old node
-      Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target);
+      Map<RaftNode, List<Integer>> raftNodeListMap = 
slotBalancer.retrieveSlots(target);
       result.addNewSlotOwners(raftNodeListMap);
       this.nodeRemovalResult = result;
     }
@@ -559,22 +542,6 @@ public class SlotPartitionTable implements PartitionTable {
     return nodeRemovalResult;
   }
 
-  private Map<RaftNode, List<Integer>> retrieveSlots(Node target) {
-    Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>();
-    for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) {
-      RaftNode raftNode = new RaftNode(target, raftId);
-      List<Integer> slots = nodeSlotMap.remove(raftNode);
-      for (int i = 0; i < slots.size(); i++) {
-        int slot = slots.get(i);
-        RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), 
raftId);
-        slotNodes[slot] = newHolder;
-        nodeSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
-        newHolderSlotMap.computeIfAbsent(newHolder, n -> new 
ArrayList<>()).add(slot);
-      }
-    }
-    return newHolderSlotMap;
-  }
-
   @Override
   public List<PartitionGroup> getGlobalGroups() {
     // preventing a thread from getting incomplete globalGroups
@@ -609,4 +576,8 @@ public class SlotPartitionTable implements PartitionTable {
   public synchronized void setLastLogIndex(long lastLogIndex) {
     this.lastLogIndex = Math.max(this.lastLogIndex, lastLogIndex);
   }
+
+  public RaftNode[] getSlotNodes() {
+    return slotNodes;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
index b5a45c0..5144bcb 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java
@@ -1,8 +1,22 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with this work 
for additional information regarding copyright ownership.  The ASF licenses 
this file to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the License.  You may obtain a copy 
of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless 
required by applicable law or ag [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-
 package org.apache.iotdb.cluster.partition.slot;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 0c8cf25..487b88d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -144,7 +144,7 @@ public class ClusterPlanRouter {
     throw new UnsupportedPlanException(plan);
   }
 
-  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
+  protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan)
       throws UnknownLogTypeException, UnsupportedPlanException {
     Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
     Log log = LogParser.getINSTANCE().parse(plan.getLog());
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 16c1da6..3d233ed 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -494,7 +494,7 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember 
targetDataGroupMember) {
     // Make sure the previous add/remove node log has applied
-    metaGroupMember.syncLeader();
+    metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
     // Check the validity of the partition table
     if 
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
@@ -601,7 +601,7 @@ public class DataClusterServer extends RaftServer 
implements TSDataService.Async
 
   public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember 
targetDataGroupMember) {
     // Make sure the previous add/remove node log has applied
-    metaGroupMember.syncLeader();
+    metaGroupMember.waitUtil(log.getMetaLogIndex() - 1);
 
     // Check the validity of the partition table
     if 
(!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 077d61d..df45b53 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -261,9 +261,9 @@ public class DataGroupMember extends RaftMember {
     }
   }
 
-  public void preAddNode(Node node) {
+  public boolean preAddNode(Node node) {
     if (allNodes.contains(node)) {
-      return;
+      return false;
     }
     synchronized (allNodes) {
       int insertIndex = -1;
@@ -288,6 +288,7 @@ public class DataGroupMember extends RaftMember {
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, 
node, allNodes);
       }
+      return insertIndex > 0;
     }
   }
 
@@ -299,6 +300,7 @@ public class DataGroupMember extends RaftMember {
    * otherwise
    */
   public boolean addNode(Node node, NodeAdditionResult result) {
+    syncLeader();
 
     // mark slots that do not belong to this group any more
     Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots()
@@ -604,9 +606,9 @@ public class DataGroupMember extends RaftMember {
    * @param partitionId
    * @param isSeq
    */
-  void closePartition(String storageGroupName, long partitionId, boolean 
isSeq) {
+  boolean closePartition(String storageGroupName, long partitionId, boolean 
isSeq) {
     if (character != NodeCharacter.LEADER) {
-      return;
+      return false;
     }
     CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq);
     synchronized (logManager) {
@@ -618,10 +620,11 @@ public class DataGroupMember extends RaftMember {
       logger.info("Send the close file request of {} to other nodes", log);
     }
     try {
-      appendLogInGroup(log);
+      return appendLogInGroup(log);
     } catch (LogExecutionException e) {
       logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, 
partitionId, isSeq, e);
     }
+    return false;
   }
 
   public boolean flushFileWhenDoSnapshot(
@@ -762,6 +765,8 @@ public class DataGroupMember extends RaftMember {
    */
   @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
   public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+    syncLeader();
+
     synchronized (allNodes) {
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 22520e7..1534323 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -302,20 +302,24 @@ public class MetaGroupMember extends RaftMember {
    * closed by the method.
    *
    */
-  public void closePartition(String storageGroupName, long partitionId, 
boolean isSeq) {
+  public boolean closePartition(String storageGroupName, long partitionId, 
boolean isSeq) {
     RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName,
         partitionId * StorageEngine.getTimePartitionInterval());
     DataGroupMember localDataMember = getLocalDataMember(raftNode);
     if (localDataMember == null || localDataMember.getCharacter() != 
NodeCharacter.LEADER) {
-      return;
+      return false;
     }
-    localDataMember.closePartition(storageGroupName, partitionId, isSeq);
+    return localDataMember.closePartition(storageGroupName, partitionId, 
isSeq);
   }
 
   public DataClusterServer getDataClusterServer() {
     return dataClusterServer;
   }
 
+  public void setDataClusterServer(DataClusterServer dataClusterServer) {
+    this.dataClusterServer = dataClusterServer;
+  }
+
   public DataHeartbeatServer getDataHeartbeatServer() {
     return dataHeartbeatServer;
   }
@@ -864,6 +868,7 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setPartitionTable(partitionTable.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       addNodeLog.setNewNode(node);
 
@@ -1062,120 +1067,6 @@ public class MetaGroupMember extends RaftMember {
     return null;
   }
 
-  /**
-   * Send the log the all data groups and return a success only when each 
group's quorum has
-   * accepted this log.
-   */
-//  private AppendLogResult sendLogToAllGroups(Log log) {
-//    List<Node> nodeRing = partitionTable.getAllNodes();
-//
-//    AtomicLong newLeaderTerm = new AtomicLong(term.get());
-//    AtomicBoolean leaderShipStale = new AtomicBoolean(false);
-//    AppendEntryRequest request = buildAppendEntryRequest(log, true);
-//
-//    // ask for votes from each node
-//    int[] groupRemainings = askGroupVotes(nodeRing, request, 
leaderShipStale, log, newLeaderTerm);
-//
-//    if (!leaderShipStale.get()) {
-//      // if all quorums of all groups have received this log, it is 
considered succeeded.
-//      for (int remaining : groupRemainings) {
-//        if (remaining > 0) {
-//          return AppendLogResult.TIME_OUT;
-//        }
-//      }
-//    } else {
-//      return AppendLogResult.LEADERSHIP_STALE;
-//    }
-//
-//    return AppendLogResult.OK;
-//  }
-
-  /**
-   * Send "request" to each node in "nodeRing" and when a node returns a 
success, decrease all
-   * counters of the groups it is in of "groupRemainings"
-   *
-   * @return a int array indicating how many votes are left in each group to 
make an agreement
-   */
-  @SuppressWarnings({"java:S2445", "java:S2274"})
-  // groupRemaining is shared with the handlers,
-  // and we do not wait infinitely to enable timeouts
-//  private int[] askGroupVotes(List<Node> nodeRing,
-//      AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log,
-//      AtomicLong newLeaderTerm) {
-//    // each node will be the header of a group, we use the node to represent 
the group
-//    int nodeSize = nodeRing.size();
-//    // the decreasing counters of how many nodes in a group has received the 
log, each time a
-//    // node receive the log, the counters of all groups it is in will 
decrease by 1
-//    int[] groupRemainings = new int[nodeSize];
-//    // a group is considered successfully received the log if such members 
receive the log
-//    int groupQuorum = REPLICATION_NUM / 2 + 1;
-//    Arrays.fill(groupRemainings, groupQuorum);
-//
-//    synchronized (groupRemainings) {
-//      // ask a vote from every node
-//      for (int i = 0; i < nodeSize; i++) {
-//        Node node = nodeRing.get(i);
-//        if (node.equals(thisNode)) {
-//          // this node automatically gives an agreement, decrease counters 
of all groups the local
-//          // node is in
-//          for (int j = 0; j < REPLICATION_NUM; j++) {
-//            int groupIndex = i - j;
-//            if (groupIndex < 0) {
-//              groupIndex += groupRemainings.length;
-//            }
-//            groupRemainings[groupIndex]--;
-//          }
-//        } else {
-//          askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, 
newLeaderTerm,
-//              request);
-//        }
-//      }
-//
-//      try {
-//        groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS());
-//      } catch (InterruptedException e) {
-//        Thread.currentThread().interrupt();
-//        logger.error("Unexpected interruption when waiting for the group 
votes", e);
-//      }
-//    }
-//    return groupRemainings;
-//  }
-
-  private void askRemoteGroupVote(Node node, int[] groupRemainings, int 
nodeIndex,
-      AtomicBoolean leaderShipStale, Log log,
-      AtomicLong newLeaderTerm, AppendEntryRequest request) {
-    AppendGroupEntryHandler handler = new 
AppendGroupEntryHandler(groupRemainings,
-        nodeIndex, node, leaderShipStale, log, newLeaderTerm, this);
-    if (config.isUseAsyncServer()) {
-      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node);
-      try {
-        if (client != null) {
-          client.appendEntry(request, handler);
-        }
-      } catch (TException e) {
-        logger.error("Cannot send log to node {}", node, e);
-      }
-    } else {
-      SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
-      if (client == null) {
-        logger.error("No available client for {}", node);
-        return;
-      }
-      getSerialToParallelPool().submit(() -> {
-        try {
-          handler.onComplete(client.appendEntry(request));
-        } catch (TException e) {
-          client.getInputProtocol().getTransport().close();
-          handler.onError(e);
-        } finally {
-          ClientUtils.putBackSyncClient(client);
-        }
-      });
-    }
-
-  }
-
-
   public Set<Node> getIdConflictNodes() {
     return idConflictNodes;
   }
@@ -1519,7 +1410,7 @@ public class MetaGroupMember extends RaftMember {
    *
    * @param planGroupMap sub-plan -> belong data group pairs
    */
-  private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
     // the error codes from the groups that cannot execute the plan
     TSStatus status;
     if (planGroupMap.size() == 1) {
@@ -2009,6 +1900,7 @@ public class MetaGroupMember extends RaftMember {
       removeNodeLog.setPartitionTable(partitionTable.serialize());
       removeNodeLog.setCurrLogTerm(getTerm().get());
       removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      removeNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
       removeNodeLog.setRemovedNode(target);
 
@@ -2203,4 +2095,8 @@ public class MetaGroupMember extends RaftMember {
   public void setClientProvider(DataClientProvider dataClientProvider) {
     this.dataClientProvider = dataClientProvider;
   }
+
+  public void setRouter(ClusterPlanRouter router) {
+    this.router = router;
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 57f22bc..121892b 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -275,6 +275,7 @@ public abstract class RaftMember {
     this.asyncHeartbeatClientPool = asyncHeartbeatPool;
     this.syncHeartbeatClientPool = syncHeartbeatPool;
     this.asyncSendLogClientPool = asyncSendLogClientPool;
+    this.stopStatus = new StopStatus();
   }
 
   /**
@@ -836,8 +837,6 @@ public abstract class RaftMember {
    * @return true if this node has caught up before timeout, false otherwise
    */
   private boolean waitUntilCatchUp() {
-    long startTime = System.currentTimeMillis();
-    long waitedTime = 0;
     long leaderCommitId;
     try {
       leaderCommitId = 
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ?
@@ -854,16 +853,25 @@ public abstract class RaftMember {
       logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
       return false;
     }
+    logger.debug("Start to sync with leader, leader commit id is {}", 
leaderCommitId);
+    return waitUtil(leaderCommitId);
+  }
 
+  /**
+   * Wait until local commit index becomes not less than target log index
+   */
+  public boolean waitUtil(long targetLogIndex) {
+    long startTime = System.currentTimeMillis();
+    long waitedTime = 0;
     while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) {
       try {
         long localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
-        logger.debug("{}: synchronizing commitIndex {}/{}", name, 
localAppliedId, leaderCommitId);
-        if (leaderCommitId <= localAppliedId) {
+        logger.debug("{}: synchronizing commitIndex {}/{}", name, 
localAppliedId, targetLogIndex);
+        if (targetLogIndex <= localAppliedId) {
           // this node has caught up
           if (logger.isDebugEnabled()) {
             waitedTime = System.currentTimeMillis() - startTime;
-            logger.debug("{}: synchronized with the leader after {}ms", name, 
waitedTime);
+            logger.debug("{}: synchronized to target index {} after {}ms", 
name, targetLogIndex, waitedTime);
           }
           return true;
         }
@@ -879,7 +887,7 @@ public abstract class RaftMember {
         logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
       }
     }
-    logger.warn("{}: Failed to synchronize with the leader after {}ms", name, 
waitedTime);
+    logger.warn("{}: Failed to synchronize to target index {} after {}ms", 
name, targetLogIndex, waitedTime);
     return false;
   }
 
@@ -911,13 +919,13 @@ public abstract class RaftMember {
     } else {
       log = new PhysicalPlanLog();
       ((PhysicalPlanLog)log).setPlan(plan);
+      plan.setIndex(log.getCurrLogIndex());
     }
     // assign term and index to the new log and append it
     synchronized (logManager) {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
-      plan.setIndex(log.getCurrLogIndex());
       logManager.append(log);
     }
     
Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime);
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
index 51f0c8a..62cd0c8 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MemberTest;
@@ -53,10 +54,10 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class TestAsyncDataClient extends AsyncDataClient {
 
   private PlanExecutor planExecutor;
-  private Map<Node, DataGroupMember> dataGroupMemberMap;
+  private Map<RaftNode, DataGroupMember> dataGroupMemberMap;
 
   public TestAsyncDataClient(Node node,
-      Map<Node, DataGroupMember> dataGroupMemberMap)
+      Map<RaftNode, DataGroupMember> dataGroupMemberMap)
       throws IOException {
     super(null, null, node, null);
     this.dataGroupMemberMap = dataGroupMemberMap;
@@ -70,35 +71,35 @@ public class TestAsyncDataClient extends AsyncDataClient {
   @Override
   public void fetchSingleSeries(Node header, int raftId, long readerId,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, 
raftId, readerId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(header,0))).fetchSingleSeries(header, raftId, readerId,
         resultHandler)).start();
   }
 
   @Override
   public void getAggrResult(GetAggrResultRequest request,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getAggrResult(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).getAggrResult(request,
         resultHandler)).start();
   }
 
   @Override
   public void querySingleSeries(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeries(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).querySingleSeries(request,
         resultHandler)).start();
   }
 
   @Override
   public void fetchSingleSeriesByTimestamp(Node header, int raftId, long 
readerId, long time,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header,
 raftId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(header,0))).fetchSingleSeriesByTimestamp(header, raftId,
         readerId, time, resultHandler)).start();
   }
 
   @Override
   public void getAllPaths(Node header, int raftId, List<String> paths, boolean 
withAlias,
       AsyncMethodCallback<GetAllPathsResult> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(header,0))).getAllPaths(header, raftId,
         paths, withAlias, resultHandler)).start();
   }
 
@@ -152,39 +153,39 @@ public class TestAsyncDataClient extends AsyncDataClient {
   @Override
   public void pullTimeSeriesSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullTimeSeriesSchema(request,
 resultHandler)).start();
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).pullTimeSeriesSchema(request, 
resultHandler)).start();
   }
 
   @Override
   public void pullMeasurementSchema(PullSchemaRequest request,
       AsyncMethodCallback<PullSchemaResp> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullMeasurementSchema(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).pullMeasurementSchema(request,
         resultHandler)).start();
   }
 
   @Override
   public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request,
       AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeriesByTimestamp(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).querySingleSeriesByTimestamp(request,
         resultHandler)).start();
   }
 
   @Override
   public void getGroupByExecutor(GroupByRequest request, 
AsyncMethodCallback<Long> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getGroupByExecutor(request,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).getGroupByExecutor(request,
         resultHandler)).start();
   }
 
   @Override
   public void getGroupByResult(Node header, int raftId, long executorId, long 
startTime, long endTime,
       AsyncMethodCallback<List<ByteBuffer>> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, 
raftId, executorId,
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(header,0))).getGroupByResult(header, raftId, executorId,
         startTime, endTime, resultHandler)).start();
   }
 
   @Override
   public void previousFill(PreviousFillRequest request,
       AsyncMethodCallback<ByteBuffer> resultHandler) {
-    new Thread(() -> new 
DataAsyncService(dataGroupMemberMap.get(request.getHeader())).previousFill(request,
 resultHandler)).start();
+    new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new 
RaftNode(request.getHeader(),0))).previousFill(request, resultHandler)).start();
   }
 }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 1f1f3ba..17d04a1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -103,6 +103,8 @@ public class TestUtils {
         
.setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum());
     startUpStatus
         
.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName());
+    startUpStatus
+        
.setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor());
     List<Node> seedNodeList = new ArrayList<>();
     for (int i = 0; i < 100; i += 10) {
       seedNodeList.add(getNode(i));
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index ca4fd92..fbffb7f 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.slot.SlotNodeAdditionResult;
 import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
@@ -71,6 +72,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
@@ -165,9 +167,9 @@ public class DataGroupMemberTest extends MemberTest {
     };
   }
 
-  DataGroupMember getDataGroupMember(Node node) {
-    PartitionGroup nodes = partitionTable.getHeaderGroup(node);
-    return dataGroupMemberMap.computeIfAbsent(node, n -> getDataGroupMember(n, 
nodes));
+  DataGroupMember getDataGroupMember(RaftNode raftNode) {
+    PartitionGroup nodes = partitionTable.getHeaderGroup(raftNode.getNode());
+    return dataGroupMemberMap.computeIfAbsent(raftNode, n -> 
getDataGroupMember(n.getNode(), nodes));
   }
 
   private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes) {
@@ -179,6 +181,11 @@ public class DataGroupMemberTest extends MemberTest {
       }
 
       @Override
+      public long appendEntry(AppendEntryRequest request) {
+        return Response.RESPONSE_AGREE;
+      }
+
+      @Override
       public void updateHardState(long currentTerm, Node leader) {
       }
 
@@ -267,16 +274,25 @@ public class DataGroupMemberTest extends MemberTest {
 
     try {
       Node newNodeBeforeGroup = TestUtils.getNode(-5);
+      assertFalse(firstMember.preAddNode(newNodeBeforeGroup));
+      assertFalse(midMember.preAddNode(newNodeBeforeGroup));
+      assertFalse(lastMember.preAddNode(newNodeBeforeGroup));
       assertFalse(firstMember.addNode(newNodeBeforeGroup, result));
       assertFalse(midMember.addNode(newNodeBeforeGroup, result));
       assertFalse(lastMember.addNode(newNodeBeforeGroup, result));
 
       Node newNodeInGroup = TestUtils.getNode(66);
+      assertTrue(firstMember.preAddNode(newNodeInGroup));
+      assertTrue(midMember.preAddNode(newNodeInGroup));
+      assertTrue(lastMember.preAddNode(newNodeInGroup));
       assertFalse(firstMember.addNode(newNodeInGroup, result));
       assertFalse(midMember.addNode(newNodeInGroup, result));
       assertTrue(lastMember.addNode(newNodeInGroup, result));
 
       Node newNodeAfterGroup = TestUtils.getNode(101);
+      assertFalse(firstMember.preAddNode(newNodeAfterGroup));
+      assertFalse(midMember.preAddNode(newNodeAfterGroup));
+      assertFalse(lastMember.preAddNode(newNodeAfterGroup));
       assertFalse(firstMember.addNode(newNodeAfterGroup, result));
       assertFalse(midMember.addNode(newNodeAfterGroup, result));
     } finally {
@@ -480,6 +496,7 @@ public class DataGroupMemberTest extends MemberTest {
   @Test
   public void testForwardPullSnapshot() {
     System.out.println("Start testForwardPullSnapshot()");
+    hasInitialSnapshots = true;
     dataGroupMember.setCharacter(NodeCharacter.FOLLOWER);
     dataGroupMember.setLeader(TestUtils.getNode(1));
     PullSnapshotRequest request = new PullSnapshotRequest();
@@ -904,13 +921,15 @@ public class DataGroupMemberTest extends MemberTest {
     dataGroupMember.start();
 
     try {
+      dataGroupMember.preRemoveNode(nodeToRemove);
       dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
 
       assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter());
       assertEquals(Long.MIN_VALUE, 
dataGroupMember.getLastHeartbeatReceivedTime());
       
assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
       assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
-      List<Integer> newSlots = 
nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0));
+      List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners()
+          .get(new RaftNode(TestUtils.getNode(0), raftId));
       while (newSlots.size() != pulledSnapshots.size()) {
 
       }
@@ -933,13 +952,14 @@ public class DataGroupMemberTest extends MemberTest {
     dataGroupMember.start();
 
     try {
+      dataGroupMember.preRemoveNode(nodeToRemove);
       dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult);
 
       assertEquals(0, dataGroupMember.getLastHeartbeatReceivedTime());
       
assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30)));
       assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove));
       List<Integer> newSlots =
-          ((SlotNodeRemovalResult) 
nodeRemovalResult).getNewSlotOwners().get(TestUtils.getNode(0));
+          ((SlotNodeRemovalResult) 
nodeRemovalResult).getNewSlotOwners().get(new RaftNode(TestUtils.getNode(0), 
0));
       while (newSlots.size() != pulledSnapshots.size()) {
 
       }
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
index 6065370..8c1beef 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.member;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,8 +51,10 @@ import org.apache.iotdb.cluster.metadata.MetaPuller;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
@@ -60,6 +63,8 @@ import 
org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -73,7 +78,7 @@ public class MemberTest {
 
   public static AtomicLong dummyResponse = new 
AtomicLong(Response.RESPONSE_AGREE);
 
-  Map<Node, DataGroupMember> dataGroupMemberMap;
+  Map<RaftNode, DataGroupMember> dataGroupMemberMap;
   private Map<Node, MetaGroupMember> metaGroupMemberMap;
   PartitionGroup allNodes;
   protected MetaGroupMember testMetaMember;
@@ -95,7 +100,6 @@ public class MemberTest {
     prevUseAsyncServer = 
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
     preLogBufferSize = 
ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize();
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
-    ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096);
     testThreadPool = Executors.newFixedThreadPool(4);
     prevLeaderWait = RaftMember.getWaitLeaderTimeMs();
     RaftMember.setWaitLeaderTimeMs(10);
@@ -105,7 +109,12 @@ public class MemberTest {
       allNodes.add(TestUtils.getNode(i));
     }
 
-    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
+    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) {
+      @Override
+      public RaftNode routeToHeaderByTime(String storageGroupName, long 
timestamp) {
+        return new RaftNode(TestUtils.getNode(0), 0);
+      }
+    };
 
     dataGroupMemberMap = new HashMap<>();
     metaGroupMemberMap = new HashMap<>();
@@ -170,11 +179,15 @@ public class MemberTest {
   }
 
   DataGroupMember getDataGroupMember(Node node) {
+    return getDataGroupMember(new RaftNode(node, 0));
+  }
+
+  DataGroupMember getDataGroupMember(RaftNode node) {
     return dataGroupMemberMap.computeIfAbsent(node, this::newDataGroupMember);
   }
 
-  private DataGroupMember newDataGroupMember(Node node) {
-    DataGroupMember newMember = new TestDataGroupMember(node, 
partitionTable.getHeaderGroup(node)) {
+  private DataGroupMember newDataGroupMember(RaftNode raftNode) {
+    DataGroupMember newMember = new TestDataGroupMember(raftNode.getNode(), 
partitionTable.getHeaderGroup(raftNode)) {
 
       @Override
       public boolean syncLeader() {
@@ -200,9 +213,9 @@ public class MemberTest {
         return getAsyncClient(node);
       }
     };
-    newMember.setThisNode(node);
+    newMember.setThisNode(raftNode.getNode());
     newMember.setMetaGroupMember(testMetaMember);
-    newMember.setLeader(node);
+    newMember.setLeader(raftNode.getNode());
     newMember.setCharacter(NodeCharacter.LEADER);
     newMember
         .setLogManager(
@@ -232,11 +245,16 @@ public class MemberTest {
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
-        return getDataGroupMember(header);
+        return getDataGroupMember(new RaftNode(header, raftId));
       }
 
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId) {
+        return getDataGroupMember(new RaftNode(header, raftId));
+      }
+
+      @Override
+      public DataGroupMember getLocalDataMember(RaftNode header) {
         return getDataGroupMember(header);
       }
 
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index a1e563e..94f5067 100644
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -51,6 +51,8 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncClient;
 import org.apache.iotdb.cluster.common.TestAsyncDataClient;
 import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
+import org.apache.iotdb.cluster.common.TestLogApplier;
+import org.apache.iotdb.cluster.common.TestLogManager;
 import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
 import org.apache.iotdb.cluster.common.TestSnapshot;
 import org.apache.iotdb.cluster.common.TestUtils;
@@ -60,6 +62,9 @@ import 
org.apache.iotdb.cluster.exception.EmptyIntervalException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.exception.UnsupportedPlanException;
+import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
@@ -68,6 +73,7 @@ import org.apache.iotdb.cluster.metadata.CMManager;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
+import org.apache.iotdb.cluster.query.ClusterPlanRouter;
 import org.apache.iotdb.cluster.query.LocalQueryExecutor;
 import org.apache.iotdb.cluster.query.RemoteQueryContext;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
@@ -82,6 +88,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
@@ -113,6 +120,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -133,6 +141,7 @@ import org.apache.thrift.protocol.TCompactProtocol.Factory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class MetaGroupMemberTest extends MemberTest {
@@ -177,6 +186,12 @@ public class MetaGroupMemberTest extends MemberTest {
 
     buildDataGroups(dataClusterServer);
     testMetaMember.getThisNode().setNodeIdentifier(0);
+    testMetaMember.setRouter(new 
ClusterPlanRouter(testMetaMember.getPartitionTable()){
+      @Override
+      protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan 
plan) {
+        return Collections.singletonMap(plan, 
partitionTable.getHeaderGroup(testMetaMember.getThisNode()));
+      }
+    });
     mockDataClusterServer = false;
     QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
     exiledNode = null;
@@ -207,6 +222,11 @@ public class MetaGroupMemberTest extends MemberTest {
       }
 
       @Override
+      protected AppendLogResult sendLogToFollowers(Log log) {
+        return AppendLogResult.OK;
+      }
+
+      @Override
       public AsyncClient getAsyncClient(Node node) {
         return getClient(node);
       }
@@ -287,6 +307,7 @@ public class MetaGroupMemberTest extends MemberTest {
     return resp;
   }
 
+  @Override
   protected MetaGroupMember getMetaGroupMember(Node node) throws 
QueryProcessException {
     MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) 
{
 
@@ -309,11 +330,16 @@ public class MetaGroupMemberTest extends MemberTest {
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId,
           Object request) {
-        return getDataGroupMember(header);
+        return getDataGroupMember(new RaftNode(header, raftId));
       }
 
       @Override
       public DataGroupMember getLocalDataMember(Node header, int raftId) {
+        return getDataGroupMember(new RaftNode(header, raftId));
+      }
+
+      @Override
+      public DataGroupMember getLocalDataMember(RaftNode header) {
         return getDataGroupMember(header);
       }
 
@@ -433,7 +459,7 @@ public class MetaGroupMemberTest extends MemberTest {
             @Override
             public void removeNode(Node node, AsyncMethodCallback<Long> 
resultHandler) {
               new Thread(() -> {
-                testMetaMember.applyRemoveNode(new 
RemoveNodeLog(TestUtils.seralizePartitionTable, node));
+                testMetaMember.applyRemoveNode(new 
RemoveNodeLog(partitionTable.serialize(), node));
                 resultHandler.onComplete(Response.RESPONSE_AGREE);
               }).start();
             }
@@ -503,7 +529,7 @@ public class MetaGroupMemberTest extends MemberTest {
     }
 
     ExecutorService testThreadPool = Executors.newFixedThreadPool(4);
-    testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+    assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true));
 
     StorageGroupProcessor processor =
         StorageEngine.getInstance().getProcessor(new 
PartialPath(TestUtils.getTestSg(0)));
@@ -522,15 +548,13 @@ public class MetaGroupMemberTest extends MemberTest {
       // the net work is down
       dummyResponse.set(Long.MIN_VALUE);
 
-      // network resume in 100ms
-      new Thread(() -> {
-        await().atLeast(200, TimeUnit.MILLISECONDS);
-        dummyResponse.set(Response.RESPONSE_AGREE);
-      }).start();
-
       System.out.println("Close the first file");
+      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, 
true));
+      assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
 
-      testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+      // network resume in 100ms
+      dummyResponse.set(Response.RESPONSE_AGREE);
+      assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, 
true));
       assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty());
 
       System.out.println("Create the second file");
@@ -540,10 +564,11 @@ public class MetaGroupMemberTest extends MemberTest {
         PlanExecutor planExecutor = new PlanExecutor();
         planExecutor.processNonQuery(insertPlan);
       }
+
       // indicating the leader is stale
       System.out.println("Close the second file");
       dummyResponse.set(100);
-      testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true);
+      assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, 
true));
       assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty());
     } finally {
       RaftServer.setConnectionTimeoutInMS(prevTimeout);
@@ -554,9 +579,10 @@ public class MetaGroupMemberTest extends MemberTest {
   @Test
   public void testAddNode() {
     System.out.println("Start testAddNode()");
-    Node newNode = TestUtils.getNode(10);
+    Node newNode = TestUtils.getNode(11);
+    testMetaMember.getPartitionTable().addNode(newNode);
     testMetaMember.onElectionWins();
-    testMetaMember.applyAddNode(new 
AddNodeLog(TestUtils.seralizePartitionTable, newNode));
+    testMetaMember.applyAddNode(new 
AddNodeLog(testMetaMember.getPartitionTable().serialize(), newNode));
     assertTrue(partitionTable.getAllNodes().contains(newNode));
   }
 
@@ -867,6 +893,7 @@ public class MetaGroupMemberTest extends MemberTest {
   public void testProcessValidHeartbeatReq() throws QueryProcessException {
     System.out.println("Start testProcessValidHeartbeatReq()");
     MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10));
+    partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0));
     try {
       HeartBeatRequest request = new HeartBeatRequest();
       request.setRequireIdentifier(true);
@@ -1105,6 +1132,7 @@ public class MetaGroupMemberTest extends MemberTest {
     AtomicReference<Long> resultRef = new AtomicReference<>();
     testMetaMember.setLeader(testMetaMember.getThisNode());
     testMetaMember.setCharacter(LEADER);
+
     doRemoveNode(resultRef, testMetaMember.getThisNode());
 
     assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get());
@@ -1203,6 +1231,7 @@ public class MetaGroupMemberTest extends MemberTest {
 
       @Override
       public void onError(Exception e) {
+        resultRef.set(-1L);
         e.printStackTrace();
       }
     });
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
index 725c803..bdc19c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -44,6 +44,7 @@ public class LogPlan extends PhysicalPlan {
   }
 
   public ByteBuffer getLog() {
+    log.clear();
     return log;
   }
 

Reply via email to