This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1608c75beb77c755a703e2ec6b6c2f7fbc7cb775
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Aug 16 15:05:07 2023 +0800

    init
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../manager/consensus/ConsensusManager.java        |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   2 +-
 .../org/apache/iotdb/consensus/IConsensus.java     | 128 +++++------
 .../exception/RatisUnderRecoveryException.java     |   2 +-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 155 +++++---------
 .../iotdb/consensus/ratis/RatisConsensus.java      | 234 +++++++++------------
 .../iotdb/consensus/simple/SimpleConsensus.java    | 140 ++++++------
 ...verImpl.java => SimpleConsensusServerImpl.java} |   4 +-
 .../apache/iotdb/consensus/iot/ReplicateTest.java  |  12 +-
 .../apache/iotdb/consensus/iot/StabilityTest.java  |  16 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  60 +++---
 .../iotdb/consensus/ratis/RecoverReadTest.java     |   8 +-
 .../iotdb/consensus/simple/RecoveryTest.java       |   4 +-
 .../consensus/simple/SimpleConsensusTest.java      |  35 ++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   8 +-
 .../thrift/impl/DataNodeRegionManager.java         |   4 +-
 .../execution/executor/RegionWriteExecutor.java    |  94 ++++-----
 .../iotdb/db/service/RegionMigrateService.java     |  12 +-
 .../DataNodeInternalRPCServiceImplTest.java        |   4 +-
 .../apache/iotdb/commons/utils/StatusUtils.java    |   2 +-
 20 files changed, 403 insertions(+), 527 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 740390d902d..95182ce9ab1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -250,7 +250,7 @@ public class ConsensusManager {
               configNodeLocation.getConfigNodeId(),
               configNodeLocation.getConsensusEndPoint()));
     }
-    consensusImpl.createPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
+    consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList);
   }
 
   /**
@@ -262,7 +262,7 @@ public class ConsensusManager {
   public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws 
AddPeerException {
     boolean result =
         consensusImpl
-            .addPeer(
+            .addRemotePeer(
                 DEFAULT_CONSENSUS_GROUP_ID,
                 new Peer(
                     DEFAULT_CONSENSUS_GROUP_ID,
@@ -284,7 +284,7 @@ public class ConsensusManager {
    */
   public boolean removeConfigNodePeer(TConfigNodeLocation configNodeLocation) {
     return consensusImpl
-        .removePeer(
+        .removeRemotePeer(
             DEFAULT_CONSENSUS_GROUP_ID,
             new Peer(
                 DEFAULT_CONSENSUS_GROUP_ID,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8465fdc6c80..4d352cf59db 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -625,7 +625,7 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
 
     ConsensusGroupId groupId = 
configManager.getConsensusManager().getConsensusGroupId();
     ConsensusGenericResponse resp =
-        
configManager.getConsensusManager().getConsensusImpl().deletePeer(groupId);
+        
configManager.getConsensusManager().getConsensusImpl().deleteLocalPeer(groupId);
     if (!resp.isSuccess()) {
       return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
           .setMessage(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 8a101e721e5..1f51c09c743 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -19,12 +19,12 @@
 
 package org.apache.iotdb.consensus;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -32,120 +32,96 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.util.List;
 
-/** Consensus module base class. */
+/**
+ * Consensus module base interface.
+ */
 @ThreadSafe
 public interface IConsensus {
 
+  /**
+   * Start the consensus module
+   */
   void start() throws IOException;
 
+  /**
+   * Stop the consensus module
+   */
   void stop() throws IOException;
 
-  // write API
-  ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest 
request);
+  /**
+   * Write data to the corresponding consensus group
+   *
+   * @param groupId the consensus group this request belongs
+   * @param request write request
+   */
+  TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) throws 
ConsensusException;
 
-  // read API
-  ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest 
request);
+  /**
+   * Read data from the corresponding consensus group
+   *
+   * @param groupId the consensus group this request belongs
+   * @param request read request
+   */
+  DataSet read(ConsensusGroupId groupId, IConsensusRequest request) throws 
ConsensusException;
 
   // multi consensus group API
 
   /**
    * Require the <em>local node</em> to create a Peer and become a member of 
the given consensus
    * group. This node will prepare and initialize local statemachine {@link 
IStateMachine} and other
-   * data structures. After this method returns, we can call {@link 
#addPeer(ConsensusGroupId,
-   * Peer)} to notify original group that this new Peer is prepared to be 
added into the latest
-   * configuration. createPeer should be called on a node that does not 
contain any peer of the
-   * consensus group, to avoid one node having more than one replica.
+   * data structures. After this method returns, we can call
+   * {@link #addRemotePeer(ConsensusGroupId, Peer)} to notify original group 
that this new Peer is
+   * prepared to be added into the latest configuration. createLocalPeer 
should be called on a node
+   * that does not contain any peer of the consensus group, to avoid one node 
having more than one
+   * replica.
    *
    * @param groupId the consensus group this Peer belongs
-   * @param peers other known peers in this group
+   * @param peers   other known peers in this group
    */
-  ConsensusGenericResponse createPeer(ConsensusGroupId groupId, List<Peer> 
peers);
+  void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) throws 
ConsensusException;
 
   /**
    * When the <em>local node</em> is no longer a member of the given consensus 
group, call this
    * method to do cleanup works. This method will close local statemachine 
{@link IStateMachine},
-   * delete local data and do other cleanup works. Be sure this method is 
called after successfully
-   * removing this peer from current consensus group configuration (by calling 
{@link
-   * #removePeer(ConsensusGroupId, Peer)} or {@link 
#changePeer(ConsensusGroupId, List)}).
+   * delete local data and do other cleanup works. deleteLocalPeer should be 
called after
+   * successfully removing this peer from current consensus group 
configuration (by calling
+   * {@link #removeRemotePeer(ConsensusGroupId, Peer)}).
    *
    * @param groupId the consensus group this Peer used to belong
    */
-  ConsensusGenericResponse deletePeer(ConsensusGroupId groupId);
+  void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException;
 
   // single consensus group API
 
   /**
-   * Tell the group that a new Peer is prepared to be added into this group. 
Call {@link
-   * #createPeer(ConsensusGroupId, List)} on the new Peer before calling this 
method. When this
-   * method returns, the group data should be already transmitted to the new 
Peer. That is, the new
-   * peer is available to answer client requests by the time this method 
successfully returns.
-   * addPeer should be called on a living peer of the consensus group. For 
example: We'd like to add
-   * a peer D to (A, B, C) group. We need to execute addPeer in A, B or C.
+   * Tell the group that a new Peer is prepared to be added into this group. 
Call
+   * {@link #createLocalPeer(ConsensusGroupId, List)} on the new Peer before 
calling this method.
+   * When this method returns, the group data should be already transmitted to 
the new Peer. That
+   * is, the new peer is available to answer client requests by the time this 
method successfully
+   * returns. addRemotePeer should be called on a living peer of the consensus 
group. For example:
+   * We'd like to add a peer D to (A, B, C) group. We need to execute addPeer 
in A, B or C.
    *
    * @param groupId the consensus group this peer belongs
-   * @param peer the newly added peer
+   * @param peer    the newly added peer
    */
-  ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer);
+  void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException;
 
   /**
    * Tell the group to remove an active Peer. The removed peer can no longer 
answer group requests
-   * when this method successfully returns. Call {@link 
#deletePeer(ConsensusGroupId)} on the
-   * removed Peer to do cleanup jobs after this method successfully returns. 
removePeer should be
-   * called on a living peer of its consensus group. For example: a group has 
A, B, C. We'd like to
-   * remove C, in case C is dead, the removePeer should be sent to A or B.
+   * when this method successfully returns. Call {@link 
#deleteLocalPeer(ConsensusGroupId)} on the
+   * removed Peer to do cleanup jobs after this method successfully returns. 
removeRemotePeer should
+   * be called on a living peer of its consensus group. For example: a group 
has A, B, C. We'd like
+   * to remove C, in case C is dead, the removePeer should be sent to A or B.
    *
    * @param groupId the consensus group this peer belongs
-   * @param peer the peer to be removed
-   */
-  ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer);
-
-  /**
-   * Tell the group to update an active Peer. The modifiable part of {@link 
Peer} is TEndPoint.
-   *
-   * @param groupId the consensus group this peer belongs
-   * @param oldPeer the peer to be updated
-   * @param newPeer the peer to be updated to
-   */
-  // TODO: @SzyWilliam @SpriCoder Please implement this method
-  ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer oldPeer, 
Peer newPeer);
-
-  /**
-   * Change group configuration. This method allows you to add/remove multiple 
Peers at once. This
-   * method is similar to {@link #addPeer(ConsensusGroupId, Peer)} or {@link
-   * #removePeer(ConsensusGroupId, Peer)}
-   *
-   * @param groupId the consensus group
-   * @param newPeers the new member configuration of this group
-   */
-  ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> 
newPeers);
-
-  /**
-   * Tell the group to [create a new Peer on new node] and [add this member to 
join the group].
-   *
-   * <p>The underlying implementation should <br>
-   * 1. first call createPeer on the new member <br>
-   * 2. then call addPeer for configuration change <br>
-   * Call this method on any node of the <em>original group</em>. <br>
-   * NOTICE: Currently only RatisConsensus implements this method.
-   *
-   * @param groupId the consensus group
-   * @param newNode the new member
-   * @param originalGroup the original members of the existed group
+   * @param peer    the peer to be removed
    */
