This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8b9d90ff1e6fea9d1ed1b98533f4e1fd006a484f
Author: BUAAserein <[email protected]>
AuthorDate: Wed Aug 16 22:37:48 2023 +0800

    refactor write
---
 .../confignode/manager/ClusterQuotaManager.java    |  48 ++++-----
 .../iotdb/confignode/manager/ModelManager.java     |  32 +++---
 .../confignode/manager/PermissionManager.java      |  29 ++++--
 .../iotdb/confignode/manager/TriggerManager.java   |  41 ++++----
 .../iotdb/confignode/manager/UDFManager.java       |   7 +-
 .../manager/consensus/ConsensusManager.java        |   8 +-
 .../iotdb/confignode/manager/cq/CQManager.java     |  15 ++-
 .../confignode/manager/cq/CQScheduleTask.java      |  27 ++---
 .../iotdb/confignode/manager/node/NodeManager.java |  44 +++++++--
 .../manager/partition/PartitionManager.java        |  38 ++++---
 .../manager/schema/ClusterSchemaManager.java       | 107 +++++++++++++++++---
 .../procedure/env/ConfigNodeProcedureEnv.java      |  18 +++-
 .../procedure/env/DataNodeRemoveHandler.java       |   7 +-
 .../procedure/impl/cq/CreateCQProcedure.java       | 110 ++++++++++-----------
 .../procedure/impl/model/CreateModelProcedure.java |  18 ++--
 .../procedure/impl/model/DropModelProcedure.java   |   7 +-
 .../pipe/plugin/CreatePipePluginProcedure.java     |  27 +++--
 .../impl/pipe/plugin/DropPipePluginProcedure.java  |  13 ++-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  19 ++--
 .../runtime/PipeHandleMetaChangeProcedure.java     |  23 +++--
 .../impl/pipe/task/CreatePipeProcedureV2.java      |  42 +++++---
 .../impl/pipe/task/DropPipeProcedureV2.java        |  18 +++-
 .../impl/pipe/task/StartPipeProcedureV2.java       |  42 +++++---
 .../impl/pipe/task/StopPipeProcedureV2.java        |  42 +++++---
 .../impl/schema/DeleteDatabaseProcedure.java       |   3 +-
 .../impl/schema/SetTemplateProcedure.java          |  41 ++++++--
 .../statemachine/CreateRegionGroupsProcedure.java  |   7 +-
 .../impl/trigger/CreateTriggerProcedure.java       |  32 ++++--
 .../procedure/store/ConfigProcedureStore.java      |  13 ++-
 .../org/apache/iotdb/consensus/IConsensus.java     |  44 ++++-----
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  88 +++++++++--------
 .../iotdb/consensus/ratis/RatisConsensus.java      |  60 +++++------
 .../iotdb/consensus/simple/SimpleConsensus.java    |   1 -
 .../execution/executor/RegionWriteExecutor.java    |  47 ++++-----
 34 files changed, 683 insertions(+), 435 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
index e013d1f6d46..01ee76691bf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java
@@ -36,7 +36,7 @@ import 
org.apache.iotdb.confignode.persistence.quota.QuotaInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp;
 import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -79,12 +79,12 @@ public class ClusterQuotaManager {
           "The used quota exceeds the preset quota. Please set a larger 
value.");
     }
     // TODO: Datanode failed to receive rpc
-    ConsensusWriteResponse response =
-        configManager
-            .getConsensusManager()
-            .write(new SetSpaceQuotaPlan(req.getDatabase(), 
req.getSpaceLimit()));
-    if (response.getStatus() != null) {
-      if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    try {
+      TSStatus response =
+          configManager
+              .getConsensusManager()
+              .write(new SetSpaceQuotaPlan(req.getDatabase(), 
req.getSpaceLimit()));
+      if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         Map<Integer, TDataNodeLocation> dataNodeLocationMap =
             configManager.getNodeManager().getRegisteredDataNodeLocations();
         AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler =
@@ -92,15 +92,12 @@ public class ClusterQuotaManager {
         
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
         return 
RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
       }
-      return response.getStatus();
-    } else {
-      LOGGER.warn(
-          "Unexpected error happened while setting space quota on {}: ",
-          req.getDatabase().toString(),
-          response.getException());
+      return response;
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getErrorMessage());
+      res.setMessage(e.getMessage());
       return res;
     }
   }
