This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_scalability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4609bc00ff6d808dd0dbac64fb7edba81a07d4c1
Author: lta <[email protected]>
AuthorDate: Tue Feb 23 20:32:01 2021 +0800

    This PR addresses the following issues:
    1. Fix the LogPlan serialization and deserialization bug
    2. Add shell scripts for removing nodes
    3. Enrich the functionality of the node tool
    4. Fix some bugs in adding new nodes
---
 cluster/src/assembly/resources/sbin/add-node.bat   |   2 +-
 cluster/src/assembly/resources/sbin/add-node.sh    |   2 +-
 .../sbin/{start-node.bat => remove-node.bat}       |  16 +--
 .../resources/sbin/{add-node.sh => remove-node.sh} |  28 +++--
 cluster/src/assembly/resources/sbin/start-node.bat |   2 +-
 .../java/org/apache/iotdb/cluster/ClusterMain.java |  11 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   3 +
 .../iotdb/cluster/partition/PartitionGroup.java    |   4 +
 .../partition/balancer/DefaultSlotBalancer.java    |   2 +
 .../iotdb/cluster/query/ClusterPlanRouter.java     |   2 +-
 .../org/apache/iotdb/cluster/server/Response.java  |   2 +
 .../server/handlers/caller/NodeStatusHandler.java  |   9 +-
 .../cluster/server/member/MetaGroupMember.java     | 139 +++++++++++++--------
 .../iotdb/cluster/server/member/RaftMember.java    |   1 -
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../utils/nodetool/ClusterMonitorMBean.java        |   4 +-
 .../cluster/utils/nodetool/function/Host.java      |   2 +-
 .../cluster/utils/nodetool/function/Status.java    |  24 +++-
 .../apache/iotdb/cluster/log/LogParserTest.java    |  14 +++
 .../apache/iotdb/db/qp/physical/sys/LogPlan.java   |  11 +-
 20 files changed, 179 insertions(+), 101 deletions(-)