-  default ConsensusGenericResponse addNewNodeToExistedGroup(
-      ConsensusGroupId groupId, Peer newNode, List<Peer> originalGroup) {
-    return ConsensusGenericResponse.newBuilder()
-        .setSuccess(false)
-        .setException(
-            new ConsensusException(
-                "addNewNodeToExistedGroup method is not implemented by " + 
this + " class"))
-        .build();
-  }
+  void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException;
 
   // management API
-  ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer 
newLeader);
+  void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException;
 
-  ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId);
+  void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException;
 
   boolean isLeader(ConsensusGroupId groupId);
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
index 762b7ea432c..f614c2bd3f0 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisUnderRecoveryException.java
@@ -24,7 +24,7 @@ public class RatisUnderRecoveryException extends 
ConsensusException {
 
   public RatisUnderRecoveryException(Throwable cause) {
     super(
-        "Ratis Server is redoing Raft Log and cannot serve read requests now. 
Please try read later: "
+        "Raft Server is redoing Raft Log and cannot serve read requests now. 
Please try read later: "
             + cause,
         cause);
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 047d557ad55..aee5a970ec3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -19,16 +19,20 @@
 
 package org.apache.iotdb.consensus.iot;
 
+import java.util.Optional;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.IStateMachine.Registry;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -42,6 +46,7 @@ import 
org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
 import 
org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.AsyncIoTConsensusServiceClientPoolFactory;
 import 
org.apache.iotdb.consensus.iot.client.IoTConsensusClientPool.SyncIoTConsensusServiceClientPoolFactory;
@@ -49,9 +54,11 @@ import 
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
+import org.apache.iotdb.consensus.simple.SimpleConsensusServerImpl;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.ratis.protocol.GroupManagementRequest.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,7 +112,7 @@ public class IoTConsensus implements IConsensus {
   }
 
   @Override
-  public void start() throws IOException {
+  public synchronized void start() throws IOException {
     initAndRecover();
     service.initAsyncedServiceImpl(new IoTConsensusRPCServiceProcessor(this));
     try {
@@ -118,7 +125,7 @@ public class IoTConsensus implements IConsensus {
   private void initAndRecover() throws IOException {
     if (!storageDir.exists()) {
       if (!storageDir.mkdirs()) {
-        logger.warn("Unable to create consensus dir at {}", storageDir);
+        throw new IOException(String.format("Unable to create consensus dir at 
%s", storageDir));
       }
     } else {
       try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
@@ -144,7 +151,7 @@ public class IoTConsensus implements IConsensus {
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
     
stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
     clientManager.close();
     syncClientManager.close();
@@ -152,62 +159,52 @@ public class IoTConsensus implements IConsensus {
   }
 
   @Override
-  public ConsensusWriteResponse write(ConsensusGroupId groupId, 
IConsensusRequest request) {
-    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusWriteResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
-
-    TSStatus status;
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     if (impl.isReadOnly()) {
-      status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
-      status.setMessage("Fail to do non-query operations because system is 
read-only.");
+      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
     } else if (!impl.isActive()) {
-      // TODO: (xingtanzjr) whether we need to define a new status to indicate 
the inactive status ?
-      status = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
-      status.setMessage("peer is inactive and not ready to receive sync log 
request.");
+      return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT,
+          "peer is inactive and not ready to receive sync log request.");
     } else {
-      status = impl.write(request);
+      return impl.write(request);
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(status).build();
   }
 
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, 
IConsensusRequest request) {
-    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusReadResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
-    return 
ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
+        .read(request);
   }
 
+  @SuppressWarnings("java:S2201")
   @Override
-  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, 
List<Peer> peers) {
+  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
     int consensusGroupSize = peers.size();
     if (consensusGroupSize == 0) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerNumException(consensusGroupSize))
-          .build();
+      throw new IllegalPeerNumException(consensusGroupSize);
     }
     if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerEndpointException(thisNode, peers))
-          .build();
+      throw new IllegalPeerEndpointException(thisNode, peers);
     }
     AtomicBoolean exist = new AtomicBoolean(true);
-    stateMachineMap.computeIfAbsent(
+    Optional.ofNullable(stateMachineMap.computeIfAbsent(
         groupId,
         k -> {
           exist.set(false);
+
           String path = buildPeerDir(storageDir, groupId);
           File file = new File(path);
           if (!file.mkdirs()) {
             logger.warn("Unable to create consensus dir for group {} at {}", 
groupId, path);
+            return null;
           }
+
           IoTConsensusServerImpl impl =
               new IoTConsensusServerImpl(
                   path,
@@ -219,17 +216,15 @@ public class IoTConsensus implements IConsensus {
                   config);
           impl.start();
           return impl;
-        });
+        })).orElseThrow(() -> new ConsensusException(
+        String.format("Unable to create consensus dir for group %s", 
groupId)));
     if (exist.get()) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupAlreadyExistException(groupId))
-          .build();
+      throw new ConsensusGroupAlreadyExistException(groupId);
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
-  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
+  public void deleteLocalPeer(ConsensusGroupId groupId) throws 
ConsensusException {
     AtomicBoolean exist = new AtomicBoolean(false);
     stateMachineMap.computeIfPresent(
         groupId,
@@ -239,22 +234,17 @@ public class IoTConsensus implements IConsensus {
           FileUtils.deleteDirectory(new File(buildPeerDir(storageDir, 
groupId)));
           return null;
         });
-
     if (!exist.get()) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
+      throw new ConsensusGroupNotExistException(groupId);
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
-  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) 
{
-    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
+  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    if (impl.getConfiguration().contains(peer)) {
+      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
     }
     try {
       // step 1: inactive new Peer to prepare for following steps
@@ -287,14 +277,9 @@ public class IoTConsensus implements IConsensus {
       doSpotClean(peer, impl);
 
     } catch (ConsensusGroupModifyPeerException e) {
-      logger.error("cannot execute addPeer() for {}", peer, e);
-      return ConsensusGenericResponse.newBuilder()
-          .setSuccess(false)
-          .setException(new ConsensusException(e.getMessage()))
-          .build();
+      logger.error(String.format("cannot execute addPeer() for %s", peer), e);
+      throw new ConsensusException(e.getMessage());
     }
-
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   private void doSpotClean(Peer peer, IoTConsensusServerImpl impl) {
@@ -306,21 +291,16 @@ public class IoTConsensus implements IConsensus {
   }
 
   @Override
-  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer 
peer) {
-    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
+  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer)
+      throws ConsensusException {
+    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+
     try {
       // let other peers remove the sync channel with target peer
       impl.notifyPeersToRemoveSyncLogChannel(peer);
     } catch (ConsensusGroupModifyPeerException e) {
-      return ConsensusGenericResponse.newBuilder()
-          .setSuccess(false)
-          .setException(new ConsensusException(e.getMessage()))
-          .build();
+      throw new ConsensusException(e.getMessage());
     }
 
     try {
@@ -330,44 +310,25 @@ public class IoTConsensus implements IConsensus {
       impl.waitTargetPeerUntilSyncLogCompleted(peer);
     } catch (ConsensusGroupModifyPeerException e) {
       // we only log warning here because sometimes the target peer may 
already be down
-      logger.warn("cannot wait {} to complete SyncLog. error message: {}", 
peer, e.getMessage());
+      logger.warn(String.format("cannot wait %s to complete SyncLog.", peer), 
e);
+      throw new ConsensusException(e.getMessage());
     }
-
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
-  }
-
-  @Override
-  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer 
oldPeer, Peer newPeer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
-  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, 
List<Peer> newPeers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException {
+    throw new ConsensusException("IoTConsensus does not support leader 
transfer");
   }
 
   @Override
-  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, 
Peer newLeader) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
-  }
-
-  @Override
-  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
-    IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
+  public void triggerSnapshot(ConsensusGroupId groupId) throws 
ConsensusException {
+    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     try {
       impl.takeSnapshot();
     } catch (ConsensusGroupModifyPeerException e) {
-      return ConsensusGenericResponse.newBuilder()
-          .setSuccess(false)
-          .setException(new ConsensusException(e.getMessage()))
-          .build();
+      throw new ConsensusException(e.getMessage());
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 27c8bb0d271..6d3d1e5567c 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.ratis;
 
+import java.util.Optional;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -45,6 +46,7 @@ import 
org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
@@ -73,8 +75,12 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
@@ -97,12 +103,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-/** A multi-raft consensus implementation based on Apache Ratis. */
+/**
+ * A multi-raft consensus implementation based on Apache Ratis.
+ */
 class RatisConsensus implements IConsensus {
 
   private static final Logger logger = 
LoggerFactory.getLogger(RatisConsensus.class);
 
-  /** the unique net communication endpoint */
+  /**
+   * the unique net communication endpoint
+   */
   private final RaftPeer myself;
 
   private final RaftServer server;
@@ -110,7 +120,7 @@ class RatisConsensus implements IConsensus {
   private final RaftProperties properties = new RaftProperties();
   private final RaftClientRpc clientRpc;
 
-  private final IClientManager<RaftGroup, RatisClient> clientManager;
+  private final IClientManager<RaftGroup, 
org.apache.iotdb.consensus.ratis.RatisClient> clientManager;
 
   private final Map<RaftGroupId, RaftGroup> lastSeen = new 
ConcurrentHashMap<>();
 
@@ -120,7 +130,6 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_PRIORITY = 0;
   private static final int LEADER_PRIORITY = 1;
 
-  /** TODO make it configurable */
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
   private final ExecutorService addExecutor;
@@ -159,7 +168,7 @@ class RatisConsensus implements IConsensus {
             ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
 
     clientManager =
-        new IClientManager.Factory<RaftGroup, RatisClient>()
+        new IClientManager.Factory<RaftGroup, 
org.apache.iotdb.consensus.ratis.RatisClient>()
             .createClientManager(new RatisClientPoolFactory());
 
     clientRpc = new GrpcFactory(new 
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
@@ -170,7 +179,7 @@ class RatisConsensus implements IConsensus {
             .setProperties(properties)
             .setStateMachineRegistry(
                 raftGroupId ->
-                    new ApplicationStateMachineProxy(
+                    new 
org.apache.iotdb.consensus.ratis.ApplicationStateMachineProxy(
                         
registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
                         raftGroupId,
                         canServeStaleRead))
@@ -178,14 +187,14 @@ class RatisConsensus implements IConsensus {
   }
 
   @Override
-  public void start() throws IOException {
+  public synchronized void start() throws IOException {
     MetricService.getInstance().addMetricSet(this.ratisMetricSet);
     server.start();
     startSnapshotGuardian();
   }
 
   @Override
-  public void stop() throws IOException {
+  public synchronized void stop() throws IOException {
     addExecutor.shutdown();
     diskGuardian.shutdown();
     try {
@@ -206,7 +215,9 @@ class RatisConsensus implements IConsensus {
     return !reply.isSuccess() && (reply.getException() instanceof 
ResourceUnavailableException);
   }
 
-  /** launch a consensus write with retry mechanism */
+  /**
+   * launch a consensus write with retry mechanism
+   */
   private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, 
IOException> caller)
       throws IOException {
 
@@ -245,120 +256,125 @@ class RatisConsensus implements IConsensus {
     return writeWithRetry(() -> server.submitClientRequest(request));
   }
 
-  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message 
message)
+  private RaftClientReply writeRemotelyWithRetry(
+      org.apache.iotdb.consensus.ratis.RatisClient client, Message message)
       throws IOException {
     return writeWithRetry(() -> client.getRaftClient().io().send(message));
   }
 
   /**
-   * write will first send request to local server use method call if local 
server is not leader, it
-   * will use RaftClient to send RPC to read leader
+   * write will first send request to local server using local method call. If 
local server is not
+   * leader, it will use RaftClient to send RPC to read leader
    */
   @Override
-  public ConsensusWriteResponse write(
-      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
     // pre-condition: group exists and myself server serves this group
-    RaftGroupId raftGroupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup raftGroup = getGroupInfo(raftGroupId);
     if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
-      return failedWrite(new 
ConsensusGroupNotExistException(consensusGroupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     // current Peer is group leader and in ReadOnly State
-    if (isLeader(consensusGroupId) && Utils.rejectWrite()) {
+    if (isLeader(groupId) && Utils.rejectWrite()) {
       try {
         forceStepDownLeader(raftGroup);
       } catch (Exception e) {
         logger.warn("leader {} read only, force step down failed due to {}", 
myself, e);
       }
-      return failedWrite(new NodeReadOnlyException(myself));
+      throw new NodeReadOnlyException(myself);
     }
 
     // serialize request into Message
-    Message message = new RequestMessage(IConsensusRequest);
+    Message message = new RequestMessage(request);
 
     // 1. first try the local server
     RaftClientRequest clientRequest =
         buildRawRequest(raftGroupId, message, 
RaftClientRequest.writeRequestType());
 
-    RaftClientReply localServerReply;
     RaftPeer suggestedLeader = null;
-    if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
+    if (isLeader(groupId) && waitUntilLeaderReady(raftGroupId)) {
       try (AutoCloseable ignored =
           
RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
-        localServerReply = writeLocallyWithRetry(clientRequest);
+        RaftClientReply localServerReply = 
writeLocallyWithRetry(clientRequest);
         if (localServerReply.isSuccess()) {
-          ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
-          TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
-          return 
ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+          org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage = 
(org.apache.iotdb.consensus.ratis.ResponseMessage) 
localServerReply.getMessage();
+          return 
Optional.ofNullable(responseMessage.getContentHolder()).map(TSStatus.class::cast)
+              .orElse(null);
         }
         NotLeaderException ex = localServerReply.getNotLeaderException();
-        if (ex != null) { // local server is not leader
+        if (ex != null) {
           suggestedLeader = ex.getSuggestedLeader();
         }
       } catch (Exception e) {
-        return failedWrite(new RatisRequestFailedException(e));
+        throw new RatisRequestFailedException(e);
       }
     }
 
     // 2. try raft client
     TSStatus writeResult;
     try (AutoCloseable ignored =
-            
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
-        RatisClient client = getRaftClient(raftGroup)) {
+        
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
+        org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(raftGroup)) {
       RaftClientReply reply = writeRemotelyWithRetry(client, message);
       if (!reply.isSuccess()) {
-        return failedWrite(new 
RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
       writeResult = 
Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
     } catch (Exception e) {
-      return failedWrite(new RatisRequestFailedException(e));
+      throw new RatisRequestFailedException(e);
     }
 
     if (suggestedLeader != null) {
       TEndPoint leaderEndPoint = 
Utils.fromRaftPeerAddressToTEndPoint(suggestedLeader.getAddress());
       writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), 
leaderEndPoint.getPort()));
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
+    return writeResult;
   }
 
-  /** Read directly from LOCAL COPY notice: May read stale data (not 
linearizable) */
+  /**
+   * Read directly from LOCAL COPY notice:
+   */
   @Override
-  public ConsensusReadResponse read(
-      ConsensusGroupId consensusGroupId, IConsensusRequest IConsensusRequest) {
-    RaftGroupId groupId = 
Utils.fromConsensusGroupIdToRaftGroupId(consensusGroupId);
-    RaftGroup group = getGroupInfo(groupId);
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
+    RaftGroup group = getGroupInfo(raftGroupId);
     if (group == null || !group.getPeers().contains(myself)) {
-      return failedRead(new ConsensusGroupNotExistException(consensusGroupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     final boolean isLinearizableRead =
-        !canServeStaleRead.computeIfAbsent(consensusGroupId, id -> new 
AtomicBoolean(false)).get();
+        !canServeStaleRead.computeIfAbsent(groupId, id -> new 
AtomicBoolean(false)).get();
 
     RaftClientReply reply;
     try {
-      reply = doRead(groupId, IConsensusRequest, isLinearizableRead);
+      reply = doRead(raftGroupId, request, isLinearizableRead);
       // allow stale read if current linearizable read returns successfully
       if (isLinearizableRead) {
-        canServeStaleRead.get(consensusGroupId).set(true);
+        canServeStaleRead.get(groupId).set(true);
       }
-    } catch (Exception e) {
+    } catch (ReadException | ReadIndexException e) {
       if (isLinearizableRead) {
         // linearizable read failed. the RaftServer is recovering from Raft 
Log and cannot serve
         // read requests.
-        return failedRead(new RatisUnderRecoveryException(e));
+        throw new RatisUnderRecoveryException(e);
       } else {
-        return failedRead(new RatisRequestFailedException(e));
+        throw new RatisRequestFailedException(e);
       }
+    } catch (Exception e) {
+      throw new RatisRequestFailedException(e);
     }
-
     Message ret = reply.getMessage();
-    ResponseMessage readResponseMessage = (ResponseMessage) ret;
-    DataSet dataSet = (DataSet) readResponseMessage.getContentHolder();
-    return ConsensusReadResponse.newBuilder().setDataSet(dataSet).build();
+    org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage = 
(org.apache.iotdb.consensus.ratis.ResponseMessage) ret;
+    return 
Optional.ofNullable(readResponseMessage.getContentHolder()).map(DataSet.class::cast)
+        .orElse(null);
   }
 
-  /** return a success raft client reply or throw an Exception */
+  /**
+   * return a success raft client reply or throw an Exception
+   */
   private RaftClientReply doRead(
       RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) 
throws Exception {
     final RaftClientRequest.Type readType =
@@ -390,25 +406,22 @@ class RatisConsensus implements IConsensus {
    * register self to the RaftGroup
    */
   @Override
-  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, 
List<Peer> peers) {
+  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
     RaftGroup group = buildRaftGroup(groupId, peers);
-    // add RaftPeer myself to this RaftGroup
-    return addNewGroupToServer(group, myself);
-  }
-
-  private ConsensusGenericResponse addNewGroupToServer(RaftGroup group, 
RaftPeer server) {
-    RaftClientReply reply;
     RaftGroup clientGroup =
-        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), 
server) : group;
-    try (RatisClient client = getRaftClient(clientGroup)) {
-      reply = 
client.getRaftClient().getGroupManagementApi(server.getId()).add(group);
+        group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), 
myself) : group;
+    try (org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(clientGroup)) {
+      RaftClientReply reply = 
client.getRaftClient().getGroupManagementApi(myself.getId())
+          .add(group);
       if (!reply.isSuccess()) {
-        return failed(new RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
+    } catch (AlreadyExistsException e) {
+      throw new ConsensusGroupAlreadyExistException(groupId);
     } catch (Exception e) {
-      return failed(new RatisRequestFailedException(e));
+      throw new RatisRequestFailedException(e);
     }
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   /**
@@ -419,7 +432,7 @@ class RatisConsensus implements IConsensus {
    * clean up
    */
   @Override
-  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
+  public void deleteLocalPeer(ConsensusGroupId groupId) throws 
ConsensusException {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
 
     // send remove group to myself
@@ -435,13 +448,13 @@ class RatisConsensus implements IConsensus {
                   true,
                   false));
       if (!reply.isSuccess()) {
-        return failed(new RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
+    } catch (GroupMismatchException e) {
+      throw new ConsensusGroupNotExistException(groupId);
     } catch (IOException e) {
-      return failed(new RatisRequestFailedException(e));
+      throw new RatisRequestFailedException(e);
     }
-
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   /**
@@ -451,32 +464,25 @@ class RatisConsensus implements IConsensus {
    * change
    */
   @Override
-  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) 
{
+  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
-    RaftGroup group = getGroupInfo(raftGroupId);
-    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
 
+    RaftGroup group = getGroupInfo(raftGroupId);
     // pre-conditions: group exists and myself in this group
     if (group == null || !group.getPeers().contains(myself)) {
-      return failed(new ConsensusGroupNotExistException(groupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
+    RaftPeer peerToAdd = Utils.fromPeerAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
     // pre-condition: peer not in this group
     if (group.getPeers().contains(peerToAdd)) {
-      return failed(new PeerAlreadyInConsensusGroupException(groupId, peer));
+      throw new PeerAlreadyInConsensusGroupException(groupId, peer);
     }
 
     List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
     newConfig.add(peerToAdd);
 
-    RaftClientReply reply;
-    try {
-      reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
-    } catch (RatisRequestFailedException e) {
-      return failed(e);
-    }
-
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
+    sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
   }
 
   /**
@@ -486,7 +492,7 @@ class RatisConsensus implements IConsensus {
    * change
    */
   @Override
-  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer 
peer) {
+  public ConsensusGenericResponse removeRemotePeer(ConsensusGroupId groupId, 
Peer peer) {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup group = getGroupInfo(raftGroupId);
     RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, 
DEFAULT_PRIORITY);
@@ -516,53 +522,26 @@ class RatisConsensus implements IConsensus {
     return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
-  @Override
-  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer 
oldPeer, Peer newPeer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
-  }
-
-  @Override
-  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, 
List<Peer> newPeers) {
-    RaftGroup raftGroup = buildRaftGroup(groupId, newPeers);
-
-    // pre-conditions: myself in this group
-    if (!raftGroup.getPeers().contains(myself)) {
-      return failed(new ConsensusGroupNotExistException(groupId));
-    }
-
-    // add RaftPeer myself to this RaftGroup
-    RaftClientReply reply;
-    try {
-      reply = sendReconfiguration(raftGroup);
-    } catch (RatisRequestFailedException e) {
-      return failed(e);
-    }
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
-  }
 
   /**
    * NOTICE: transferLeader *does not guarantee* the leader be transferred to 
newLeader.
    * transferLeader is implemented by 1. modify peer priority 2. ask current 
leader to step down
    *
-   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and 
degrade all follower peers
-   * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does 
not allow a peer with
-   * priority <= currentLeader.priority to becomes the leader, so we have to 
upgrade leader's
-   * priority to 1
+   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and 
degrade all follower
+   * peers to 0. By default, Ratis gives every Raft Peer same priority 0. 
Ratis does not allow a
+   * peer with priority <= currentLeader.priority to becomes the leader, so we 
have to upgrade
+   * leader's priority to 1
    *
    * <p>2. call transferLeadership to force current leader to step down and 
raise a new round of
    * election. In this election, the newLeader peer with priority 1 is 
guaranteed to be elected.
    */
   @Override
-  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, 
Peer newLeader) {
+  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException {
 
     // first fetch the newest information
-
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
-    RaftGroup raftGroup = getGroupInfo(raftGroupId);
-
-    if (raftGroup == null) {
-      return failed(new ConsensusGroupNotExistException(groupId));
-    }
+    RaftGroup raftGroup = Optional.ofNullable(getGroupInfo(raftGroupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
 
     RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, 
LEADER_PRIORITY);
 
@@ -581,21 +560,20 @@ class RatisConsensus implements IConsensus {
     }
 
     RaftClientReply reply;
-    try (RatisClient client = getRaftClient(raftGroup)) {
+    try (org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(raftGroup)) {
       RaftClientReply configChangeReply =
           client.getRaftClient().admin().setConfiguration(newConfiguration);
       if (!configChangeReply.isSuccess()) {
-        return failed(new 
RatisRequestFailedException(configChangeReply.getException()));
+        throw new 
RatisRequestFailedException(configChangeReply.getException());
       }
 
       reply = transferLeader(raftGroup, newRaftLeader);
       if (!reply.isSuccess()) {
-        return failed(new RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
     } catch (Exception e) {
-      return failed(new RatisRequestFailedException(e));
+      throw new RatisRequestFailedException(e);
     }
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   private void forceStepDownLeader(RaftGroup group) throws Exception {
@@ -605,8 +583,7 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) 
throws Exception {
-    try (RatisClient client = getRaftClient(group)) {
-      // TODO tuning for timeoutMs
+    try (org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(group)) {
       return client
           .getRaftClient()
           .admin()
@@ -650,7 +627,7 @@ class RatisConsensus implements IConsensus {
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.warn("Unexpected interruption", e);
+      logger.warn("Unexpected interruption when waitUntilLeaderReady", e);
       return false;
     }
     return divisionInfo.isLeader();
@@ -690,12 +667,11 @@ class RatisConsensus implements IConsensus {
   }
 
   @Override
-  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
+  public void triggerSnapshot(ConsensusGroupId groupId) throws 
ConsensusException {
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
     RaftGroup groupInfo = getGroupInfo(raftGroupId);
-
     if (groupInfo == null || !groupInfo.getPeers().contains(myself)) {
-      return failed(new ConsensusGroupNotExistException(groupId));
+      throw new ConsensusGroupNotExistException(groupId);
     }
 
     // TODO tuning snapshot create timeout
@@ -707,13 +683,11 @@ class RatisConsensus implements IConsensus {
     try {
       reply = server.snapshotManagement(request);
       if (!reply.isSuccess()) {
-        return failed(new RatisRequestFailedException(reply.getException()));
+        throw new RatisRequestFailedException(reply.getException());
       }
     } catch (IOException ioException) {
-      return failed(new RatisRequestFailedException(ioException));
+      throw new RatisRequestFailedException(ioException);
     }
-
-    return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   private void triggerSnapshotByCustomize() {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index fe96ddfa918..5544bad9a53 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -25,15 +25,16 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.IStateMachine.Registry;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
@@ -51,6 +52,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -67,7 +69,8 @@ class SimpleConsensus implements IConsensus {
   private final int thisNodeId;
   private final File storageDir;
   private final IStateMachine.Registry registry;
-  private final Map<ConsensusGroupId, SimpleServerImpl> stateMachineMap = new 
ConcurrentHashMap<>();
+  private final Map<ConsensusGroupId, SimpleConsensusServerImpl> 
stateMachineMap =
+      new ConcurrentHashMap<>();
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
@@ -79,14 +82,14 @@ class SimpleConsensus implements IConsensus {
   }
 
   @Override
-  public void start() throws IOException {
+  public synchronized void start() throws IOException {
     initAndRecover();
   }
 
   private void initAndRecover() throws IOException {
     if (!storageDir.exists()) {
       if (!storageDir.mkdirs()) {
-        logger.warn("Unable to create consensus dir at {}", storageDir);
+        throw new IOException(String.format("Unable to create consensus dir at 
%s", storageDir));
       }
     } else {
       try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(storageDir.toPath())) {
@@ -95,8 +98,8 @@ class SimpleConsensus implements IConsensus {
           ConsensusGroupId consensusGroupId =
               ConsensusGroupId.Factory.create(
                   Integer.parseInt(items[0]), Integer.parseInt(items[1]));
-          SimpleServerImpl consensus =
-              new SimpleServerImpl(
+          SimpleConsensusServerImpl consensus =
+              new SimpleConsensusServerImpl(
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   registry.apply(consensusGroupId));
           stateMachineMap.put(consensusGroupId, consensus);
@@ -107,24 +110,20 @@ class SimpleConsensus implements IConsensus {
   }
 
   @Override
-  public void stop() throws IOException {
-    stateMachineMap.values().parallelStream().forEach(SimpleServerImpl::stop);
+  public synchronized void stop() throws IOException {
+    
stateMachineMap.values().parallelStream().forEach(SimpleConsensusServerImpl::stop);
   }
 
   @Override
-  public ConsensusWriteResponse write(ConsensusGroupId groupId, 
IConsensusRequest request) {
-    SimpleServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusWriteResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
-
-    TSStatus status;
+  public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    SimpleConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     if (impl.isReadOnly()) {
-      status = new TSStatus(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
-      status.setMessage("Fail to do non-query operations because system is 
read-only.");
+      return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
     } else {
+      TSStatus status;
       if (groupId instanceof DataRegionId) {
         long startWriteTime = System.nanoTime();
         status = impl.write(request);
@@ -133,58 +132,59 @@ class SimpleConsensus implements IConsensus {
       } else {
         status = impl.write(request);
       }
+      return status;
     }
-    return ConsensusWriteResponse.newBuilder().setStatus(status).build();
   }
 
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, 
IConsensusRequest request) {
-    SimpleServerImpl impl = stateMachineMap.get(groupId);
-    if (impl == null) {
-      return ConsensusReadResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
-    }
-    return 
ConsensusReadResponse.newBuilder().setDataSet(impl.read(request)).build();
+  public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
+      throws ConsensusException {
+    return Optional.ofNullable(stateMachineMap.get(groupId))
+        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId))
+        .read(request);
   }
 
+  @SuppressWarnings("java:S2201")
   @Override
-  public ConsensusGenericResponse createPeer(ConsensusGroupId groupId, 
List<Peer> peers) {
+  public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
+      throws ConsensusException {
     int consensusGroupSize = peers.size();
     if (consensusGroupSize != 1) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerNumException(consensusGroupSize))
-          .build();
+      throw new IllegalPeerNumException(consensusGroupSize);
     }
     if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new IllegalPeerEndpointException(thisNode, peers))
-          .build();
+      throw new IllegalPeerEndpointException(thisNode, peers);
     }
     AtomicBoolean exist = new AtomicBoolean(true);
-    stateMachineMap.computeIfAbsent(
-        groupId,
-        k -> {
-          exist.set(false);
-          SimpleServerImpl impl = new SimpleServerImpl(peers.get(0), 
registry.apply(groupId));
-          impl.start();
-          String path = buildPeerDir(groupId);
-          File file = new File(path);
-          if (!file.mkdirs()) {
-            logger.warn("Unable to create consensus dir for group {} at {}", 
groupId, path);
-          }
-          return impl;
-        });
+    Optional.ofNullable(
+            stateMachineMap.computeIfAbsent(
+                groupId,
+                k -> {
+                  exist.set(false);
+
+                  String path = buildPeerDir(groupId);
+                  File file = new File(path);
+                  if (!file.mkdirs()) {
+                    logger.warn("Unable to create consensus dir for group {} 
at {}", groupId, path);
+                    return null;
+                  }
+
+                  SimpleConsensusServerImpl impl =
+                      new SimpleConsensusServerImpl(peers.get(0), 
registry.apply(groupId));
+                  impl.start();
+                  return impl;
+                }))
+        .orElseThrow(
+            () ->
+                new ConsensusException(
+                    String.format("Unable to create consensus dir for group 
%s", groupId)));
     if (exist.get()) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupAlreadyExistException(groupId))
-          .build();
+      throw new ConsensusGroupAlreadyExistException(groupId);
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
   @Override
-  public ConsensusGenericResponse deletePeer(ConsensusGroupId groupId) {
+  public void deleteLocalPeer(ConsensusGroupId groupId) throws 
ConsensusException {
     AtomicBoolean exist = new AtomicBoolean(false);
     stateMachineMap.computeIfPresent(
         groupId,
@@ -194,43 +194,29 @@ class SimpleConsensus implements IConsensus {
           FileUtils.deleteDirectory(new File(buildPeerDir(groupId)));
           return null;
         });
-
     if (!exist.get()) {
-      return ConsensusGenericResponse.newBuilder()
-          .setException(new ConsensusGroupNotExistException(groupId))
-          .build();
+      throw new ConsensusGroupNotExistException(groupId);
     }
-    return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
-  }
-
-  @Override
-  public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) 
{
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
-  }
-
-  @Override
-  public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer 
peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
 
   @Override
-  public ConsensusGenericResponse updatePeer(ConsensusGroupId groupId, Peer 
oldPeer, Peer newPeer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    throw new ConsensusException("SimpleConsensus does not support membership 
changes");
   }
 
   @Override
-  public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, 
List<Peer> newPeers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    throw new ConsensusException("SimpleConsensus does not support membership 
changes");
   }
 
   @Override
-  public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, 
Peer newLeader) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws 
ConsensusException {
+    throw new ConsensusException("SimpleConsensus does not support leader 
transfer");
   }
 
   @Override
-  public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+  public void triggerSnapshot(ConsensusGroupId groupId) throws 
ConsensusException {
+    throw new ConsensusException("SimpleConsensus does not support snapshot 
trigger currently");
   }
 
   @Override
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
similarity index 94%
rename from 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
rename to 
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
index 3f2b68857cb..b4d8fe87040 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java
@@ -27,12 +27,12 @@ import 
org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
 import java.io.File;
 
-public class SimpleServerImpl implements IStateMachine {
+public class SimpleConsensusServerImpl implements IStateMachine {
 
   private final Peer peer;
   private final IStateMachine stateMachine;
 
-  public SimpleServerImpl(Peer peer, IStateMachine stateMachine) {
+  public SimpleConsensusServerImpl(Peer peer, IStateMachine stateMachine) {
     this.peer = peer;
     this.stateMachine = stateMachine;
   }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 916f91f3ac5..8b95ae0d4d0 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -159,9 +159,9 @@ public class ReplicateTest {
   @Test
   public void replicateUsingQueueTest() throws IOException, 
InterruptedException {
     logger.info("Start ReplicateUsingQueueTest");
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
     Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
@@ -237,8 +237,8 @@ public class ReplicateTest {
   @Test
   public void replicateUsingWALTest() throws IOException, InterruptedException 
{
     logger.info("Start ReplicateUsingWALTest");
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
 
     Assert.assertEquals(0, servers.get(0).getImpl(gid).getSearchIndex());
     Assert.assertEquals(0, servers.get(1).getImpl(gid).getSearchIndex());
@@ -256,7 +256,7 @@ public class ReplicateTest {
     stopServer();
     initServer();
 
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     Assert.assertEquals(peers, servers.get(0).getImpl(gid).getConfiguration());
     Assert.assertEquals(peers, servers.get(1).getImpl(gid).getConfiguration());
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 5afee66ee1a..c4333c860ea 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -90,11 +90,11 @@ public class StabilityTest {
   }
 
   public void peerTest() throws Exception {
-    consensusImpl.createPeer(
+    consensusImpl.createLocalPeer(
         dataRegionId,
         Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", basePort))));
 
-    consensusImpl.deletePeer(dataRegionId);
+    consensusImpl.deleteLocalPeer(dataRegionId);
 
     consensusImpl.stop();
     consensusImpl = null;
@@ -102,16 +102,16 @@ public class StabilityTest {
     constructConsensus();
 
     ConsensusGenericResponse response =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(
                 new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
basePort))));
     Assert.assertTrue(response.isSuccess());
-    consensusImpl.deletePeer(dataRegionId);
+    consensusImpl.deleteLocalPeer(dataRegionId);
   }
 
   public void snapshotTest() throws IOException {
-    consensusImpl.createPeer(
+    consensusImpl.createLocalPeer(
         dataRegionId,
         Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", basePort))));
     consensusImpl.triggerSnapshot(dataRegionId);
@@ -132,11 +132,11 @@ public class StabilityTest {
     Assert.assertNotNull(versionFiles2);
     Assert.assertEquals(1, versionFiles2.length);
     Assert.assertNotEquals(versionFiles1[0].getName(), 
versionFiles2[0].getName());
-    consensusImpl.deletePeer(dataRegionId);
+    consensusImpl.deleteLocalPeer(dataRegionId);
   }
 
   public void snapshotUpgradeTest() throws Exception {
-    consensusImpl.createPeer(
+    consensusImpl.createLocalPeer(
         dataRegionId,
         Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", basePort))));
     consensusImpl.triggerSnapshot(dataRegionId);
@@ -166,6 +166,6 @@ public class StabilityTest {
     Assert.assertEquals(
         oldSnapshotIndex + 1,
         
Long.parseLong(snapshotFiles[0].getName().replaceAll(".*[^\\d](?=(\\d+))", 
"")));
-    consensusImpl.deletePeer(dataRegionId);
+    consensusImpl.deleteLocalPeer(dataRegionId);
   }
 }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 245474c10e5..d9cd902538a 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -101,9 +101,9 @@ public class RatisConsensusTest {
 
   @Test
   public void basicConsensus3Copy() throws Exception {
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
   }
@@ -112,19 +112,19 @@ public class RatisConsensusTest {
   public void addMemberToGroup() throws Exception {
     List<Peer> original = peers.subList(0, 1);
 
-    servers.get(0).createPeer(group.getGroupId(), original);
+    servers.get(0).createLocalPeer(group.getGroupId(), original);
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
-    ConsensusGenericResponse resp = 
servers.get(0).createPeer(group.getGroupId(), original);
+    ConsensusGenericResponse resp = 
servers.get(0).createLocalPeer(group.getGroupId(), original);
     Assert.assertFalse(resp.isSuccess());
     Assert.assertTrue(resp.getException() instanceof 
RatisRequestFailedException);
 
     // add 2 members
-    servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
-    servers.get(0).addPeer(group.getGroupId(), peers.get(1));
+    servers.get(1).createLocalPeer(group.getGroupId(), 
Collections.emptyList());
+    servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1));
 
-    servers.get(2).createPeer(group.getGroupId(), Collections.emptyList());
-    servers.get(0).changePeer(group.getGroupId(), peers);
+    servers.get(2).createLocalPeer(group.getGroupId(), 
Collections.emptyList());
+    servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2));
 
     Assert.assertEquals(
         3, ((TestUtils.IntegerCounter) 
stateMachines.get(0)).getConfiguration().size());
@@ -133,39 +133,39 @@ public class RatisConsensusTest {
 
   @Test
   public void removeMemberFromGroup() throws Exception {
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
     servers.get(0).transferLeader(gid, peers.get(0));
-    servers.get(0).removePeer(gid, peers.get(1));
-    servers.get(1).deletePeer(gid);
-    servers.get(0).removePeer(gid, peers.get(2));
-    servers.get(2).deletePeer(gid);
+    servers.get(0).removeRemotePeer(gid, peers.get(1));
+    servers.get(1).deleteLocalPeer(gid);
+    servers.get(0).removeRemotePeer(gid, peers.get(2));
+    servers.get(2).deleteLocalPeer(gid);
 
     doConsensus(servers.get(0), group.getGroupId(), 10, 20);
   }
 
   @Test
   public void oneMemberGroupChange() throws Exception {
-    servers.get(0).createPeer(group.getGroupId(), peers.subList(0, 1));
+    servers.get(0).createLocalPeer(group.getGroupId(), peers.subList(0, 1));
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
-    servers.get(1).createPeer(group.getGroupId(), Collections.emptyList());
-    servers.get(0).addPeer(group.getGroupId(), peers.get(1));
+    servers.get(1).createLocalPeer(group.getGroupId(), 
Collections.emptyList());
+    servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1));
     servers.get(1).transferLeader(group.getGroupId(), peers.get(1));
-    servers.get(0).removePeer(group.getGroupId(), peers.get(0));
+    servers.get(0).removeRemotePeer(group.getGroupId(), peers.get(0));
     Assert.assertEquals(servers.get(1).getLeader(gid).getNodeId(), 
peers.get(1).getNodeId());
-    servers.get(0).deletePeer(group.getGroupId());
+    servers.get(0).deleteLocalPeer(group.getGroupId());
   }
 
   @Test
   public void crashAndStart() throws Exception {
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     // 200 operation will trigger snapshot & purge
     doConsensus(servers.get(0), group.getGroupId(), 200, 200);
@@ -178,9 +178,9 @@ public class RatisConsensusTest {
 
   // FIXME: Turn on the test when it is stable
   public void transferLeader() throws Exception {
-    servers.get(0).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(1).createPeer(group.getGroupId(), group.getPeers());
-    servers.get(2).createPeer(group.getGroupId(), group.getPeers());
+    servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
+    servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
     doConsensus(servers.get(0), group.getGroupId(), 10, 10);
 
@@ -198,13 +198,13 @@ public class RatisConsensusTest {
 
   @Test
   public void transferSnapshot() throws Exception {
-    servers.get(0).createPeer(gid, peers.subList(0, 1));
+    servers.get(0).createLocalPeer(gid, peers.subList(0, 1));
 
     doConsensus(servers.get(0), gid, 10, 10);
     Assert.assertTrue(servers.get(0).triggerSnapshot(gid).isSuccess());
 
-    servers.get(1).createPeer(gid, Collections.emptyList());
-    servers.get(0).addPeer(gid, peers.get(1));
+    servers.get(1).createLocalPeer(gid, Collections.emptyList());
+    servers.get(0).addRemotePeer(gid, peers.get(1));
 
     doConsensus(servers.get(1), gid, 10, 20);
   }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 001bb95de0a..c90d5d9952d 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -123,7 +123,7 @@ public class RecoverReadTest {
     final ConsensusGroupId gid = miniCluster.getGid();
     final List<Peer> members = miniCluster.getPeers();
 
-    miniCluster.getServers().forEach(s -> s.createPeer(gid, members));
+    miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members));
 
     // first write 10 ops
     TestUtils.write(miniCluster.getServer(0), gid, 10);
@@ -150,7 +150,7 @@ public class RecoverReadTest {
     final ConsensusGroupId gid = miniCluster.getGid();
     final List<Peer> members = miniCluster.getPeers();
 
-    miniCluster.getServers().forEach(s -> s.createPeer(gid, members));
+    miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members));
 
     // first write 10 ops
     TestUtils.write(miniCluster.getServer(0), gid, 10);
@@ -192,7 +192,7 @@ public class RecoverReadTest {
     final ConsensusGroupId gid = miniCluster.getGid();
     final List<Peer> members = miniCluster.getPeers();
 
-    miniCluster.getServers().forEach(s -> s.createPeer(gid, members));
+    miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members));
 
     // first write 10 ops
     TestUtils.write(miniCluster.getServer(0), gid, 10);
@@ -218,7 +218,7 @@ public class RecoverReadTest {
     final ConsensusGroupId gid = miniCluster.getGid();
     final List<Peer> members = miniCluster.getPeers();
 
-    miniCluster.getServers().forEach(s -> s.createPeer(gid, members));
+    miniCluster.getServers().forEach(s -> s.createLocalPeer(gid, members));
 
     // first write 30 ops
     TestUtils.write(miniCluster.getServer(0), gid, 50);
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java
index b70499eb882..5fe3928925e 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/RecoveryTest.java
@@ -78,7 +78,7 @@ public class RecoveryTest {
 
   @Test
   public void recoveryTest() throws Exception {
-    consensusImpl.createPeer(
+    consensusImpl.createLocalPeer(
         schemaRegionId,
         Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
@@ -88,7 +88,7 @@ public class RecoveryTest {
     constructConsensus();
 
     ConsensusGenericResponse response =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             schemaRegionId,
             Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 9000))));
 
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 6c065f76a4c..5cd64edf167 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -167,21 +167,21 @@ public class SimpleConsensusTest {
   @Test
   public void addConsensusGroup() {
     ConsensusGenericResponse response1 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response1.isSuccess());
     assertNull(response1.getException());
 
     ConsensusGenericResponse response2 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertFalse(response2.isSuccess());
     assertTrue(response2.getException() instanceof 
ConsensusGroupAlreadyExistException);
 
     ConsensusGenericResponse response3 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Arrays.asList(
                 new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)),
@@ -190,14 +190,14 @@ public class SimpleConsensusTest {
     assertTrue(response3.getException() instanceof IllegalPeerNumException);
 
     ConsensusGenericResponse response4 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.1", 6667))));
     assertFalse(response4.isSuccess());
     assertTrue(response4.getException() instanceof 
IllegalPeerEndpointException);
 
     ConsensusGenericResponse response5 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             schemaRegionId,
             Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response5.isSuccess());
@@ -206,18 +206,18 @@ public class SimpleConsensusTest {
 
   @Test
   public void removeConsensusGroup() {
-    ConsensusGenericResponse response1 = 
consensusImpl.deletePeer(dataRegionId);
+    ConsensusGenericResponse response1 = 
consensusImpl.deleteLocalPeer(dataRegionId);
     assertFalse(response1.isSuccess());
     assertTrue(response1.getException() instanceof 
ConsensusGroupNotExistException);
 
     ConsensusGenericResponse response2 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response2.isSuccess());
     assertNull(response2.getException());
 
-    ConsensusGenericResponse response3 = 
consensusImpl.deletePeer(dataRegionId);
+    ConsensusGenericResponse response3 = 
consensusImpl.deleteLocalPeer(dataRegionId);
     assertTrue(response3.isSuccess());
     assertNull(response3.getException());
   }
@@ -225,7 +225,7 @@ public class SimpleConsensusTest {
   @Test
   public void addPeer() {
     ConsensusGenericResponse response =
-        consensusImpl.addPeer(
+        consensusImpl.addRemotePeer(
             dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
6667)));
     assertFalse(response.isSuccess());
   }
@@ -233,20 +233,11 @@ public class SimpleConsensusTest {
   @Test
   public void removePeer() {
     ConsensusGenericResponse response =
-        consensusImpl.removePeer(
+        consensusImpl.removeRemotePeer(
             dataRegionId, new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 
6667)));
     assertFalse(response.isSuccess());
   }
 
-  @Test
-  public void changePeer() {
-    ConsensusGenericResponse response =
-        consensusImpl.changePeer(
-            dataRegionId,
-            Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
-    assertFalse(response.isSuccess());
-  }
-
   @Test
   public void transferLeader() {
     ConsensusGenericResponse response =
@@ -264,21 +255,21 @@ public class SimpleConsensusTest {
   @Test
   public void write() {
     ConsensusGenericResponse response1 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             dataRegionId,
             Collections.singletonList(new Peer(dataRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response1.isSuccess());
     assertNull(response1.getException());
 
     ConsensusGenericResponse response2 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             schemaRegionId,
             Collections.singletonList(new Peer(schemaRegionId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response2.isSuccess());
     assertNull(response2.getException());
 
     ConsensusGenericResponse response3 =
-        consensusImpl.createPeer(
+        consensusImpl.createLocalPeer(
             configId,
             Collections.singletonList(new Peer(configId, 1, new 
TEndPoint("0.0.0.0", 6667))));
     assertTrue(response3.isSuccess());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 33c6cfcee4d..ff606c63a32 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1415,7 +1415,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
     if (consensusGroupId instanceof DataRegionId) {
       ConsensusGenericResponse response =
-          DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
+          
DataRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId);
       if (!response.isSuccess()
           && !(response.getException() instanceof 
PeerNotInConsensusGroupException)) {
         return RpcUtils.getStatus(
@@ -1424,7 +1424,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       return regionManager.deleteDataRegion((DataRegionId) consensusGroupId);
     } else {
       ConsensusGenericResponse response =
-          SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
+          
SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId);
       if (!response.isSuccess()
           && !(response.getException() instanceof 
PeerNotInConsensusGroupException)) {
         return RpcUtils.getStatus(
@@ -1755,9 +1755,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     TSStatus status = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     ConsensusGenericResponse resp;
     if (regionId instanceof DataRegionId) {
-      resp = DataRegionConsensusImpl.getInstance().createPeer(regionId, peers);
+      resp = DataRegionConsensusImpl.getInstance().createLocalPeer(regionId, 
peers);
     } else {
-      resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, 
peers);
+      resp = SchemaRegionConsensusImpl.getInstance().createLocalPeer(regionId, 
peers);
     }
     if (!resp.isSuccess()) {
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
index 55c4e3bb87d..fdff724a3dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -119,7 +119,7 @@ public class DataNodeRegionManager {
         peers.add(new Peer(schemaRegionId, dataNodeLocation.getDataNodeId(), 
endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, 
peers);
+          
SchemaRegionConsensusImpl.getInstance().createLocalPeer(schemaRegionId, peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else if (consensusGenericResponse.getException()
@@ -160,7 +160,7 @@ public class DataNodeRegionManager {
         peers.add(new Peer(dataRegionId, dataNodeLocation.getDataNodeId(), 
endpoint));
       }
       ConsensusGenericResponse consensusGenericResponse =
-          DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, 
peers);
+          DataRegionConsensusImpl.getInstance().createLocalPeer(dataRegionId, 
peers);
       if (consensusGenericResponse.isSuccess()) {
         tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else if (consensusGenericResponse.getException()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 1b94e607282..05ef52758e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -163,28 +164,27 @@ public class RegionWriteExecutor {
         return response;
       }
 
-      ConsensusWriteResponse writeResponse =
-          executePlanNodeInConsensusLayer(context.getRegionId(), node);
-      if (writeResponse.getStatus() != null) {
+      try {
+        TSStatus status = 
executePlanNodeInConsensusLayer(context.getRegionId(), node);
         response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
-        response.setMessage(writeResponse.getStatus().message);
-        response.setStatus(writeResponse.getStatus());
-      } else {
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode());
+        response.setMessage(status.getMessage());
+        response.setStatus(status);
+      } catch (ConsensusException e) {
         LOGGER.error(
             "Something wrong happened while calling consensus layer's write 
API.",
-            writeResponse.getException());
+            e);
         response.setAccepted(false);
-        response.setMessage(writeResponse.getException().toString());
+        response.setMessage(e.toString());
         response.setStatus(
             RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, 
writeResponse.getErrorMessage()));
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()));
       }
       return response;
     }
 
-    private ConsensusWriteResponse executePlanNodeInConsensusLayer(
-        ConsensusGroupId groupId, PlanNode planNode) {
+    private TSStatus executePlanNodeInConsensusLayer(
+        ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException 
{
       if (groupId instanceof DataRegionId) {
         return dataRegionConsensus.write(groupId, planNode);
       } else {
@@ -227,72 +227,60 @@ public class RegionWriteExecutor {
       RegionExecutionResult response = new RegionExecutionResult();
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
-
-        ConsensusWriteResponse writeResponse =
+        TSStatus status =
             fireTriggerAndInsert(context.getRegionId(), insertNode);
-
-        if (writeResponse.getStatus() != null) {
-          response.setAccepted(
-              TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
-          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != 
writeResponse.getStatus().getCode()) {
-            response.setMessage(writeResponse.getStatus().message);
-            response.setStatus(writeResponse.getStatus());
-          } else {
-            response.setMessage(writeResponse.getStatus().message);
-          }
-        } else {
-          LOGGER.warn(
-              "Something wrong happened while calling consensus layer's write 
API.",
-              writeResponse.getException());
-          response.setAccepted(false);
-          response.setMessage(writeResponse.getException().toString());
-          response.setStatus(
-              RpcUtils.getStatus(
-                  TSStatusCode.WRITE_PROCESS_ERROR, 
writeResponse.getException().toString()));
+        response.setAccepted(
+            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode());
+        response.setMessage(status.message);
+        if (!response.isAccepted()) {
+          response.setStatus(status);
         }
-
+        return response;
+      } catch (ConsensusException e) {
+        LOGGER.warn(
+            "Something wrong happened while calling consensus layer's write 
API.",
+            e);
+        response.setAccepted(false);
+        response.setMessage(e.toString());
+        response.setStatus(
+            RpcUtils.getStatus(
+                TSStatusCode.WRITE_PROCESS_ERROR, e.toString()));
         return response;
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
     }
 
-    private ConsensusWriteResponse fireTriggerAndInsert(
-        ConsensusGroupId groupId, PlanNode planNode) {
+    private TSStatus fireTriggerAndInsert(
+        ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException 
{
       long triggerCostTime = 0;
-      ConsensusWriteResponse writeResponse;
+      TSStatus status;
       long startTime = System.nanoTime();
       // fire Trigger before the insertion
       TriggerFireResult result = triggerFireVisitor.process(planNode, 
TriggerEvent.BEFORE_INSERT);
       triggerCostTime += (System.nanoTime() - startTime);
       if (result.equals(TriggerFireResult.TERMINATION)) {
-        TSStatus triggerError = new 
TSStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode());
-        triggerError.setMessage(
+        status = 
RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
             "Failed to complete the insertion because trigger error before the 
insertion.");
-        writeResponse = 
ConsensusWriteResponse.newBuilder().setStatus(triggerError).build();
       } else {
         boolean hasFailedTriggerBeforeInsertion =
             result.equals(TriggerFireResult.FAILED_NO_TERMINATION);
 
         long startWriteTime = System.nanoTime();
-        writeResponse = dataRegionConsensus.write(groupId, planNode);
+        status = dataRegionConsensus.write(groupId, planNode);
         
PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - 
startWriteTime);
 
         // fire Trigger after the insertion
-        if (writeResponse.isSuccessful()) {
-          startTime = System.nanoTime();
-          result = triggerFireVisitor.process(planNode, 
TriggerEvent.AFTER_INSERT);
-          if (hasFailedTriggerBeforeInsertion || 
!result.equals(TriggerFireResult.SUCCESS)) {
-            TSStatus triggerError = new 
TSStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode());
-            triggerError.setMessage(
-                "Meet trigger error before/after the insertion, the insertion 
itself is completed.");
-            writeResponse = 
ConsensusWriteResponse.newBuilder().setStatus(triggerError).build();
-          }
-          triggerCostTime += (System.nanoTime() - startTime);
+        startTime = System.nanoTime();
+        result = triggerFireVisitor.process(planNode, 
TriggerEvent.AFTER_INSERT);
+        if (hasFailedTriggerBeforeInsertion || 
!result.equals(TriggerFireResult.SUCCESS)) {
+          status = 
RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
+              "Meet trigger error before/after the insertion, the insertion 
itself is completed.");
         }
+        triggerCostTime += (System.nanoTime() - startTime);
       }
       PERFORMANCE_OVERVIEW_METRICS.recordScheduleTriggerCost(triggerCostTime);
-      return writeResponse;
+      return status;
     }
 
     @Override
@@ -804,7 +792,7 @@ public class RegionWriteExecutor {
                 String.format(
                     "Template is being unsetting from prefix path of %s. 
Please try activating later.",
                     new PartialPath(
-                            Arrays.copyOf(entry.getKey().getNodes(), 
entry.getValue().right + 1))
+                        Arrays.copyOf(entry.getKey().getNodes(), 
entry.getValue().right + 1))
                         .getFullPath());
             result.setMessage(message);
             result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 35b495c8c1b..b1829b810f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -265,9 +265,9 @@ public class RegionMigrateService implements IService {
     private ConsensusGenericResponse addRegionPeer(ConsensusGroupId regionId, 
Peer newPeer) {
       ConsensusGenericResponse resp;
       if (regionId instanceof DataRegionId) {
-        resp = DataRegionConsensusImpl.getInstance().addPeer(regionId, 
newPeer);
+        resp = DataRegionConsensusImpl.getInstance().addRemotePeer(regionId, 
newPeer);
       } else {
-        resp = SchemaRegionConsensusImpl.getInstance().addPeer(regionId, 
newPeer);
+        resp = SchemaRegionConsensusImpl.getInstance().addRemotePeer(regionId, 
newPeer);
       }
       return resp;
     }
@@ -354,9 +354,9 @@ public class RegionMigrateService implements IService {
     private ConsensusGenericResponse removeRegionPeer(ConsensusGroupId 
regionId, Peer oldPeer) {
       ConsensusGenericResponse resp;
       if (regionId instanceof DataRegionId) {
-        resp = DataRegionConsensusImpl.getInstance().removePeer(regionId, 
oldPeer);
+        resp = 
DataRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
       } else {
-        resp = SchemaRegionConsensusImpl.getInstance().removePeer(regionId, 
oldPeer);
+        resp = 
SchemaRegionConsensusImpl.getInstance().removeRemotePeer(regionId, oldPeer);
       }
       return resp;
     }
@@ -409,9 +409,9 @@ public class RegionMigrateService implements IService {
       ConsensusGenericResponse resp;
       try {
         if (regionId instanceof DataRegionId) {
-          resp = DataRegionConsensusImpl.getInstance().deletePeer(regionId);
+          resp = 
DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
         } else {
-          resp = SchemaRegionConsensusImpl.getInstance().deletePeer(regionId);
+          resp = 
SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
         }
       } catch (Throwable e) {
         taskLogger.error("{}, deletePeer error, regionId: {}", 
REGION_MIGRATE_PROCESS, regionId, e);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index 8d5a9fff2c8..bc319d63525 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -90,7 +90,7 @@ public class DataNodeInternalRPCServiceImplTest {
   public void setUp() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     SchemaRegionConsensusImpl.getInstance()
-        .createPeer(
+        .createLocalPeer(
             
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
             genSchemaRegionPeerList(regionReplicaSet));
     dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl();
@@ -100,7 +100,7 @@ public class DataNodeInternalRPCServiceImplTest {
   public void tearDown() throws Exception {
     TRegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     SchemaRegionConsensusImpl.getInstance()
-        .deletePeer(
+        .deleteLocalPeer(
             
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()));
     FileUtils.deleteFully(new File(conf.getConsensusDir()));
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 6cb401b56fa..83f041b0f6e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -139,7 +139,7 @@ public class StatusUtils {
         status.setMessage("Meet error in close operation.");
         break;
       case SYSTEM_READ_ONLY:
-        status.setMessage("Database is read-only.");
+        status.setMessage("Fail to do non-query operations because system is 
read-only.");
         break;
       case DISK_SPACE_INSUFFICIENT:
         status.setMessage("Disk space is insufficient.");

Reply via email to