@@ -183,12 +180,12 @@ public class ClusterQuotaManager {
   }
 
   public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) {
-    ConsensusWriteResponse response =
-        configManager
-            .getConsensusManager()
-            .write(new SetThrottleQuotaPlan(req.getUserName(), 
req.getThrottleQuota()));
-    if (response.getStatus() != null) {
-      if (response.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    try {
+      TSStatus response =
+          configManager
+              .getConsensusManager()
+              .write(new SetThrottleQuotaPlan(req.getUserName(), 
req.getThrottleQuota()));
+      if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         Map<Integer, TDataNodeLocation> dataNodeLocationMap =
             configManager.getNodeManager().getRegisteredDataNodeLocations();
         AsyncClientHandler<TSetThrottleQuotaReq, TSStatus> clientHandler =
@@ -197,15 +194,12 @@ public class ClusterQuotaManager {
         
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
         return 
RpcUtils.squashResponseStatusList(clientHandler.getResponseList());
       }
-      return response.getStatus();
-    } else {
-      LOGGER.warn(
-          "Unexpected error happened while setting throttle quota on user: {}: 
",
-          req.getUserName(),
-          response.getException());
+      return response;
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getErrorMessage());
+      res.setMessage(e.getMessage());
       return res;
     }
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
index 425bafbd4c0..7fd18b5efca 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -79,35 +79,25 @@ public class ModelManager {
   }
 
   public TSStatus updateModelInfo(TUpdateModelInfoReq req) {
-    ConsensusWriteResponse response =
-        configManager.getConsensusManager().write(new 
UpdateModelInfoPlan(req));
-    if (response.getStatus() != null) {
-      return response.getStatus();
-    } else {
-      LOGGER.warn(
-          "Unexpected error happened while updating model {}: ",
-          req.getModelId(),
-          response.getException());
+    try {
+      return configManager.getConsensusManager().write(new 
UpdateModelInfoPlan(req));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getErrorMessage());
+      res.setMessage(e.getMessage());
       return res;
     }
   }
 
   public TSStatus updateModelState(TUpdateModelStateReq req) {
-    ConsensusWriteResponse response =
-        configManager.getConsensusManager().write(new 
UpdateModelStatePlan(req));
-    if (response.getStatus() != null) {
-      return response.getStatus();
-    } else {
-      LOGGER.warn(
-          "Unexpected error happened while updating state of model {}: ",
-          req.getModelId(),
-          response.getException());
+    try {
+      return configManager.getConsensusManager().write(new 
UpdateModelStatePlan(req));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getErrorMessage());
+      res.setMessage(e.getMessage());
       return res;
     }
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index bfe0c3925fc..1f5c5a999d8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -30,15 +30,21 @@ import 
org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.persistence.AuthorInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 /** Manager permission read and operation. */
 public class PermissionManager {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PermissionManager.class);
+
   private final ConfigManager configManager;
   private final AuthorInfo authorInfo;
 
@@ -56,16 +62,23 @@ public class PermissionManager {
   public TSStatus operatePermission(AuthorPlan authorPlan) {
     TSStatus tsStatus;
     // If the permissions change, clear the cache content affected by the 
operation
-    if (authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateUser
-        || authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateRole) {
-      tsStatus = getConsensusManager().write(authorPlan).getStatus();
-    } else {
-      tsStatus = invalidateCache(authorPlan.getUserName(), 
authorPlan.getRoleName());
-      if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        tsStatus = getConsensusManager().write(authorPlan).getStatus();
+    try {
+      if (authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateUser
+          || authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateRole) {
+        tsStatus = getConsensusManager().write(authorPlan);
+      } else {
+        tsStatus = invalidateCache(authorPlan.getUserName(), 
authorPlan.getRoleName());
+        if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) 
{
+          tsStatus = getConsensusManager().write(authorPlan);
+        }
       }
+      return tsStatus;
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
     }
-    return tsStatus;
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index 1cb38042600..e42a0fb029e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -58,7 +59,6 @@ import org.apache.iotdb.tsfile.utils.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -135,12 +135,9 @@ public class TriggerManager {
   public TGetTriggerTableResp getTriggerTable(boolean onlyStateful) {
     try {
       return ((TriggerTableResp)
-              configManager
-                  .getConsensusManager()
-                  .read(new GetTriggerTablePlan(onlyStateful))
-                  .getDataset())
+              configManager.getConsensusManager().read(new 
GetTriggerTablePlan(onlyStateful)))
           .convertToThriftResponse();
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOGGER.error("Fail to get TriggerTable", e);
       return new TGetTriggerTableResp(
           new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -150,12 +147,16 @@ public class TriggerManager {
   }
 
   public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String 
triggerName) {
-    return ((TriggerLocationResp)
-            configManager
-                .getConsensusManager()
-                .read(new GetTriggerLocationPlan(triggerName))
-                .getDataset())
-        .convertToThriftResponse();
+    try {
+      return ((TriggerLocationResp)
+              configManager.getConsensusManager().read(new 
GetTriggerLocationPlan(triggerName)))
+          .convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      return new TGetLocationForTriggerResp(
+          new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+              .setMessage(e.getMessage()));
+    }
   }
 
   public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
@@ -191,16 +192,13 @@ public class TriggerManager {
       NodeManager nodeManager = configManager.getNodeManager();
 
       transferResult =
-          consensusManager
-              .write(new 
UpdateTriggersOnTransferNodesPlan(newUnknownDataNodeList))
-              .getStatus();
+          consensusManager.write(new 
UpdateTriggersOnTransferNodesPlan(newUnknownDataNodeList));
       if (transferResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return transferResult;
       }
 
       List<String> transferringTriggers =
-          ((TransferringTriggersResp)
-                  consensusManager.read(new 
GetTransferringTriggersPlan()).getDataset())
+          ((TransferringTriggersResp) consensusManager.read(new 
GetTransferringTriggersPlan()))
               .getTransferringTriggers();
 
       for (String trigger : transferringTriggers) {
@@ -215,13 +213,16 @@ public class TriggerManager {
         }
 
         transferResult =
-            consensusManager
-                .write(new UpdateTriggerLocationPlan(trigger, 
newDataNodeLocation))
-                .getStatus();
+            consensusManager.write(new UpdateTriggerLocationPlan(trigger, 
newDataNodeLocation));
         if (transferResult.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           return transferResult;
         }
       }
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
     } finally {
       triggerInfo.releaseTriggerTableLock();
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index a7f5ef245b8..f2b747104c1 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -110,7 +110,7 @@ public class UDFManager {
 
       LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", 
udfName);
 
-      return 
configManager.getConsensusManager().write(createFunctionPlan).getStatus();
+      return configManager.getConsensusManager().write(createFunctionPlan);
     } catch (Exception e) {
       LOGGER.warn(e.getMessage(), e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -143,10 +143,7 @@ public class UDFManager {
         return result;
       }
 
-      return configManager
-          .getConsensusManager()
-          .write(new DropFunctionPlan(functionName))
-          .getStatus();
+      return configManager.getConsensusManager().write(new 
DropFunctionPlan(functionName));
     } catch (Exception e) {
       LOGGER.warn(e.getMessage(), e);
       return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 95182ce9ab1..5d0d0c6551f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -36,11 +36,11 @@ import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.util.SizeInBytes;
@@ -294,12 +294,12 @@ public class ConsensusManager {
   }
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
-  public ConsensusWriteResponse write(ConfigPhysicalPlan plan) {
+  public TSStatus write(ConfigPhysicalPlan plan) throws ConsensusException {
     return consensusImpl.write(DEFAULT_CONSENSUS_GROUP_ID, plan);
   }
 
   /** Transmit PhysicalPlan to confignode.consensus.statemachine */
-  public ConsensusReadResponse read(ConfigPhysicalPlan plan) {
+  public DataSet read(ConfigPhysicalPlan plan) throws ConsensusException {
     return consensusImpl.read(DEFAULT_CONSENSUS_GROUP_ID, plan);
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index bbbae4f3b2e..43878c16c96 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -78,16 +78,13 @@ public class CQManager {
   }
 
   public TSStatus dropCQ(TDropCQReq req) {
-    ConsensusWriteResponse response =
-        configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
-    if (response.getStatus() != null) {
-      return response.getStatus();
-    } else {
-      LOGGER.warn(
-          "Unexpected error happened while dropping cq {}: ", req.cqId, 
response.getException());
+    try {
+      return configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getErrorMessage());
+      res.setMessage(e.getMessage());
       return res;
     }
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 5fa60c834c8..beb97355496 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -29,7 +29,7 @@ import 
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTi
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -245,24 +245,29 @@ public class CQScheduleTask implements Runnable {
             startTime,
             endTime,
             System.currentTimeMillis() * FACTOR);
-
-        ConsensusWriteResponse result =
-            configManager
-                .getConsensusManager()
-                .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5));
+        TSStatus result;
+        try {
+          result =
+              configManager
+                  .getConsensusManager()
+                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, 
md5));
+        } catch (ConsensusException e) {
+          LOGGER.warn("Something wrong happened while calling consensus 
layer's write API.", e);
+          result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+          result.setMessage(e.getMessage());
+        }
 
         // while leadership changed, the update last exec time operation for 
CQTasks in new leader
         // may still update failed because stale CQTask in old leader may 
update it in advance
-        if (!result.isSuccessful()) {
+        if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           LOGGER.warn(
               "Failed to update the last execution time {} of CQ {}, because 
{}",
               executionTime,
               cqId,
-              result.getErrorMessage());
+              result.getMessage());
           // no such cq, we don't need to submit it again
-          if (result.getStatus() != null
-              && result.getStatus().code == 
TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
-            LOGGER.info("Stop submitting CQ {} because {}", cqId, 
result.getStatus().message);
+          if (result.getCode() == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+            LOGGER.info("Stop submitting CQ {} because {}", cqId, 
result.getMessage());
             return;
           }
         }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 1ed797f6fdd..bf59d5c2b77 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -77,7 +77,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -250,12 +250,20 @@ public class NodeManager {
         new RegisterDataNodePlan(req.getDataNodeConfiguration());
     // Register new DataNode
     
registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId);
-    getConsensusManager().write(registerDataNodePlan);
+    try {
+      getConsensusManager().write(registerDataNodePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
 
     // update datanode's versionInfo
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId);
-    getConsensusManager().write(updateVersionInfoPlan);
+    try {
+      getConsensusManager().write(updateVersionInfoPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
 
     // Bind DataNode metrics
     PartitionMetrics.bindDataNodePartitionMetrics(
@@ -281,14 +289,22 @@ public class NodeManager {
       // Update DataNodeConfiguration when modified during restart
       UpdateDataNodePlan updateDataNodePlan =
           new UpdateDataNodePlan(req.getDataNodeConfiguration());
-      getConsensusManager().write(updateDataNodePlan);
+      try {
+        getConsensusManager().write(updateDataNodePlan);
+      } catch (ConsensusException e) {
+        LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      }
     }
     TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId);
     if (!req.getVersionInfo().equals(versionInfo)) {
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateVersionInfoPlan =
           new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId);
-      getConsensusManager().write(updateVersionInfoPlan);
+      try {
+        getConsensusManager().write(updateVersionInfoPlan);
+      } catch (ConsensusException e) {
+        LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      }
     }
 
     TDataNodeRestartResp resp = new TDataNodeRestartResp();
@@ -363,11 +379,11 @@ public class NodeManager {
       // Update versionInfo when modified during restart
       UpdateVersionInfoPlan updateConfigNodePlan =
           new UpdateVersionInfoPlan(versionInfo, configNodeId);
-      ConsensusWriteResponse result = 
getConsensusManager().write(updateConfigNodePlan);
-      if (result.getException() != null) {
+      try {
+        return getConsensusManager().write(updateConfigNodePlan);
+      } catch (ConsensusException e) {
         return new 
TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
       }
-      return result.getStatus();
     }
     return ClusterNodeStartUtils.ACCEPT_NODE_RESTART;
   }
@@ -526,10 +542,18 @@ public class NodeManager {
   public void applyConfigNode(
       TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) {
     ApplyConfigNodePlan applyConfigNodePlan = new 
ApplyConfigNodePlan(configNodeLocation);
-    getConsensusManager().write(applyConfigNodePlan);
+    try {
+      getConsensusManager().write(applyConfigNodePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
     UpdateVersionInfoPlan updateVersionInfoPlan =
         new UpdateVersionInfoPlan(versionInfo, 
configNodeLocation.getConfigNodeId());
-    getConsensusManager().write(updateVersionInfoPlan);
+    try {
+      getConsensusManager().write(updateVersionInfoPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index ac0df2916a9..18c1c4f44e2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -86,7 +86,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -438,15 +438,15 @@ public class PartitionManager {
       // since the RegionGroup creating process might take some time
       return status;
     }
-
-    ConsensusWriteResponse writeResp = getConsensusManager().write(plan);
-    if (!writeResp.isSuccessful()) {
+    try {
+      return getConsensusManager().write(plan);
+    } catch (ConsensusException e) {
       // The allocation might fail due to consensus error
-      status = writeResp.getStatus();
-      status.setMessage(writeResp.getErrorMessage());
       LOGGER.error("Write DataPartition allocation result failed because: {}", 
status);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
     }
-    return status;
   }
 
   // ======================================================
@@ -896,7 +896,11 @@ public class PartitionManager {
       String database, PreDeleteDatabasePlan.PreDeleteType preDeleteType) {
     final PreDeleteDatabasePlan preDeleteDatabasePlan =
         new PreDeleteDatabasePlan(database, preDeleteType);
-    getConsensusManager().write(preDeleteDatabasePlan);
+    try {
+      getConsensusManager().write(preDeleteDatabasePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
   }
 
   public boolean isDatabasePreDeleted(String database) {
@@ -956,7 +960,14 @@ public class PartitionManager {
    * @return TSStatus
    */
   public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) {
-    return getConsensusManager().write(req).getStatus();
+    try {
+      return getConsensusManager().write(req);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
   }
 
   public GetRegionIdResp getRegionId(TGetRegionIdReq req) {
@@ -1233,8 +1244,13 @@ public class PartitionManager {
                   }
 
                   // Poll the head entry if success
-                  getConsensusManager()
-                      .write(new 
PollSpecificRegionMaintainTaskPlan(successfulTask));
+                  try {
+                    getConsensusManager()
+                        .write(new 
PollSpecificRegionMaintainTaskPlan(successfulTask));
+                  } catch (ConsensusException e) {
+                    LOGGER.warn(
+                        "Something wrong happened while calling consensus 
layer's write API.", e);
+                  }
 
                   if (successfulTask.size() < 
selectedRegionMaintainTask.size()) {
                     // Here we just break and wait until next schedule task
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 808627fd82c..8ef9a1b64f3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -72,6 +72,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
@@ -152,12 +153,16 @@ public class ClusterSchemaManager {
         clusterSchemaInfo.checkDatabaseLimit();
       }
       // Cache DatabaseSchema
-      result = getConsensusManager().write(databaseSchemaPlan).getStatus();
+      result = getConsensusManager().write(databaseSchemaPlan);
       // Bind Database metrics
       PartitionMetrics.bindDatabasePartitionMetrics(
           MetricService.getInstance(), configManager, 
databaseSchemaPlan.getSchema().getName());
       // Adjust the maximum RegionGroup number of each Database
       adjustMaxRegionGroupNum();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
     } catch (MetadataException metadataException) {
       // Reject if StorageGroup already set
       if (metadataException instanceof IllegalPathException) {
@@ -220,12 +225,26 @@ public class ClusterSchemaManager {
     }
 
     // Alter DatabaseSchema
-    return getConsensusManager().write(databaseSchemaPlan).getStatus();
+    try {
+      return getConsensusManager().write(databaseSchemaPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   /** Delete DatabaseSchema. */
   public TSStatus deleteDatabase(DeleteDatabasePlan deleteDatabasePlan) {
-    TSStatus result = 
getConsensusManager().write(deleteDatabasePlan).getStatus();
+    TSStatus result;
+    try {
+      result = getConsensusManager().write(deleteDatabasePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+    }
     if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       adjustMaxRegionGroupNum();
     }
@@ -374,25 +393,53 @@ public class ClusterSchemaManager {
     // TODO: Check response
     
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
 
-    return getConsensusManager().write(setTTLPlan).getStatus();
+    try {
+      return getConsensusManager().write(setTTLPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public TSStatus setSchemaReplicationFactor(
       SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) {
     // TODO: Inform DataNodes
-    return 
getConsensusManager().write(setSchemaReplicationFactorPlan).getStatus();
+    try {
+      return getConsensusManager().write(setSchemaReplicationFactorPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public TSStatus setDataReplicationFactor(
       SetDataReplicationFactorPlan setDataReplicationFactorPlan) {
     // TODO: Inform DataNodes
-    return 
getConsensusManager().write(setDataReplicationFactorPlan).getStatus();
+    try {
+      return getConsensusManager().write(setDataReplicationFactorPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public TSStatus setTimePartitionInterval(
       SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) {
     // TODO: Inform DataNodes
-    return 
getConsensusManager().write(setTimePartitionIntervalPlan).getStatus();
+    try {
+      return getConsensusManager().write(setTimePartitionIntervalPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   /**
@@ -478,7 +525,11 @@ public class ClusterSchemaManager {
         LOGGER.warn("Adjust maxRegionGroupNum failed because StorageGroup 
doesn't exist", e);
       }
     }
-    getConsensusManager().write(adjustMaxRegionGroupNumPlan);
+    try {
+      getConsensusManager().write(adjustMaxRegionGroupNumPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
   }
 
   public static int calcMaxRegionGroupNum(
@@ -740,19 +791,36 @@ public class ClusterSchemaManager {
   }
 
   public TSStatus preUnsetSchemaTemplate(int templateId, PartialPath path) {
-    return getConsensusManager()
-        .write(new PreUnsetSchemaTemplatePlan(templateId, path))
-        .getStatus();
+    try {
+      return getConsensusManager().write(new 
PreUnsetSchemaTemplatePlan(templateId, path));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public TSStatus rollbackPreUnsetSchemaTemplate(int templateId, PartialPath 
path) {
-    return getConsensusManager()
-        .write(new RollbackPreUnsetSchemaTemplatePlan(templateId, path))
-        .getStatus();
+    try {
+      return getConsensusManager().write(new 
RollbackPreUnsetSchemaTemplatePlan(templateId, path));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public TSStatus unsetSchemaTemplateInBlackList(int templateId, PartialPath 
path) {
-    return getConsensusManager().write(new UnsetSchemaTemplatePlan(templateId, 
path)).getStatus();
+    try {
+      return getConsensusManager().write(new 
UnsetSchemaTemplatePlan(templateId, path));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public synchronized TSStatus dropSchemaTemplate(String templateName) {
@@ -784,7 +852,14 @@ public class ClusterSchemaManager {
     }
 
     // execute drop template
-    return getConsensusManager().write(new 
DropSchemaTemplatePlan(templateName)).getStatus();
+    try {
+      return getConsensusManager().write(new 
DropSchemaTemplatePlan(templateName));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      TSStatus result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      result.setMessage(e.getMessage());
+      return result;
+    }
   }
 
   public synchronized TSStatus extendSchemaTemplate(TemplateExtendInfo 
templateExtendInfo) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 371243bc197..996d44a301d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -58,6 +58,7 @@ import 
org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
 import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
@@ -263,14 +264,19 @@ public class ConfigNodeProcedureEnv {
     try {
       // Execute removePeer
       if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) {
-        tsStatus =
-            getConsensusManager().write(new 
RemoveConfigNodePlan(tConfigNodeLocation)).getStatus();
+        tsStatus = getConsensusManager().write(new 
RemoveConfigNodePlan(tConfigNodeLocation));
       } else {
         tsStatus =
             new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode())
                 .setMessage(
                     "Remove ConfigNode failed because update ConsensusGroup 
peer information failed.");
       }
+    } catch (ConsensusException e) {
+      LOG.warn("Something wrong happened while calling consensus layer's write 
API.", e);
+      tsStatus = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      tsStatus.setMessage(e.getMessage());
+    }
+    try {
       if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new ProcedureException(tsStatus.getMessage());
       }
@@ -363,7 +369,7 @@ public class ConfigNodeProcedureEnv {
             new TUpdateConfigNodeGroupReq(registeredConfigNodes),
             registeredDataNodes);
 
-    if (registeredDataNodes.size() > 0) {
+    if (!registeredDataNodes.isEmpty()) {
       LOG.info(
           "Begin to broadcast the latest configNodeGroup to DataNodes, 
ConfigNodeGroups: {}, DataNodes: {}",
           registeredConfigNodes,
@@ -536,7 +542,11 @@ public class ConfigNodeProcedureEnv {
 
   public void persistRegionGroup(CreateRegionGroupsPlan 
createRegionGroupsPlan) {
     // Persist the allocation result
-    getConsensusManager().write(createRegionGroupsPlan);
+    try {
+      getConsensusManager().write(createRegionGroupsPlan);
+    } catch (ConsensusException e) {
+      LOG.warn("Something wrong happened while calling consensus layer's write 
API.", e);
+    }
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index d4cde051a84..e15aa30c2e5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
 import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -560,7 +561,11 @@ public class DataNodeRemoveHandler {
   public void removeDataNodePersistence(TDataNodeLocation dataNodeLocation) {
     // Remove consensus record
     List<TDataNodeLocation> removeDataNodes = 
Collections.singletonList(dataNodeLocation);
-    configManager.getConsensusManager().write(new 
RemoveDataNodePlan(removeDataNodes));
+    try {
+      configManager.getConsensusManager().write(new 
RemoveDataNodePlan(removeDataNodes));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
 
     // Adjust maxRegionGroupNum
     configManager.getClusterSchemaManager().adjustMaxRegionGroupNum();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 480fca9abbd..ec7eb850f59 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -34,7 +34,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.cq.CreateCQState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -119,51 +119,47 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   }
 
   private void addCQ(ConfigNodeProcedureEnv env) {
-    ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new AddCQPlan(req, md5, firstExecutionTime));
-    TSStatus res = response.getStatus();
-    if (res != null) {
-      if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.debug("Finish init CQ {} successfully", req.cqId);
-        setNextState(INACTIVE);
-      } else if (res.code == TSStatusCode.CQ_ALREADY_EXIST.getStatusCode()) {
-        LOGGER.info("Failed to init CQ {} because such cq already exists", 
req.cqId);
-        setFailure(new ProcedureException(new IoTDBException(res.message, 
res.code)));
-      } else {
-        LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", 
req.cqId, res);
-        setFailure(new ProcedureException(new IoTDBException(res.message, 
res.code)));
-      }
+    TSStatus res;
+    try {
+      res =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new AddCQPlan(req, md5, firstExecutionTime));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+    }
+    if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.debug("Finish init CQ {} successfully", req.cqId);
+      setNextState(INACTIVE);
+    } else if (res.code == TSStatusCode.CQ_ALREADY_EXIST.getStatusCode()) {
+      LOGGER.info("Failed to init CQ {} because such cq already exists", 
req.cqId);
+      setFailure(new ProcedureException(new IoTDBException(res.message, 
res.code)));
     } else {
-      LOGGER.warn(
-          "Failed to init CQ {} because of unexpected exception: ",
-          req.cqId,
-          response.getException());
-      setFailure(new ProcedureException(response.getException()));
+      LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", 
req.cqId, res);
+      setFailure(new ProcedureException(new IoTDBException(res.message, 
res.code)));
     }
   }
 
   private void activeCQ(ConfigNodeProcedureEnv env) {
-    ConsensusWriteResponse response =
-        env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, md5));
-    TSStatus res = response.getStatus();
-    if (res != null) {
-      if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId);
-      } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
-        LOGGER.warn("Failed to active CQ {} because of no such cq: {}", 
req.cqId, res.message);
-      } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) {
-        LOGGER.warn("Failed to active CQ {} because this cq has already been 
active", req.cqId);
-      } else {
-        LOGGER.warn(
-            "Failed to active CQ {} successfully because of unknown reasons 
{}", req.cqId, res);
-      }
+    TSStatus res;
+    try {
+      res = env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, md5));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+    }
+    if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId);
+    } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+      LOGGER.warn("Failed to active CQ {} because of no such cq: {}", 
req.cqId, res.message);
+    } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) {
+      LOGGER.warn("Failed to active CQ {} because this cq has already been 
active", req.cqId);
     } else {
       LOGGER.warn(
-          "Failed to active CQ {} successfully because of unexpected 
exception: ",
-          req.cqId,
-          response.getException());
+          "Failed to active CQ {} successfully because of unknown reasons {}", 
req.cqId, res);
     }
   }
 
@@ -177,28 +173,26 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         break;
       case INACTIVE:
         LOGGER.info("Start [INACTIVE] rollback of CQ {}", req.cqId);
-        ConsensusWriteResponse response =
-            env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, md5));
-        TSStatus res = response.getStatus();
-        if (res != null) {
-          if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", 
req.cqId);
-          } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
-            LOGGER.warn(
-                "Failed to do [INACTIVE] rollback of CQ {} because of no such 
cq: {}",
-                req.cqId,
-                res.message);
-          } else {
-            LOGGER.warn(
-                "Failed to do [INACTIVE] rollback of CQ {} because of unknown 
reasons {}",
-                req.cqId,
-                res);
-          }
+        TSStatus res;
+        try {
+          res = env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, md5));
+        } catch (ConsensusException e) {
+          LOGGER.warn("Something wrong happened while calling consensus 
layer's write API.", e);
+          res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+          res.setMessage(e.getMessage());
+        }
+        if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", 
req.cqId);
+        } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+          LOGGER.warn(
+              "Failed to do [INACTIVE] rollback of CQ {} because of no such 
cq: {}",
+              req.cqId,
+              res.message);
         } else {
           LOGGER.warn(
-              "Failed to do [INACTIVE] rollback of CQ {} because of unexpected 
exception: ",
+              "Failed to do [INACTIVE] rollback of CQ {} because of unknown 
reasons {}",
               req.cqId,
-              response.getException());
+              res);
         }
 
         break;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
index be189bd5ce1..f4e0dbb989c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.model.CreateModelState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.protocol.client.MLNodeClient;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -91,13 +91,13 @@ public class CreateModelProcedure extends 
AbstractNodeProcedure<CreateModelState
 
           LOGGER.info("Start to add model [{}] in ModelTable on Config Nodes", 
modelId);
 
-          ConsensusWriteResponse response =
+          TSStatus response =
               configManager.getConsensusManager().write(new 
CreateModelPlan(modelInformation));
-          if (!response.isSuccessful()) {
+          if (response.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new ModelManagementException(
                 String.format(
                     "Failed to add model [%s] in ModelTable on Config Nodes: 
%s",
-                    modelId, response.getErrorMessage()));
+                    modelId, response.getMessage()));
           }
 
           setNextState(CreateModelState.CONFIG_NODE_ACTIVE);
@@ -160,9 +160,13 @@ public class CreateModelProcedure extends 
AbstractNodeProcedure<CreateModelState
       case VALIDATED:
         LOGGER.info("Start [VALIDATED] rollback of model [{}]", 
modelInformation.getModelId());
 
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new DropModelPlan(modelInformation.getModelId()));
+        try {
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new DropModelPlan(modelInformation.getModelId()));
+        } catch (ConsensusException e) {
+          LOGGER.warn("Something wrong happened while calling consensus 
layer's write API.", e);
+        }
         break;
 
       default:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
index 23ef71e898e..6c1f92c2d4c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.model.DropModelState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.protocol.client.MLNodeClient;
 import org.apache.iotdb.mpp.rpc.thrift.TDeleteModelMetricsReq;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -131,13 +130,13 @@ public class DropModelProcedure extends 
AbstractNodeProcedure<DropModelState> {
         case ML_NODE_DROPPED:
           LOGGER.info("Start to drop model [{}] on Config Nodes", modelId);
 
-          ConsensusWriteResponse response =
+          TSStatus response =
               env.getConfigManager().getConsensusManager().write(new 
DropModelPlan(modelId));
-          if (!response.isSuccessful()) {
+          if (response.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             throw new ModelManagementException(
                 String.format(
                     "Failed to drop model [%s], fail to drop model on Config 
Nodes: %s",
-                    modelId, response.getErrorMessage()));
+                    modelId, response.getMessage()));
           }
 
           setNextState(DropModelState.CONFIG_NODE_DROPPED);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index c6d68587493..b0fd11933e5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan;
@@ -35,7 +36,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -163,10 +164,16 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
     final CreatePipePluginPlan createPluginPlan =
         new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new 
Binary(jarFile) : null);
 
-    final ConsensusWriteResponse response =
-        configNodeManager.getConsensusManager().write(createPluginPlan);
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response = 
configNodeManager.getConsensusManager().write(createPluginPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
 
     setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES);
@@ -226,9 +233,13 @@ public class CreatePipePluginProcedure extends 
AbstractNodeProcedure<CreatePipeP
         "CreatePipePluginProcedure: rollbackFromCreateOnConfigNodes({})",
         pipePluginMeta.getPluginName());
 
-    env.getConfigManager()
-        .getConsensusManager()
-        .write(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+    try {
+      env.getConfigManager()
+          .getConsensusManager()
+          .write(new DropPipePluginPlan(pipePluginMeta.getPluginName()));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
   }
 
   private void rollbackFromCreateOnDataNodes(ConfigNodeProcedureEnv env) 
throws ProcedureException {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 676d127e27c..f60ff784043 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
 import 
org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -118,7 +119,11 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
       return Flow.NO_MORE_STATE;
     }
 
-    env.getConfigManager().getConsensusManager().write(new 
DropPipePluginPlan(pluginName));
+    try {
+      env.getConfigManager().getConsensusManager().write(new 
DropPipePluginPlan(pluginName));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
 
     setNextState(DropPipePluginState.DROP_ON_DATA_NODES);
     return Flow.HAS_MORE_STATE;
@@ -141,7 +146,11 @@ public class DropPipePluginProcedure extends 
AbstractNodeProcedure<DropPipePlugi
   private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", 
pluginName);
 
-    env.getConfigManager().getConsensusManager().write(new 
DropPipePluginPlan(pluginName));
+    try {
+      env.getConfigManager().getConsensusManager().write(new 
DropPipePluginPlan(pluginName));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+    }
 
     setNextState(DropPipePluginState.UNLOCK);
     return Flow.HAS_MORE_STATE;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 5d8fcf96e23..7536b822793 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -21,13 +21,15 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
@@ -91,11 +93,16 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
 
     final PipeHandleLeaderChangePlan pipeHandleLeaderChangePlan =
         new 
PipeHandleLeaderChangePlan(newDataRegionGroupIdToLeaderDataRegionIdMap);
-
-    final ConsensusWriteResponse response =
-        
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response = 
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 7d12e89b8d8..eb32e30281b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -19,14 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -89,12 +91,19 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       pipeMetaList.add(pipeMeta);
     }
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new PipeHandleMetaChangePlan(pipeMetaList));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new PipeHandleMetaChangePlan(pipeMetaList));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 5aefe33c222..12db604916a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -33,9 +34,10 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProced
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -127,12 +129,19 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})",
         createPipeRequest.getPipeName());
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
@@ -175,12 +184,19 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         "CreatePipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})",
         createPipeRequest.getPipeName());
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new DropPipePlanV2(createPipeRequest.getPipeName()));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new DropPipePlanV2(createPipeRequest.getPipeName()));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 510d87d2763..19a8af9cd1b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -19,13 +19,15 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -73,10 +75,16 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("DropPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager().getConsensusManager().write(new 
DropPipePlanV2(pipeName));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response = env.getConfigManager().getConsensusManager().write(new 
DropPipePlanV2(pipeName));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index eae446df24b..070bc20058b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -19,14 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -74,12 +76,19 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StartPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
@@ -116,12 +125,19 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env) {
     LOGGER.info("StartPipeProcedureV2: 
rollbackFromWriteConfigNodeConsensus({})", pipeName);
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index a3a85c8364e..c624c36a42f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -19,14 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.slf4j.Logger;
@@ -74,12 +76,19 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException {
     LOGGER.info("StopPipeProcedureV2: 
executeFromWriteConfigNodeConsensus({})", pipeName);
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
@@ -112,12 +121,19 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env) {
     LOGGER.info("StopPipeProcedureV2: 
rollbackFromWriteConfigNodeConsensus({})", pipeName);
 
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
+    TSStatus response;
+    try {
+      response =
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      response.setMessage(e.getMessage());
+    }
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeException(response.getMessage());
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 7ea774cc33c..7fd0a1af2bb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProce
 import 
org.apache.iotdb.confignode.procedure.state.schema.DeleteStorageGroupState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -215,7 +216,7 @@ public class DeleteDatabaseProcedure
                 new ProcedureException("[DeleteDatabaseProcedure] Delete 
DatabaseSchema failed"));
           }
       }
-    } catch (TException | IOException e) {
+    } catch (ConsensusException | TException | IOException e) {
       if (isRollbackSupported(state)) {
         setFailure(
             new ProcedureException("[DeleteDatabaseProcedure] Delete Database 
failed " + state));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
index 78e04c750d6..a8be16f171e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import 
org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.state.schema.SetTemplateState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.db.exception.metadata.template.TemplateIncompatibleException;
 import 
org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
 import org.apache.iotdb.db.schemaengine.template.Template;
@@ -166,8 +167,14 @@ public class SetTemplateProcedure
   private void preSetTemplate(ConfigNodeProcedureEnv env) {
     PreSetSchemaTemplatePlan preSetSchemaTemplatePlan =
         new PreSetSchemaTemplatePlan(templateName, templateSetPath);
-    TSStatus status =
-        
env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan).getStatus();
+    TSStatus status;
+    try {
+      status = 
env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+    }
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       setNextState(SetTemplateState.PRE_RELEASE);
     } else {
@@ -317,8 +324,14 @@ public class SetTemplateProcedure
   private void commitSetTemplate(ConfigNodeProcedureEnv env) {
     CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan =
         new CommitSetSchemaTemplatePlan(templateName, templateSetPath);
-    TSStatus status =
-        
env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan).getStatus();
+    TSStatus status = null;
+    try {
+      status = 
env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+    }
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       setNextState(SetTemplateState.COMMIT_RELEASE);
     } else {
@@ -414,8 +427,14 @@ public class SetTemplateProcedure
   private void rollbackPreSet(ConfigNodeProcedureEnv env) {
     PreSetSchemaTemplatePlan preSetSchemaTemplatePlan =
         new PreSetSchemaTemplatePlan(templateName, templateSetPath, true);
-    TSStatus status =
-        
env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan).getStatus();
+    TSStatus status = null;
+    try {
+      status = 
env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+    }
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
           "Failed to rollback pre set template {} on path {} due to {}",
@@ -465,8 +484,14 @@ public class SetTemplateProcedure
   private void rollbackCommitSet(ConfigNodeProcedureEnv env) {
     CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan =
         new CommitSetSchemaTemplatePlan(templateName, templateSetPath, true);
-    TSStatus status =
-        
env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan).getStatus();
+    TSStatus status = null;
+    try {
+      status = 
env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+    }
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.warn(
           "Failed to rollback commit set template {} on path {} due to {}",
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
index 2dab4c451aa..b709a1f00bd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDelete
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,7 +162,11 @@ public class CreateRegionGroupsProcedure
                         }));
 
         env.persistRegionGroup(persistPlan);
-        env.getConfigManager().getConsensusManager().write(offerPlan);
+        try {
+          env.getConfigManager().getConsensusManager().write(offerPlan);
+        } catch (ConsensusException e) {
+          LOGGER.warn("Something wrong happened while calling consensus 
layer's write API.", e);
+        }
         setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS);
         break;
       case ACTIVATE_REGION_GROUPS:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
index f6ed578c3ca..53e9faae943 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.procedure.impl.trigger;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.trigger.exception.TriggerManagementException;
 import 
org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan;
@@ -32,7 +33,7 @@ import 
org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
 import org.apache.iotdb.confignode.procedure.state.CreateTriggerState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -90,12 +91,19 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
               triggerInformation.getTriggerName(),
               jarFile != null);
 
-          ConsensusWriteResponse response =
-              configManager
-                  .getConsensusManager()
-                  .write(new AddTriggerInTablePlan(triggerInformation, 
jarFile));
-          if (!response.isSuccessful()) {
-            throw new TriggerManagementException(response.getErrorMessage());
+          TSStatus response;
+          try {
+            response =
+                configManager
+                    .getConsensusManager()
+                    .write(new AddTriggerInTablePlan(triggerInformation, 
jarFile));
+          } catch (ConsensusException e) {
+            LOG.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+            response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+            response.setMessage(e.getMessage());
+          }
+          if (response.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+            throw new TriggerManagementException(response.getMessage());
           }
 
           setNextState(CreateTriggerState.CONFIG_NODE_INACTIVE);
@@ -189,9 +197,13 @@ public class CreateTriggerProcedure extends 
AbstractNodeProcedure<CreateTriggerS
       case VALIDATED:
         LOG.info("Start [VALIDATED] rollback of trigger [{}]", 
triggerInformation.getTriggerName());
 
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new 
DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        try {
+          env.getConfigManager()
+              .getConsensusManager()
+              .write(new 
DeleteTriggerInTablePlan(triggerInformation.getTriggerName()));
+        } catch (ConsensusException e) {
+          LOG.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+        }
         break;
 
       case CONFIG_NODE_INACTIVE:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index 0d209a5f9e1..ba535ff11fc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,7 +81,11 @@ public class ConfigProcedureStore implements IProcedureStore 
{
     if (procedureType != null) {
       updateProcedurePlan.setProcedure(procedure);
     }
-    getConsensusManager().write(updateProcedurePlan);
+    try {
+      getConsensusManager().write(updateProcedurePlan);
+    } catch (ConsensusException e) {
+      LOG.warn("Something wrong happened while calling consensus layer's write 
API.", e);
+    }
   }
 
   @Override
@@ -94,7 +99,11 @@ public class ConfigProcedureStore implements IProcedureStore 
{
   public void delete(long procId) {
     DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
     deleteProcedurePlan.setProcId(procId);
-    getConsensusManager().write(deleteProcedurePlan);
+    try {
+      getConsensusManager().write(deleteProcedurePlan);
+    } catch (ConsensusException e) {
+      LOG.warn("Something wrong happened while calling consensus layer's write 
API.", e);
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 1f51c09c743..39839d118ef 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -32,20 +31,14 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.util.List;
 
-/**
- * Consensus module base interface.
- */
+/** Consensus module base interface. */
 @ThreadSafe
 public interface IConsensus {
 
-  /**
-   * Start the consensus module
-   */
+  /** Start the consensus module */
   void start() throws IOException;
 
-  /**
-   * Stop the consensus module
-   */
+  /** Stop the consensus module */
   void stop() throws IOException;
 
   /**
@@ -69,14 +62,13 @@ public interface IConsensus {
   /**
    * Require the <em>local node</em> to create a Peer and become a member of 
the given consensus
    * group. This node will prepare and initialize local statemachine {@link 
IStateMachine} and other
-   * data structures. After this method returns, we can call
-   * {@link #addRemotePeer(ConsensusGroupId, Peer)} to notify original group 
that this new Peer is
-   * prepared to be added into the latest configuration. createLocalPeer 
should be called on a node
-   * that does not contain any peer of the consensus group, to avoid one node 
having more than one
-   * replica.
+   * data structures. After this method returns, we can call {@link 
#addRemotePeer(ConsensusGroupId,
+   * Peer)} to notify original group that this new Peer is prepared to be 
added into the latest
+   * configuration. createLocalPeer should be called on a node that does not 
contain any peer of the
+   * consensus group, to avoid one node having more than one replica.
    *
    * @param groupId the consensus group this Peer belongs
-   * @param peers   other known peers in this group
+   * @param peers other known peers in this group
    */
   void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) throws 
ConsensusException;
 
@@ -84,8 +76,8 @@ public interface IConsensus {
    * When the <em>local node</em> is no longer a member of the given consensus 
group, call this
    * method to do cleanup works. This method will close local statemachine 
{@link IStateMachine},
    * delete local data and do other cleanup works. deleteLocalPeer should be 
called after
-   * successfully removing this peer from current consensus group 
configuration (by calling
-   * {@link #removeRemotePeer(ConsensusGroupId, Peer)}).
+   * successfully removing this peer from current consensus group 
configuration (by calling {@link
+   * #removeRemotePeer(ConsensusGroupId, Peer)}).
    *
    * @param groupId the consensus group this Peer used to belong
    */
@@ -94,15 +86,15 @@ public interface IConsensus {
   // single consensus group API
 
   /**
-   * Tell the group that a new Peer is prepared to be added into this group. 
Call
-   * {@link #createLocalPeer(ConsensusGroupId, List)} on the new Peer before 
calling this method.
-   * When this method returns, the group data should be already transmitted to 
the new Peer. That
-   * is, the new peer is available to answer client requests by the time this 
method successfully
-   * returns. addRemotePeer should be called on a living peer of the consensus 
group. For example:
-   * We'd like to add a peer D to (A, B, C) group. We need to execute addPeer 
in A, B or C.
+   * Tell the group that a new Peer is prepared to be added into this group. 
Call {@link
+   * #createLocalPeer(ConsensusGroupId, List)} on the new Peer before calling 
this method. When this
+   * method returns, the group data should be already transmitted to the new 
Peer. That is, the new
+   * peer is available to answer client requests by the time this method 
successfully returns.
+   * addRemotePeer should be called on a living peer of the consensus group. 
For example: We'd like
+   * to add a peer D to (A, B, C) group. We need to execute addPeer in A, B or 
C.
    *
    * @param groupId the consensus group this peer belongs
-   * @param peer    the newly added peer
+   * @param peer the newly added peer
    */
   void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException;
 
@@ -114,7 +106,7 @@ public interface IConsensus {
    * to remove C, in case C is dead, the removePeer should be sent to A or B.
    *
    * @param groupId the consensus group this peer belongs
-   * @param peer    the peer to be removed
+   * @param peer the peer to be removed
    */
   void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException;
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index aee5a970ec3..3d4fa742d25 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -19,12 +19,10 @@
 
 package org.apache.iotdb.consensus.iot;
 
-import java.util.Optional;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.utils.FileUtils;
@@ -35,9 +33,6 @@ import org.apache.iotdb.consensus.IStateMachine.Registry;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -54,11 +49,9 @@ import 
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
-import org.apache.iotdb.consensus.simple.SimpleConsensusServerImpl;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.ratis.protocol.GroupManagementRequest.Op;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +63,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -161,12 +155,14 @@ public class IoTConsensus implements IConsensus {
   @Override
   public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request)
       throws ConsensusException {
-    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
-        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    IoTConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     if (impl.isReadOnly()) {
       return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
     } else if (!impl.isActive()) {
-      return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT,
+      return RpcUtils.getStatus(
+          TSStatusCode.WRITE_PROCESS_REJECT,
           "peer is inactive and not ready to receive sync log request.");
     } else {
       return impl.write(request);
@@ -193,31 +189,35 @@ public class IoTConsensus implements IConsensus {
       throw new IllegalPeerEndpointException(thisNode, peers);
     }
     AtomicBoolean exist = new AtomicBoolean(true);
-    Optional.ofNullable(stateMachineMap.computeIfAbsent(
-        groupId,
-        k -> {
-          exist.set(false);
-
-          String path = buildPeerDir(storageDir, groupId);
-          File file = new File(path);
-          if (!file.mkdirs()) {
-            logger.warn("Unable to create consensus dir for group {} at {}", 
groupId, path);
-            return null;
-          }
-
-          IoTConsensusServerImpl impl =
-              new IoTConsensusServerImpl(
-                  path,
-                  new Peer(groupId, thisNodeId, thisNode),
-                  peers,
-                  registry.apply(groupId),
-                  clientManager,
-                  syncClientManager,
-                  config);
-          impl.start();
-          return impl;
-        })).orElseThrow(() -> new ConsensusException(
-        String.format("Unable to create consensus dir for group %s", 
groupId)));
+    Optional.ofNullable(
+            stateMachineMap.computeIfAbsent(
+                groupId,
+                k -> {
+                  exist.set(false);
+
+                  String path = buildPeerDir(storageDir, groupId);
+                  File file = new File(path);
+                  if (!file.mkdirs()) {
+                    logger.warn("Unable to create consensus dir for group {} 
at {}", groupId, path);
+                    return null;
+                  }
+
+                  IoTConsensusServerImpl impl =
+                      new IoTConsensusServerImpl(
+                          path,
+                          new Peer(groupId, thisNodeId, thisNode),
+                          peers,
+                          registry.apply(groupId),
+                          clientManager,
+                          syncClientManager,
+                          config);
+                  impl.start();
+                  return impl;
+                }))
+        .orElseThrow(
+            () ->
+                new ConsensusException(
+                    String.format("Unable to create consensus dir for group 
%s", groupId)));
     if (exist.get()) {
       throw new ConsensusGroupAlreadyExistException(groupId);
     }
@@ -241,8 +241,9 @@ public class IoTConsensus implements IConsensus {
 
   @Override
   public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
-    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
-        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    IoTConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     if (impl.getConfiguration().contains(peer)) {
       throw new PeerAlreadyInConsensusGroupException(groupId, peer);
     }
@@ -291,10 +292,10 @@ public class IoTConsensus implements IConsensus {
   }
 
   @Override
-  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer)
-      throws ConsensusException {
-    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
-        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+  public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws 
ConsensusException {
+    IoTConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
 
     try {
       // let other peers remove the sync channel with target peer
@@ -322,8 +323,9 @@ public class IoTConsensus implements IConsensus {
 
   @Override
   public void triggerSnapshot(ConsensusGroupId groupId) throws 
ConsensusException {
-    IoTConsensusServerImpl impl = 
Optional.ofNullable(stateMachineMap.get(groupId))
-        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    IoTConsensusServerImpl impl =
+        Optional.ofNullable(stateMachineMap.get(groupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
     try {
       impl.takeSnapshot();
     } catch (ConsensusGroupModifyPeerException e) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 6d3d1e5567c..df72318f4ee 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.ratis;
 
-import java.util.Optional;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -95,6 +94,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -103,16 +103,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-/**
- * A multi-raft consensus implementation based on Apache Ratis.
- */
+/** A multi-raft consensus implementation based on Apache Ratis. */
 class RatisConsensus implements IConsensus {
 
   private static final Logger logger = 
LoggerFactory.getLogger(RatisConsensus.class);
 
-  /**
-   * the unique net communication endpoint
-   */
+  /** the unique net communication endpoint */
   private final RaftPeer myself;
 
   private final RaftServer server;
@@ -120,7 +116,8 @@ class RatisConsensus implements IConsensus {
   private final RaftProperties properties = new RaftProperties();
   private final RaftClientRpc clientRpc;
 
-  private final IClientManager<RaftGroup, 
org.apache.iotdb.consensus.ratis.RatisClient> clientManager;
+  private final IClientManager<RaftGroup, 
org.apache.iotdb.consensus.ratis.RatisClient>
+      clientManager;
 
   private final Map<RaftGroupId, RaftGroup> lastSeen = new 
ConcurrentHashMap<>();
 
@@ -215,9 +212,7 @@ class RatisConsensus implements IConsensus {
     return !reply.isSuccess() && (reply.getException() instanceof 
ResourceUnavailableException);
   }
 
-  /**
-   * launch a consensus write with retry mechanism
-   */
+  /** launch a consensus write with retry mechanism */
   private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, 
IOException> caller)
       throws IOException {
 
@@ -257,8 +252,7 @@ class RatisConsensus implements IConsensus {
   }
 
   private RaftClientReply writeRemotelyWithRetry(
-      org.apache.iotdb.consensus.ratis.RatisClient client, Message message)
-      throws IOException {
+      org.apache.iotdb.consensus.ratis.RatisClient client, Message message) 
throws IOException {
     return writeWithRetry(() -> client.getRaftClient().io().send(message));
   }
 
@@ -299,8 +293,10 @@ class RatisConsensus implements IConsensus {
           
RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) {
         RaftClientReply localServerReply = 
writeLocallyWithRetry(clientRequest);
         if (localServerReply.isSuccess()) {
-          org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage = 
(org.apache.iotdb.consensus.ratis.ResponseMessage) 
localServerReply.getMessage();
-          return 
Optional.ofNullable(responseMessage.getContentHolder()).map(TSStatus.class::cast)
+          org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage =
+              (org.apache.iotdb.consensus.ratis.ResponseMessage) 
localServerReply.getMessage();
+          return Optional.ofNullable(responseMessage.getContentHolder())
+              .map(TSStatus.class::cast)
               .orElse(null);
         }
         NotLeaderException ex = localServerReply.getNotLeaderException();
@@ -315,7 +311,7 @@ class RatisConsensus implements IConsensus {
     // 2. try raft client
     TSStatus writeResult;
     try (AutoCloseable ignored =
-        
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
+            
RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType);
         org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(raftGroup)) {
       RaftClientReply reply = writeRemotelyWithRetry(client, message);
       if (!reply.isSuccess()) {
@@ -333,9 +329,7 @@ class RatisConsensus implements IConsensus {
     return writeResult;
   }
 
-  /**
-   * Read directly from LOCAL COPY notice:
-   */
+  /** Read directly from LOCAL COPY notice: */
   @Override
   public DataSet read(ConsensusGroupId groupId, IConsensusRequest request)
       throws ConsensusException {
@@ -367,14 +361,14 @@ class RatisConsensus implements IConsensus {
       throw new RatisRequestFailedException(e);
     }
     Message ret = reply.getMessage();
-    org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage = 
(org.apache.iotdb.consensus.ratis.ResponseMessage) ret;
-    return 
Optional.ofNullable(readResponseMessage.getContentHolder()).map(DataSet.class::cast)
+    org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage =
+        (org.apache.iotdb.consensus.ratis.ResponseMessage) ret;
+    return Optional.ofNullable(readResponseMessage.getContentHolder())
+        .map(DataSet.class::cast)
         .orElse(null);
   }
 
-  /**
-   * return a success raft client reply or throw an Exception
-   */
+  /** return a success raft client reply or throw an Exception */
   private RaftClientReply doRead(
       RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) 
throws Exception {
     final RaftClientRequest.Type readType =
@@ -412,8 +406,8 @@ class RatisConsensus implements IConsensus {
     RaftGroup clientGroup =
         group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), 
myself) : group;
     try (org.apache.iotdb.consensus.ratis.RatisClient client = 
getRaftClient(clientGroup)) {
-      RaftClientReply reply = 
client.getRaftClient().getGroupManagementApi(myself.getId())
-          .add(group);
+      RaftClientReply reply =
+          
client.getRaftClient().getGroupManagementApi(myself.getId()).add(group);
       if (!reply.isSuccess()) {
         throw new RatisRequestFailedException(reply.getException());
       }
@@ -522,15 +516,14 @@ class RatisConsensus implements IConsensus {
     return 
ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
-
   /**
    * NOTICE: transferLeader *does not guarantee* the leader be transferred to 
newLeader.
    * transferLeader is implemented by 1. modify peer priority 2. ask current 
leader to step down
    *
-   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and 
degrade all follower
-   * peers to 0. By default, Ratis gives every Raft Peer same priority 0. 
Ratis does not allow a
-   * peer with priority <= currentLeader.priority to becomes the leader, so we 
have to upgrade
-   * leader's priority to 1
+   * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and 
degrade all follower peers
+   * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does 
not allow a peer with
+   * priority <= currentLeader.priority to becomes the leader, so we have to 
upgrade leader's
+   * priority to 1
    *
    * <p>2. call transferLeadership to force current leader to step down and 
raise a new round of
    * election. In this election, the newLeader peer with priority 1 is 
guaranteed to be elected.
@@ -540,8 +533,9 @@ class RatisConsensus implements IConsensus {
 
     // first fetch the newest information
     RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
-    RaftGroup raftGroup = Optional.ofNullable(getGroupInfo(raftGroupId))
-        .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
+    RaftGroup raftGroup =
+        Optional.ofNullable(getGroupInfo(raftGroupId))
+            .orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
 
     RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, 
LEADER_PRIORITY);
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
index 5544bad9a53..3ea153d25fe 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.consensus.IStateMachine.Registry;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 05ef52758e8..e8d97bd8883 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -31,7 +31,6 @@ import 
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -166,25 +165,21 @@ public class RegionWriteExecutor {
 
       try {
         TSStatus status = 
executePlanNodeInConsensusLayer(context.getRegionId(), node);
-        response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode());
+        response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
status.getCode());
         response.setMessage(status.getMessage());
         response.setStatus(status);
       } catch (ConsensusException e) {
-        LOGGER.error(
-            "Something wrong happened while calling consensus layer's write 
API.",
-            e);
+        LOGGER.error("Something wrong happened while calling consensus layer's 
write API.", e);
         response.setAccepted(false);
         response.setMessage(e.toString());
         response.setStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage()));
+            RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
e.getMessage()));
       }
       return response;
     }
 
-    private TSStatus executePlanNodeInConsensusLayer(
-        ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException 
{
+    private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId, 
PlanNode planNode)
+        throws ConsensusException {
       if (groupId instanceof DataRegionId) {
         return dataRegionConsensus.write(groupId, planNode);
       } else {
@@ -227,32 +222,26 @@ public class RegionWriteExecutor {
       RegionExecutionResult response = new RegionExecutionResult();
       context.getRegionWriteValidationRWLock().readLock().lock();
       try {
-        TSStatus status =
-            fireTriggerAndInsert(context.getRegionId(), insertNode);
-        response.setAccepted(
-            TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode());
+        TSStatus status = fireTriggerAndInsert(context.getRegionId(), 
insertNode);
+        response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
status.getCode());
         response.setMessage(status.message);
         if (!response.isAccepted()) {
           response.setStatus(status);
         }
         return response;
       } catch (ConsensusException e) {
-        LOGGER.warn(
-            "Something wrong happened while calling consensus layer's write 
API.",
-            e);
+        LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
         response.setAccepted(false);
         response.setMessage(e.toString());
-        response.setStatus(
-            RpcUtils.getStatus(
-                TSStatusCode.WRITE_PROCESS_ERROR, e.toString()));
+        
response.setStatus(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, 
e.toString()));
         return response;
       } finally {
         context.getRegionWriteValidationRWLock().readLock().unlock();
       }
     }
 
-    private TSStatus fireTriggerAndInsert(
-        ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException 
{
+    private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode 
planNode)
+        throws ConsensusException {
       long triggerCostTime = 0;
       TSStatus status;
       long startTime = System.nanoTime();
@@ -260,8 +249,10 @@ public class RegionWriteExecutor {
       TriggerFireResult result = triggerFireVisitor.process(planNode, 
TriggerEvent.BEFORE_INSERT);
       triggerCostTime += (System.nanoTime() - startTime);
       if (result.equals(TriggerFireResult.TERMINATION)) {
-        status = 
RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
-            "Failed to complete the insertion because trigger error before the 
insertion.");
+        status =
+            RpcUtils.getStatus(
+                TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
+                "Failed to complete the insertion because trigger error before 
the insertion.");
       } else {
         boolean hasFailedTriggerBeforeInsertion =
             result.equals(TriggerFireResult.FAILED_NO_TERMINATION);
@@ -274,8 +265,10 @@ public class RegionWriteExecutor {
         startTime = System.nanoTime();
         result = triggerFireVisitor.process(planNode, 
TriggerEvent.AFTER_INSERT);
         if (hasFailedTriggerBeforeInsertion || 
!result.equals(TriggerFireResult.SUCCESS)) {
-          status = 
RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
-              "Meet trigger error before/after the insertion, the insertion 
itself is completed.");
+          status =
+              RpcUtils.getStatus(
+                  TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(),
+                  "Meet trigger error before/after the insertion, the 
insertion itself is completed.");
         }
         triggerCostTime += (System.nanoTime() - startTime);
       }
@@ -792,7 +785,7 @@ public class RegionWriteExecutor {
                 String.format(
                     "Template is being unsetting from prefix path of %s. 
Please try activating later.",
                     new PartialPath(
-                        Arrays.copyOf(entry.getKey().getNodes(), 
entry.getValue().right + 1))
+                            Arrays.copyOf(entry.getKey().getNodes(), 
entry.getValue().right + 1))
                         .getFullPath());
             result.setMessage(message);
             result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
message));

Reply via email to