diff --git a/cluster/src/assembly/resources/sbin/add-node.bat 
b/cluster/src/assembly/resources/sbin/add-node.bat
index 958f16f..452f1c3 100755
--- a/cluster/src/assembly/resources/sbin/add-node.bat
+++ b/cluster/src/assembly/resources/sbin/add-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting IoTDB (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
diff --git a/cluster/src/assembly/resources/sbin/add-node.sh 
b/cluster/src/assembly/resources/sbin/add-node.sh
index 807175b..935abde 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/add-node.sh
@@ -20,7 +20,7 @@
 
 
 echo ---------------------
-echo Starting IoTDB
+echo "Starting IoTDB (Cluster Mode)"
 echo ---------------------
 
 if [ -z "${IOTDB_HOME}" ]; then
diff --git a/cluster/src/assembly/resources/sbin/start-node.bat 
b/cluster/src/assembly/resources/sbin/remove-node.bat
similarity index 91%
copy from cluster/src/assembly/resources/sbin/start-node.bat
copy to cluster/src/assembly/resources/sbin/remove-node.bat
index f9a7d1f..a2b0564 100755
--- a/cluster/src/assembly/resources/sbin/start-node.bat
+++ b/cluster/src/assembly/resources/sbin/remove-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting to remove a node (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
@@ -57,20 +57,8 @@ popd
 set IOTDB_CONF=%IOTDB_HOME%\conf
 set IOTDB_LOGS=%IOTDB_HOME%\logs
 
-
-IF EXIST "%IOTDB_CONF%\iotdb-env.bat" (
-    IF "%1" == "printgc" (
-      CALL "%IOTDB_CONF%\iotdb-env.bat" printgc
-      SHIFT
-    ) ELSE (
-      CALL "%IOTDB_CONF%\iotdb-env.bat"
-    )
-) ELSE (
-    echo "can't find %IOTDB_CONF%\iotdb-env.bat"
-)
-
 @setlocal ENABLEDELAYEDEXPANSION ENABLEEXTENSIONS
-set CONF_PARAMS=-s
+set CONF_PARAMS=-r
 set is_conf_path=false
 for %%i in (%*) do (
        IF "%%i" == "-c" (
diff --git a/cluster/src/assembly/resources/sbin/add-node.sh 
b/cluster/src/assembly/resources/sbin/remove-node.sh
similarity index 82%
copy from cluster/src/assembly/resources/sbin/add-node.sh
copy to cluster/src/assembly/resources/sbin/remove-node.sh
index 807175b..65ee58b 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/remove-node.sh
@@ -20,7 +20,7 @@
 
 
 echo ---------------------
-echo Starting IoTDB
+echo "Starting to remove a node(Cluster Mode)"
 echo ---------------------
 
 if [ -z "${IOTDB_HOME}" ]; then
@@ -28,13 +28,24 @@ if [ -z "${IOTDB_HOME}" ]; then
 fi
 
 IOTDB_CONF=${IOTDB_HOME}/conf
-# IOTDB_LOGS=${IOTDB_HOME}/logs
 
-if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
-    . "$IOTDB_CONF/iotdb-env.sh"
-else
-    echo "can't find $IOTDB_CONF/iotdb-env.sh"
-fi
+is_conf_path=false
+for arg do
+    shift
+    if [ "$arg" == "-c" ]; then
+        is_conf_path=true
+        continue
+    fi
+
+    if [ $is_conf_path == true ]; then
+        IOTDB_CONF=$arg
+        is_conf_path=false
+        continue
+    fi
+    set -- "$@" "$arg"
+done
+
+CONF_PARAMS="-r "$*
 
 if [ -n "$JAVA_HOME" ]; then
     for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
@@ -65,8 +76,9 @@ launch_service()
        iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
        iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
        iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+       iotdb_parms="$iotdb_parms -DCLUSTER_CONF=${IOTDB_CONF}"
        iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-       exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" -a
+       exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" 
$CONF_PARAMS
        return $?
 }
 
diff --git a/cluster/src/assembly/resources/sbin/start-node.bat 
b/cluster/src/assembly/resources/sbin/start-node.bat
index f9a7d1f..161cc2a 100755
--- a/cluster/src/assembly/resources/sbin/start-node.bat
+++ b/cluster/src/assembly/resources/sbin/start-node.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````
-echo Starting IoTDB
+echo Starting IoTDB (Cluster Mode)
 echo ````````````````````````
 
 PATH %PATH%;%JAVA_HOME%\bin\
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
index 33d8a5d..48f8813 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterMain.java
@@ -68,8 +68,8 @@ public class ClusterMain {
           + "[-internal_data_port <internal data port>] "
           + "[-cluster_rpc_port <cluster rpc port>] "
           + "[-seed_nodes <node1:meta_port:data_port:cluster_rpc_port,"
-          +               "node2:meta_port:data_port:cluster_rpc_port,"
-          +           "...,noden:meta_port:data_port:cluster_rpc_port>] "
+          + "node2:meta_port:data_port:cluster_rpc_port,"
+          + "...,noden:meta_port:data_port:cluster_rpc_port>] "
           + "[-sc] "
           + "[-rpc_port <rpc port>]");
       return;
@@ -276,6 +276,9 @@ public class ClusterMain {
       logger.error("Cluster size is too small, cannot remove any node");
     } else if (response == Response.RESPONSE_REJECT) {
       logger.error("Node {} is not found in the cluster, please check", 
nodeToRemove);
+    } else if (response == Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
+      logger.warn(
+          "The cluster is performing other change membership operations. 
Change membership operations should be performed one by one. Please try again 
later");
     } else {
       logger.error("Unexpected response {}", response);
     }
@@ -302,7 +305,7 @@ public class ClusterMain {
       public int calculateSlotByTime(String storageGroupName, long timestamp, 
int maxSlotNum) {
         int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
         if (sgSerialNum >= 0) {
-          return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
+          return (int) (maxSlotNum / k * (sgSerialNum + 0.5));
         } else {
           return defaultStrategy.calculateSlotByTime(storageGroupName, 
timestamp, maxSlotNum);
         }
@@ -313,7 +316,7 @@ public class ClusterMain {
           int maxSlotNum) {
         int sgSerialNum = extractSerialNumInSGName(storageGroupName) % k;
         if (sgSerialNum >= 0) {
-          return (int)(maxSlotNum / k * (sgSerialNum + 0.5));
+          return (int) (maxSlotNum / k * (sgSerialNum + 0.5));
         } else {
           return defaultStrategy
               .calculateSlotByPartitionNum(storageGroupName, partitionId, 
maxSlotNum);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index f774567..b08d327 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -641,6 +641,9 @@ public abstract class RaftLogManager {
    */
   void applyEntries(List<Log> entries) {
     for (Log entry : entries) {
+      if (entry.isApplied()) {
+        continue;
+      }
       if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > 
blockAppliedCommitIndex) {
         blockedUnappliedLogList.add(entry);
         continue;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
index b35cc10..e7d039c 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java
@@ -98,4 +98,8 @@ public class PartitionGroup extends ArrayList<Node> {
     return id;
   }
 
+  @Override
+  public String toString() {
+    return String.format("PartitionGroup{id=%d, header=%s}", id, get(0));
+  }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
index eb1825f..ad90d65 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java
@@ -79,6 +79,7 @@ public class DefaultSlotBalancer implements SlotBalancer {
           previousNodeMap.get(curNode).put(slot, 
table.getHeaderGroup(entry.getKey(), oldRing));
           slotNodes[slot] = curNode;
         }
+        slotsToMove.clear();
         transferNum -= numToMove;
         if (transferNum > 0) {
           curNode = new RaftNode(newNode, ++raftId);
@@ -89,6 +90,7 @@ public class DefaultSlotBalancer implements SlotBalancer {
             previousNodeMap.get(curNode).put(slot, 
table.getHeaderGroup(entry.getKey(), oldRing));
             slotNodes[slot] = curNode;
           }
+          slotsToMove.clear();
         }
       }
     }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index 65f66dc..8fbc772 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -161,7 +161,7 @@ public class ClusterPlanRouter {
       throw new UnsupportedPlanException(plan);
     }
     for (PartitionGroup partitionGroup: 
partitionTable.calculateGlobalGroups(oldRing)) {
-      result.put(plan, partitionGroup);
+      result.put(new LogPlan(plan), partitionGroup);
     }
     return result;
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 373d535..8a9b710 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -49,6 +49,8 @@ public class Response {
   // the new node, which tries to join the cluster, contains conflicted 
parameters with the
   // cluster, so the operation is rejected.
   public static final long RESPONSE_NEW_NODE_PARAMETER_CONFLICT = -10;
+  // add/remove node operations should be performed one by one
+  public static final long RESPONSE_CHANGE_MEMBERSHIP_CONFLICT = -11;
   // the request is not executed locally anc should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
index f17bc6e..4a0c43f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/NodeStatusHandler.java
@@ -27,11 +27,11 @@ import org.apache.thrift.async.AsyncMethodCallback;
 public class NodeStatusHandler implements AsyncMethodCallback<Node> {
 
 
-  private Map<Node, Boolean> nodeStatusMap;
+  private Map<Node, Integer> nodeStatusMap;
 
   private AtomicInteger countResponse;
 
-  public NodeStatusHandler(Map<Node, Boolean> nodeStatusMap) {
+  public NodeStatusHandler(Map<Node, Integer> nodeStatusMap) {
     this.nodeStatusMap = nodeStatusMap;
     this.countResponse = new AtomicInteger();
   }
@@ -39,7 +39,10 @@ public class NodeStatusHandler implements 
AsyncMethodCallback<Node> {
   @Override
   public void onComplete(Node response) {
     synchronized (nodeStatusMap) {
-      nodeStatusMap.put(response, true);
+      if (response == null) {
+        return;
+      }
+      nodeStatusMap.put(response, 0);
       // except for this node itself
       if(countResponse.incrementAndGet() == nodeStatusMap.size() - 1){
         nodeStatusMap.notifyAll();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 9dc5d67..dd5b419 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,7 +19,40 @@
 
 package org.apache.iotdb.cluster.server.member;
 
+import static 
org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
+import static 
org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -117,40 +150,6 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static 
org.apache.iotdb.cluster.utils.ClusterUtils.WAIT_START_UP_CHECK_TIME_SEC;
-import static 
org.apache.iotdb.cluster.utils.ClusterUtils.analyseStartUpCheckResult;
-
 @SuppressWarnings("java:S1135")
 public class MetaGroupMember extends RaftMember {
 
@@ -472,6 +471,7 @@ public class MetaGroupMember extends RaftMember {
         partitionTable = new SlotPartitionTable(allNodes, thisNode);
         logger.info("Partition table is set up");
       }
+      initIdNodeMap();
       router = new ClusterPlanRouter(partitionTable);
       this.coordinator.setRouter(router);
       startSubServers();
@@ -604,6 +604,9 @@ public class MetaGroupMember extends RaftMember {
       setNodeIdentifier(genNodeIdentifier());
     } else if (resp.getRespNum() == 
Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) {
       handleConfigInconsistency(resp);
+    } else if (resp.getRespNum() == 
Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT) {
+      logger.warn(
+          "The cluster is performing other change membership operations. 
Change membership operations should be performed one by one. Please try again 
later");
     } else {
       logger
           .warn("Joining the cluster is rejected by {} for response {}", node, 
resp.getRespNum());
@@ -867,18 +870,27 @@ public class MetaGroupMember extends RaftMember {
    * immediately. If the identifier of "node" conflicts with an existing node, 
the request will be
    * turned down.
    *
-   * @param node          cannot be the local node
+   * @param newNode          cannot be the local node
    * @param startUpStatus the start up status of the new node
    * @param response      the response that will be sent to "node"
    * @return true if the process is over, false if the request should be 
forwarded
    */
-  private boolean processAddNodeLocally(Node node, StartUpStatus startUpStatus,
+  private boolean processAddNodeLocally(Node newNode, StartUpStatus 
startUpStatus,
       AddNodeResponse response) throws LogExecutionException {
     if (character != NodeCharacter.LEADER) {
       return false;
     }
-    if (allNodes.contains(node)) {
-      logger.debug("Node {} is already in the cluster", node);
+    boolean nodeExistInPartitionTable = false;
+    for (Node node : partitionTable.getAllNodes()) {
+      if (node.ip.equals(newNode.ip) && newNode.dataPort == node.dataPort
+          && newNode.metaPort == node.metaPort && newNode.clientPort == 
node.clientPort) {
+        newNode.nodeIdentifier = node.nodeIdentifier;
+        nodeExistInPartitionTable = true;
+        break;
+      }
+    }
+    if (allNodes.contains(newNode)) {
+      logger.debug("Node {} is already in the cluster", newNode);
       response.setRespNum((int) Response.RESPONSE_AGREE);
       synchronized (partitionTable) {
         response.setPartitionTableBytes(partitionTable.serialize());
@@ -886,9 +898,14 @@ public class MetaGroupMember extends RaftMember {
       return true;
     }
 
-    Node idConflictNode = idNodeMap.get(node.getNodeIdentifier());
+    if (!nodeExistInPartitionTable && partitionTable.getAllNodes().size() != 
allNodes.size()) {
+      response.setRespNum((int) Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT);
+      return true;
+    }
+
+    Node idConflictNode = idNodeMap.get(newNode.getNodeIdentifier());
     if (idConflictNode != null) {
-      logger.debug("{}'s id conflicts with {}", node, idConflictNode);
+      logger.debug("{}'s id conflicts with {}", newNode, idConflictNode);
       response.setRespNum((int) Response.RESPONSE_IDENTIFIER_CONFLICT);
       return true;
     }
@@ -901,7 +918,7 @@ public class MetaGroupMember extends RaftMember {
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.addNode(node);
+      partitionTable.addNode(newNode);
       ((SlotPartitionTable) 
partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
 
       AddNodeLog addNodeLog = new AddNodeLog();
@@ -910,28 +927,28 @@ public class MetaGroupMember extends RaftMember {
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
       addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      addNodeLog.setNewNode(node);
+      addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
 
       int retryTime = 1;
       while (true) {
         logger
-            .info("Send the join request of {} to other nodes, retry time: 
{}", node, retryTime);
+            .info("Send the join request of {} to other nodes, retry time: 
{}", newNode, retryTime);
         AppendLogResult result = sendLogToFollowers(addNodeLog);
         switch (result) {
           case OK:
             commitLog(addNodeLog);
-            logger.info("Join request of {} is accepted", node);
+            logger.info("Join request of {} is accepted", newNode);
 
             synchronized (partitionTable) {
               response.setPartitionTableBytes(partitionTable.serialize());
             }
             response.setRespNum((int) Response.RESPONSE_AGREE);
-            logger.info("Sending join response of {}", node);
+            logger.info("Sending join response of {}", newNode);
             return true;
           case TIME_OUT:
-            logger.info("Join request of {} timed out", node);
+            logger.info("Join request of {} timed out", newNode);
             retryTime++;
             continue;
           case LEADERSHIP_STALE:
@@ -1606,6 +1623,7 @@ public class MetaGroupMember extends RaftMember {
         allRedirect = false;
       }
       if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("Fail to send log {} to data group {}", entry.getKey(), 
entry.getValue());
         // execution failed, record the error message
         errorCodePartitionGroups.add(String.format("[%s@%s:%s]",
             tmpStatus.getCode(), entry.getValue().getHeader(),
@@ -1715,7 +1733,7 @@ public class MetaGroupMember extends RaftMember {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node 
header)
       throws IOException {
-    Client client = null;
+    Client client;
     try {
       client = getClientProvider().getSyncDataClient(receiver,
           RaftServer.getWriteOperationTimeoutMS());
@@ -1726,8 +1744,6 @@ public class MetaGroupMember extends RaftMember {
   }
 
   /**
-=======
->>>>>>> master
    * Get the data groups that should be queried when querying "path" with 
"filter". First, the time
    * interval qualified by the filter will be extracted. If any side of the 
interval is open, query
    * all groups. Otherwise compute all involved groups w.r.t. the time 
partitioning.
@@ -1776,14 +1792,14 @@ public class MetaGroupMember extends RaftMember {
   }
 
   @SuppressWarnings("java:S2274")
-  public Map<Node, Boolean> getAllNodeStatus() {
+  public Map<Node, Integer> getAllNodeStatus() {
     if (getPartitionTable() == null) {
       // the cluster is being built.
       return null;
     }
-    Map<Node, Boolean> nodeStatus = new HashMap<>();
+    Map<Node, Integer> nodeStatus = new HashMap<>();
     for (Node node : allNodes) {
-      nodeStatus.put(node, thisNode.equals(node));
+      nodeStatus.put(node, thisNode.equals(node) ? 0 : 1);
     }
 
     try {
@@ -1798,11 +1814,20 @@ public class MetaGroupMember extends RaftMember {
       Thread.currentThread().interrupt();
       logger.warn("Cannot get the status of all nodes", e);
     }
+
+    for (Node node: partitionTable.getAllNodes()) {
+      nodeStatus.putIfAbsent(node, 2);
+    }
+    for (Node node : allNodes) {
+      if (!partitionTable.getAllNodes().contains(node)) {
+        nodeStatus.put(node, 3);
+      }
+    }
     return nodeStatus;
   }
 
   @SuppressWarnings({"java:S2445", "java:S2274"})
-  private void getNodeStatusAsync(Map<Node, Boolean> nodeStatus)
+  private void getNodeStatusAsync(Map<Node, Integer> nodeStatus)
       throws TException, InterruptedException {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     synchronized (nodeStatus) {
@@ -1816,7 +1841,7 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
-  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) {
+  private void getNodeStatusSync(Map<Node, Integer> nodeStatus) {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     for (Node node : allNodes) {
       SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -1902,10 +1927,14 @@ public class MetaGroupMember extends RaftMember {
       return Response.RESPONSE_REJECT;
     }
 
+    if (partitionTable.getAllNodes().contains(target) && 
partitionTable.getAllNodes().size() != allNodes.size()) {
+      return Response.RESPONSE_CHANGE_MEMBERSHIP_CONFLICT;
+    }
+
     // node removal must be serialized to reduce potential concurrency problem
     synchronized (logManager) {
       // update partition table
-      partitionTable.addNode(node);
+      partitionTable.removeNode(node);
       ((SlotPartitionTable) 
partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1);
 
       RemoveNodeLog removeNodeLog = new RemoveNodeLog();
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d05ddd9..2cf5f77 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -907,7 +907,6 @@ public abstract class RaftMember {
 
     try {
       if (appendLogInGroup(log)) {
-        TSStatus res = StatusUtils.OK;
         return StatusUtils.OK;
       }
     } catch (LogExecutionException e) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 890b402..f1f46d2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -134,7 +134,7 @@ public class ClusterMonitor implements ClusterMonitorMBean, 
IService {
   }
 
   @Override
-  public Map<Node, Boolean> getAllNodeStatus() {
+  public Map<Node, Integer> getAllNodeStatus() {
     MetaGroupMember metaGroupMember = getMetaGroupMember();
     if (metaGroupMember == null) {
       return null;
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
index ea52c28..cc3e7b7 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitorMBean.java
@@ -66,9 +66,9 @@ public interface ClusterMonitorMBean {
   /**
    * Get status of all nodes
    *
-   * @return key: node, value: live or not
+   * @return key: node, value: 0(live), 1(offline), 2(joining), 3(leaving)
    */
-  Map<Node, Boolean> getAllNodeStatus();
+  Map<Node, Integer> getAllNodeStatus();
 
   /**
    *
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
index da4305d..d32b94d 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Host.java
@@ -63,7 +63,7 @@ public class Host extends NodeToolCmd {
       for (int i = 1; i < raftGroup.size(); i++) {
         builder.append(", ").append(nodeToString(raftGroup.get(i)));
       }
-      builder.append(')');
+      builder.append("),id=").append(raftGroup.getId());
       msgPrintln(String.format("%-50s->%20s", builder.toString(), slotNum));
     }
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
index 700990a..fe8eb0f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/function/Status.java
@@ -22,6 +22,7 @@ import static 
org.apache.iotdb.cluster.utils.nodetool.Printer.msgPrintln;
 
 import io.airlift.airline.Command;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitorMBean;
 
@@ -30,15 +31,26 @@ public class Status extends NodeToolCmd {
 
   @Override
   public void execute(ClusterMonitorMBean proxy) {
-    Map<Node, Boolean> statusMap = proxy.getAllNodeStatus();
-    if(statusMap == null){
+    Map<Node, Integer> statusMap = proxy.getAllNodeStatus();
+    if (statusMap == null) {
       msgPrintln(BUILDING_CLUSTER_INFO);
       return;
     }
     msgPrintln(String.format("%-30s  %10s", "Node", "Status"));
-    statusMap.forEach(
-        (node, status) -> msgPrintln(String.format("%-30s->%10s", 
nodeToString(node),
-            (Boolean.TRUE.equals(status) ?
-            "on" : "off"))));
+    for (Entry<Node, Integer> entry : statusMap.entrySet()) {
+      Node node = entry.getKey();
+      Integer statusNum = entry.getValue();
+      String status;
+      if (statusNum == 0) {
+        status = "on";
+      } else if (statusNum == 1) {
+        status = "off";
+      } else if (statusNum == 2) {
+        status = "joining";
+      } else {
+        status = "leaving";
+      }
+      msgPrintln(String.format("%-30s->%10s", nodeToString(node), status));
+    }
   }
 }
\ No newline at end of file
diff --git 
a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 76efe5f..59874b4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.log;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
@@ -29,8 +30,12 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.partition.PartitionGroup;
+import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
 import org.junit.Test;
 
@@ -98,4 +103,13 @@ public class LogParserTest {
     Log serialized = logParser.parse(byteBuffer);
     assertEquals(log, serialized);
   }
+
+  @Test
+  public void testLogPlan() throws IOException, IllegalPathException, 
UnknownLogTypeException {
+    AddNodeLog log = new AddNodeLog(TestUtils.seralizePartitionTable, 
TestUtils.getNode(0));
+    LogPlan logPlan = new LogPlan(log.serialize());
+    ByteBuffer buffer = 
ByteBuffer.wrap(PlanSerializer.getInstance().serialize(logPlan));
+    PhysicalPlan plan = PhysicalPlan.Factory.create(buffer);
+    LogParser.getINSTANCE().parse(((LogPlan) plan).getLog());
+  }
 }
\ No newline at end of file
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
index bdc19c5..d0118db 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java
@@ -43,6 +43,11 @@ public class LogPlan extends PhysicalPlan {
     this.log = log;
   }
 
+  public LogPlan(LogPlan plan) {
+    super(false);
+    this.log = plan.log;
+  }
+
   public ByteBuffer getLog() {
     log.clear();
     return log;
@@ -65,8 +70,10 @@ public class LogPlan extends PhysicalPlan {
   }
 
   @Override
-  public void serialize(ByteBuffer buffer) {
+  public void deserialize(ByteBuffer buffer) {
     int len = buffer.getInt();
-    log = ByteBuffer.wrap(buffer.array(), buffer.position(), len);
+    byte[] data = new byte[len];
+    System.arraycopy(buffer.array(), buffer.position(), data, 0, len);
+    log = ByteBuffer.wrap(data);
   }
 }

Reply via email to