This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch consensus_module_refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 72f9d25a609a065f58a00bc66c85b44b14631c90
Author: BUAAserein <[email protected]>
AuthorDate: Thu Aug 17 12:48:23 2023 +0800

    refactor read
---
 .../iotdb/confignode/manager/ConfigManager.java    |   6 +-
 .../iotdb/confignode/manager/ModelManager.java     |  40 +++--
 .../confignode/manager/PermissionManager.java      |  11 +-
 .../iotdb/confignode/manager/TriggerManager.java   |  18 ++-
 .../iotdb/confignode/manager/UDFManager.java       |  21 ++-
 .../iotdb/confignode/manager/cq/CQManager.java     |  27 ++--
 .../iotdb/confignode/manager/node/NodeManager.java |  11 +-
 .../manager/partition/PartitionManager.java        | 142 ++++++++++++------
 .../manager/pipe/plugin/PipePluginCoordinator.java |  23 ++-
 .../manager/pipe/task/PipeTaskCoordinator.java     |  14 +-
 .../manager/schema/ClusterSchemaManager.java       | 162 +++++++++++++++++----
 .../impl/schema/SetTemplateProcedure.java          |  33 +++--
 .../apache/iotdb/consensus/iot/ReplicateTest.java  |   6 +-
 .../iotdb/consensus/ratis/RatisConsensusTest.java  |  20 ++-
 .../iotdb/consensus/ratis/RecoverReadTest.java     |  31 ++--
 .../apache/iotdb/consensus/ratis/TestUtils.java    |  18 +--
 .../consensus/simple/SimpleConsensusTest.java      |   5 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  22 +--
 .../execution/executor/RegionReadExecutor.java     |  18 +--
 .../java/org/apache/iotdb/db/service/DataNode.java |  26 +---
 .../apache/iotdb/db/service/IoTDBShutdownHook.java |   3 +-
 .../execution/executor/RegionReadExecutorTest.java |  17 ++-
 .../executor/RegionWriteExecutorTest.java          |   3 +-
 23 files changed, 435 insertions(+), 242 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index f1f6cbdef2f..bbfaff13330 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -664,8 +664,7 @@ public class ConfigManager implements IManager {
         new GetSchemaPartitionPlan(
             partitionSlotsMap.entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> new 
ArrayList<>(e.getValue()))));
-    SchemaPartitionResp queryResult =
-        (SchemaPartitionResp) 
partitionManager.getSchemaPartition(getSchemaPartitionPlan);
+    SchemaPartitionResp queryResult = 
partitionManager.getSchemaPartition(getSchemaPartitionPlan);
     resp = queryResult.convertToRpcSchemaPartitionTableResp();
 
     LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", 
relatedPaths, resp);
@@ -796,8 +795,7 @@ public class ConfigManager implements IManager {
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return resp.setStatus(status);
     }
-    DataPartitionResp queryResult =
-        (DataPartitionResp) 
partitionManager.getDataPartition(getDataPartitionPlan);
+    DataPartitionResp queryResult = 
partitionManager.getDataPartition(getDataPartitionPlan);
 
     resp = queryResult.convertToTDataPartitionTableResp();
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
index 7fd18b5efca..97b2e8cf845 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java
@@ -36,7 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -104,17 +104,14 @@ public class ModelManager {
 
   public TShowModelResp showModel(TShowModelReq req) {
     try {
-      ConsensusReadResponse response =
-          configManager.getConsensusManager().read(new ShowModelPlan(req));
-      if (response.getDataset() != null) {
-        return ((ModelTableResp) 
response.getDataset()).convertToThriftResponse();
-      } else {
-        LOGGER.warn("Unexpected error happened while showing model: ", 
response.getException());
-        // consensus layer related errors
-        TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-        res.setMessage(response.getException().toString());
-        return new TShowModelResp(res, Collections.emptyList());
-      }
+      DataSet response = configManager.getConsensusManager().read(new 
ShowModelPlan(req));
+      return ((ModelTableResp) response).convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      // consensus layer related errors
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new TShowModelResp(res, Collections.emptyList());
     } catch (IOException e) {
       LOGGER.error("Fail to get ModelTable", e);
       return new TShowModelResp(
@@ -126,17 +123,14 @@ public class ModelManager {
 
   public TShowTrailResp showTrail(TShowTrailReq req) {
     try {
-      ConsensusReadResponse response =
-          configManager.getConsensusManager().read(new ShowTrailPlan(req));
-      if (response.getDataset() != null) {
-        return ((TrailTableResp) 
response.getDataset()).convertToThriftResponse();
-      } else {
-        LOGGER.warn("Unexpected error happened while showing trail: ", 
response.getException());
-        // consensus layer related errors
-        TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-        res.setMessage(response.getException().toString());
-        return new TShowTrailResp(res, Collections.emptyList());
-      }
+      DataSet response = configManager.getConsensusManager().read(new 
ShowTrailPlan(req));
+      return ((TrailTableResp) response).convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      // consensus layer related errors
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new TShowTrailResp(res, Collections.emptyList());
     } catch (IOException e) {
       LOGGER.error("Fail to get TrailTable", e);
       return new TShowTrailResp(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index 1f5c5a999d8..e7213f9a7ca 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -88,7 +88,16 @@ public class PermissionManager {
    * @return PermissionInfoResp
    */
   public PermissionInfoResp queryPermission(AuthorPlan authorPlan) {
-    return (PermissionInfoResp) 
getConsensusManager().read(authorPlan).getDataset();
+    try {
+      return (PermissionInfoResp) getConsensusManager().read(authorPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      PermissionInfoResp response = new PermissionInfoResp();
+      response.setStatus(res);
+      return response;
+    }
   }
 
   private ConsensusManager getConsensusManager() {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
index e42a0fb029e..49ed4256346 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java
@@ -152,7 +152,7 @@ public class TriggerManager {
               configManager.getConsensusManager().read(new 
GetTriggerLocationPlan(triggerName)))
           .convertToThriftResponse();
     } catch (ConsensusException e) {
-      LOGGER.warn("Something wrong happened while calling consensus layer's 
write API.", e);
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
       return new TGetLocationForTriggerResp(
           new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
               .setMessage(e.getMessage()));
@@ -160,12 +160,16 @@ public class TriggerManager {
   }
 
   public TGetJarInListResp getTriggerJar(TGetJarInListReq req) {
-    return ((JarResp)
-            configManager
-                .getConsensusManager()
-                .read(new GetTriggerJarPlan(req.getJarNameList()))
-                .getDataset())
-        .convertToThriftResponse();
+    try {
+      return ((JarResp)
+              configManager.getConsensusManager().read(new 
GetTriggerJarPlan(req.getJarNameList())))
+          .convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new JarResp(res, 
Collections.emptyList()).convertToThriftResponse();
+    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
index f2b747104c1..877c6c8bca2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -168,9 +169,9 @@ public class UDFManager {
   public TGetUDFTableResp getUDFTable() {
     try {
       return ((FunctionTableResp)
-              configManager.getConsensusManager().read(new 
GetFunctionTablePlan()).getDataset())
+              configManager.getConsensusManager().read(new 
GetFunctionTablePlan()))
           .convertToThriftResponse();
-    } catch (IOException e) {
+    } catch (IOException | ConsensusException e) {
       LOGGER.error("Fail to get TriggerTable", e);
       return new TGetUDFTableResp(
           new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -180,11 +181,15 @@ public class UDFManager {
   }
 
   public TGetJarInListResp getUDFJar(TGetJarInListReq req) {
-    return ((JarResp)
-            configManager
-                .getConsensusManager()
-                .read(new GetUDFJarPlan(req.getJarNameList()))
-                .getDataset())
-        .convertToThriftResponse();
+    try {
+      return ((JarResp)
+              configManager.getConsensusManager().read(new 
GetUDFJarPlan(req.getJarNameList())))
+          .convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new JarResp(res, 
Collections.emptyList()).convertToThriftResponse();
+    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 43878c16c96..63b5db0624a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -90,14 +90,14 @@ public class CQManager {
   }
 
   public TShowCQResp showCQ() {
-    ConsensusReadResponse response = 
configManager.getConsensusManager().read(new ShowCQPlan());
-    if (response.getDataset() != null) {
-      return ((ShowCQResp) response.getDataset()).convertToRpcShowCQResp();
-    } else {
-      LOGGER.warn("Unexpected error happened while showing cq: ", 
response.getException());
+    try {
+      DataSet response = configManager.getConsensusManager().read(new 
ShowCQPlan());
+      return ((ShowCQResp) response).convertToRpcShowCQResp();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Unexpected error happened while showing cq: ", e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(response.getException().toString());
+      res.setMessage(e.getMessage());
       return new TShowCQResp(res, Collections.emptyList());
     }
   }
@@ -155,16 +155,15 @@ public class CQManager {
       }
       // keep fetching until we get all CQEntries if this node is still leader
       while (needFetch(allCQs)) {
-        ConsensusReadResponse response = 
configManager.getConsensusManager().read(new ShowCQPlan());
-        if (response.getDataset() != null) {
-          allCQs = ((ShowCQResp) response.getDataset()).getCqList();
-        } else {
+        try {
+          DataSet response = configManager.getConsensusManager().read(new 
ShowCQPlan());
+          allCQs = ((ShowCQResp) response).getCqList();
+        } catch (ConsensusException e) {
           // consensus layer related errors
-          LOGGER.warn(
-              "Unexpected error happened while fetching cq list: ", 
response.getException());
+          LOGGER.warn("Unexpected error happened while fetching cq list: ", e);
           try {
             Thread.sleep(500);
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
           }
         }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index bf59d5c2b77..646c7381104 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -396,7 +396,16 @@ public class NodeManager {
    *     GetDataNodeConfigurationPlan is -1
    */
   public DataNodeConfigurationResp 
getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
-    return (DataNodeConfigurationResp) 
getConsensusManager().read(req).getDataset();
+    try {
+      return (DataNodeConfigurationResp) getConsensusManager().read(req);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      DataNodeConfigurationResp response = new DataNodeConfigurationResp();
+      response.setStatus(res);
+      return response;
+    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 18c1c4f44e2..c633c567d7c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -84,8 +84,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
-import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -97,6 +95,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -165,8 +164,15 @@ public class PartitionManager {
    * @param req SchemaPartitionPlan with partitionSlotsMap
    * @return SchemaPartitionDataSet that contains only existing SchemaPartition
    */
-  public DataSet getSchemaPartition(GetSchemaPartitionPlan req) {
-    return getConsensusManager().read(req).getDataset();
+  public SchemaPartitionResp getSchemaPartition(GetSchemaPartitionPlan req) {
+    try {
+      return (SchemaPartitionResp) getConsensusManager().read(req);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new SchemaPartitionResp(res, false, Collections.emptyMap());
+    }
   }
 
   /**
@@ -176,8 +182,15 @@ public class PartitionManager {
    *     TTimeSlotList>>
    * @return DataPartitionDataSet that contains only existing DataPartition
    */
-  public DataSet getDataPartition(GetDataPartitionPlan req) {
-    return getConsensusManager().read(req).getDataset();
+  public DataPartitionResp getDataPartition(GetDataPartitionPlan req) {
+    try {
+      return (DataPartitionResp) getConsensusManager().read(req);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new DataPartitionResp(res, false, Collections.emptyMap());
+    }
   }
 
   /**
@@ -205,7 +218,7 @@ public class PartitionManager {
 
     // After all the SchemaPartitions are allocated,
     // all the read requests about SchemaPartitionTable are parallel.
-    SchemaPartitionResp resp = (SchemaPartitionResp) getSchemaPartition(req);
+    SchemaPartitionResp resp = getSchemaPartition(req);
     if (resp.isAllPartitionsExist()) {
       return resp;
     }
@@ -218,7 +231,7 @@ public class PartitionManager {
     synchronized (this) {
       // Here we should check again if the SchemaPartition
       // has been created by other threads to improve concurrent performance
-      resp = (SchemaPartitionResp) getSchemaPartition(req);
+      resp = getSchemaPartition(req);
       if (resp.isAllPartitionsExist()) {
         return resp;
       }
@@ -276,7 +289,7 @@ public class PartitionManager {
       }
     }
 
-    resp = (SchemaPartitionResp) getSchemaPartition(req);
+    resp = getSchemaPartition(req);
     if (!resp.isAllPartitionsExist()) {
       // Count the fail rate
       AtomicInteger totalSlotNum = new AtomicInteger();
@@ -328,7 +341,7 @@ public class PartitionManager {
 
     // After all the DataPartitions are allocated,
     // all the read requests about DataPartitionTable are parallel.
-    DataPartitionResp resp = (DataPartitionResp) getDataPartition(req);
+    DataPartitionResp resp = getDataPartition(req);
     if (resp.isAllPartitionsExist()) {
       return resp;
     }
@@ -341,7 +354,7 @@ public class PartitionManager {
     synchronized (this) {
       // Here we should check again if the DataPartition
       // has been created by other threads to improve concurrent performance
-      resp = (DataPartitionResp) getDataPartition(req);
+      resp = getDataPartition(req);
       if (resp.isAllPartitionsExist()) {
         return resp;
       }
@@ -399,7 +412,7 @@ public class PartitionManager {
       }
     }
 
-    resp = (DataPartitionResp) getDataPartition(req);
+    resp = getDataPartition(req);
     if (!resp.isAllPartitionsExist()) {
       // Count the fail rate
       AtomicInteger totalSlotNum = new AtomicInteger();
@@ -886,10 +899,16 @@ public class PartitionManager {
    *     SchemaPartition and matched child paths aboveMTree
    */
   public SchemaNodeManagementResp 
getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) {
-    SchemaNodeManagementResp schemaNodeManagementResp;
-    ConsensusReadResponse consensusReadResponse = 
getConsensusManager().read(physicalPlan);
-    schemaNodeManagementResp = (SchemaNodeManagementResp) 
consensusReadResponse.getDataset();
-    return schemaNodeManagementResp;
+    try {
+      return (SchemaNodeManagementResp) 
getConsensusManager().read(physicalPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      SchemaNodeManagementResp resp = new SchemaNodeManagementResp();
+      resp.setStatus(res);
+      return resp;
+    }
   }
 
   public void preDeleteDatabase(
@@ -918,30 +937,39 @@ public class PartitionManager {
   }
 
   public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) {
-    // Get static result
-    RegionInfoListResp regionInfoListResp =
-        (RegionInfoListResp) getConsensusManager().read(req).getDataset();
-
-    // Get cached result
-    Map<TConsensusGroupId, Integer> allLeadership = 
getLoadManager().getRegionLeaderMap();
-    regionInfoListResp
-        .getRegionInfoList()
-        .forEach(
-            regionInfo -> {
-              regionInfo.setStatus(
-                  getLoadManager()
-                      .getRegionStatus(regionInfo.getConsensusGroupId(), 
regionInfo.getDataNodeId())
-                      .getStatus());
-
-              String regionType =
-                  regionInfo.getDataNodeId()
-                          == 
allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
-                      ? RegionRoleType.Leader.toString()
-                      : RegionRoleType.Follower.toString();
-              regionInfo.setRoleType(regionType);
-            });
+    try {
+      // Get static result
+      RegionInfoListResp regionInfoListResp = (RegionInfoListResp) 
getConsensusManager().read(req);
+      // Get cached result
+      Map<TConsensusGroupId, Integer> allLeadership = 
getLoadManager().getRegionLeaderMap();
+      regionInfoListResp
+          .getRegionInfoList()
+          .forEach(
+              regionInfo -> {
+                regionInfo.setStatus(
+                    getLoadManager()
+                        .getRegionStatus(
+                            regionInfo.getConsensusGroupId(), 
regionInfo.getDataNodeId())
+                        .getStatus());
+
+                String regionType =
+                    regionInfo.getDataNodeId()
+                            == 
allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
+                        ? RegionRoleType.Leader.toString()
+                        : RegionRoleType.Follower.toString();
+                regionInfo.setRoleType(regionType);
+              });
+
+      return regionInfoListResp;
 
-    return regionInfoListResp;
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      RegionInfoListResp resp = new RegionInfoListResp();
+      resp.setStatus(res);
+      return resp;
+    }
   }
 
   /**
@@ -988,7 +1016,14 @@ public class PartitionManager {
           new TTimePartitionSlot(
               req.getTimeStamp() - req.getTimeStamp() % 
COMMON_CONFIG.getTimePartitionInterval()));
     }
-    return (GetRegionIdResp) getConsensusManager().read(plan).getDataset();
+    try {
+      return (GetRegionIdResp) getConsensusManager().read(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new GetRegionIdResp(res, Collections.emptyList());
+    }
   }
 
   public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) {
@@ -1008,7 +1043,14 @@ public class PartitionManager {
       plan.setRegionId(
           new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) 
req.getRegionId()));
     }
-    return (GetTimeSlotListResp) getConsensusManager().read(plan).getDataset();
+    try {
+      return (GetTimeSlotListResp) getConsensusManager().read(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new GetTimeSlotListResp(res, Collections.emptyList());
+    }
   }
 
   public CountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) {
@@ -1028,12 +1070,26 @@ public class PartitionManager {
       plan.setRegionId(
           new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) 
req.getRegionId()));
     }
-    return (CountTimeSlotListResp) 
getConsensusManager().read(plan).getDataset();
+    try {
+      return (CountTimeSlotListResp) getConsensusManager().read(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new CountTimeSlotListResp(res, 0);
+    }
   }
 
   public GetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) {
     GetSeriesSlotListPlan plan = new GetSeriesSlotListPlan(req.getDatabase(), 
req.getType());
-    return (GetSeriesSlotListResp) 
getConsensusManager().read(plan).getDataset();
+    try {
+      return (GetSeriesSlotListResp) getConsensusManager().read(plan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new GetSeriesSlotListResp(res, Collections.emptyList());
+    }
   }
 
   /**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
index 6fb9e93a0d8..e13d6a6df01 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -81,9 +82,9 @@ public class PipePluginCoordinator {
   public TGetPipePluginTableResp getPipePluginTable() {
     try {
       return ((PipePluginTableResp)
-              configManager.getConsensusManager().read(new 
GetPipePluginTablePlan()).getDataset())
+              configManager.getConsensusManager().read(new 
GetPipePluginTablePlan()))
           .convertToThriftResponse();
-    } catch (IOException e) {
+    } catch (IOException | ConsensusException e) {
       LOGGER.error("Fail to get PipePluginTable", e);
       return new TGetPipePluginTableResp(
           new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
@@ -93,11 +94,17 @@ public class PipePluginCoordinator {
   }
 
   public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) {
-    return ((JarResp)
-            configManager
-                .getConsensusManager()
-                .read(new GetPipePluginJarPlan(req.getJarNameList()))
-                .getDataset())
-        .convertToThriftResponse();
+    try {
+      return ((JarResp)
+              configManager
+                  .getConsensusManager()
+                  .read(new GetPipePluginJarPlan(req.getJarNameList())))
+          .convertToThriftResponse();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's 
read API.", e);
+      TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new JarResp(res, 
Collections.emptyList()).convertToThriftResponse();
+    }
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 886a728916b..65dc59b1147 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -141,10 +142,14 @@ public class PipeTaskCoordinator {
   public TShowPipeResp showPipes(TShowPipeReq req) {
     lock();
     try {
-      return ((PipeTableResp)
-              configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+      return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2()))
           .filter(req.whereClause, req.pipeName)
           .convertToTShowPipeResp();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return new PipeTableResp(res, Collections.emptyList()).convertToTShowPipeResp();
     } finally {
       unlock();
     }
@@ -153,10 +158,9 @@ public class PipeTaskCoordinator {
   public TGetAllPipeInfoResp getAllPipeInfo() {
     lock();
     try {
-      return ((PipeTableResp)
-              configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset())
+      return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2()))
           .convertToTGetAllPipeInfoResp();
-    } catch (IOException e) {
+    } catch (IOException | ConsensusException e) {
       LOGGER.warn("Failed to get all pipe info.", e);
       return new TGetAllPipeInfoResp(
           new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 8ef9a1b64f3..638df19d499 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -259,7 +259,16 @@ public class ClusterSchemaManager {
    * @return CountDatabaseResp
    */
   public CountDatabaseResp countMatchedDatabases(CountDatabasePlan countDatabasePlan) {
-    return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan).getDataset();
+    try {
+      return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      CountDatabaseResp response = new CountDatabaseResp();
+      response.setStatus(res);
+      return response;
+    }
   }
 
   /**
@@ -270,8 +279,16 @@ public class ClusterSchemaManager {
    * @return DatabaseSchemaResp
    */
   public DatabaseSchemaResp getMatchedDatabaseSchema(GetDatabasePlan getStorageGroupPlan) {
-    DatabaseSchemaResp resp =
-        (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset();
+    DatabaseSchemaResp resp;
+    try {
+      resp = (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      resp = new DatabaseSchemaResp();
+      resp.setStatus(res);
+    }
     List<String> preDeletedDatabaseList = new ArrayList<>();
     for (String database : resp.getSchemaMap().keySet()) {
       if (getPartitionManager().isDatabasePreDeleted(database)) {
@@ -286,8 +303,16 @@ public class ClusterSchemaManager {
 
   /** Only used in cluster tool show Databases. */
   public TShowDatabaseResp showDatabase(GetDatabasePlan getStorageGroupPlan) {
-    DatabaseSchemaResp databaseSchemaResp =
-        (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset();
+    DatabaseSchemaResp databaseSchemaResp;
+    try {
+      databaseSchemaResp = (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      databaseSchemaResp = new DatabaseSchemaResp();
+      databaseSchemaResp.setStatus(res);
+    }
     if (databaseSchemaResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Return immediately if some Database doesn't exist
       return new TShowDatabaseResp().setStatus(databaseSchemaResp.getStatus());
@@ -683,7 +708,14 @@ public class ClusterSchemaManager {
    * @return TSStatus
    */
   public TSStatus createTemplate(CreateSchemaTemplatePlan createSchemaTemplatePlan) {
-    return getConsensusManager().write(createSchemaTemplatePlan).getStatus();
+    try {
+      return getConsensusManager().write(createSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
   }
 
   /**
@@ -693,8 +725,16 @@ public class ClusterSchemaManager {
    */
   public TGetAllTemplatesResp getAllTemplates() {
     GetAllSchemaTemplatePlan getAllSchemaTemplatePlan = new GetAllSchemaTemplatePlan();
-    TemplateInfoResp templateResp =
-        (TemplateInfoResp) getConsensusManager().read(getAllSchemaTemplatePlan).getDataset();
+    TemplateInfoResp templateResp;
+    try {
+      templateResp = (TemplateInfoResp) getConsensusManager().read(getAllSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      templateResp = new TemplateInfoResp();
+      templateResp.setStatus(res);
+    }
     TGetAllTemplatesResp resp = new TGetAllTemplatesResp();
     resp.setStatus(templateResp.getStatus());
     if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -709,8 +749,16 @@ public class ClusterSchemaManager {
   /** show nodes in schemaengine template */
   public TGetTemplateResp getTemplate(String req) {
     GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(req);
-    TemplateInfoResp templateResp =
-        (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset();
+    TemplateInfoResp templateResp;
+    try {
+      templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      templateResp = new TemplateInfoResp();
+      templateResp.setStatus(res);
+    }
     TGetTemplateResp resp = new TGetTemplateResp();
     if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && templateResp.getTemplateList() != null
@@ -725,8 +773,16 @@ public class ClusterSchemaManager {
   /** show path set template xx */
   public TGetPathsSetTemplatesResp getPathsSetTemplate(String templateName) {
     GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName);
-    PathInfoResp pathInfoResp =
-        (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset();
+    PathInfoResp pathInfoResp;
+    try {
+      pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      pathInfoResp = new PathInfoResp();
+      pathInfoResp.setStatus(res);
+    }
     if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       TGetPathsSetTemplatesResp resp = new TGetPathsSetTemplatesResp();
       resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
@@ -742,21 +798,42 @@ public class ClusterSchemaManager {
    * template info won't be taken
    */
   public byte[] getAllTemplateSetInfo() {
-    AllTemplateSetInfoResp resp =
-        (AllTemplateSetInfoResp)
-            getConsensusManager().read(new GetAllTemplateSetInfoPlan()).getDataset();
-    return resp.getTemplateInfo();
+    try {
+      AllTemplateSetInfoResp resp =
+          (AllTemplateSetInfoResp) getConsensusManager().read(new GetAllTemplateSetInfoPlan());
+      return resp.getTemplateInfo();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      return null;
+    }
   }
 
   public TemplateSetInfoResp getTemplateSetInfo(List<PartialPath> patternList) {
-    return (TemplateSetInfoResp)
-        getConsensusManager().read(new GetTemplateSetInfoPlan(patternList)).getDataset();
+    try {
+      return (TemplateSetInfoResp)
+          getConsensusManager().read(new GetTemplateSetInfoPlan(patternList));
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      TemplateSetInfoResp response = new TemplateSetInfoResp();
+      response.setStatus(res);
+      return response;
+    }
   }
 
   public Pair<TSStatus, Template> checkIsTemplateSetOnPath(String templateName, String path) {
     GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName);
-    TemplateInfoResp templateResp =
-        (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset();
+    TemplateInfoResp templateResp;
+    try {
+      templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      templateResp = new TemplateInfoResp();
+      templateResp.setStatus(res);
+    }
     if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       if (templateResp.getTemplateList() == null || templateResp.getTemplateList().isEmpty()) {
         return new Pair<>(
@@ -770,8 +847,16 @@ public class ClusterSchemaManager {
     }
 
     GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName);
-    PathInfoResp pathInfoResp =
-        (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset();
+    PathInfoResp pathInfoResp;
+    try {
+      pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      pathInfoResp = new PathInfoResp();
+      pathInfoResp.setStatus(res);
+    }
     if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       List<String> templateSetPathList = pathInfoResp.getPathList();
       if (templateSetPathList == null
@@ -827,8 +912,16 @@ public class ClusterSchemaManager {
 
     // check template existence
     GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName);
-    TemplateInfoResp templateInfoResp =
-        (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset();
+    TemplateInfoResp templateInfoResp;
+    try {
+      templateInfoResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      templateInfoResp = new TemplateInfoResp();
+      templateInfoResp.setStatus(res);
+    }
     if (templateInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return templateInfoResp.getStatus();
     } else if (templateInfoResp.getTemplateList() == null
@@ -840,8 +933,16 @@ public class ClusterSchemaManager {
 
     // check is template set on some path, block all template set operation
     GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName);
-    PathInfoResp pathInfoResp =
-        (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset();
+    PathInfoResp pathInfoResp;
+    try {
+      pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      pathInfoResp = new PathInfoResp();
+      pathInfoResp.setStatus(res);
+    }
     if (pathInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return pathInfoResp.getStatus();
     } else if (pathInfoResp.getPathList() != null && !pathInfoResp.getPathList().isEmpty()) {
@@ -899,7 +1000,14 @@ public class ClusterSchemaManager {
 
     ExtendSchemaTemplatePlan extendSchemaTemplatePlan =
         new ExtendSchemaTemplatePlan(templateExtendInfo);
-    TSStatus status = getConsensusManager().write(extendSchemaTemplatePlan).getStatus();
+    TSStatus status;
+    try {
+      status = getConsensusManager().write(extendSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e);
+      status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      status.setMessage(e.getMessage());
+    }
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       return status;
     }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
index a8be16f171e..c12c4447c2b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java
@@ -149,12 +149,18 @@ public class SetTemplateProcedure
     // check whether the template can be set on given path
     CheckTemplateSettablePlan checkTemplateSettablePlan =
         new CheckTemplateSettablePlan(templateName, templateSetPath);
-    TemplateInfoResp resp =
-        (TemplateInfoResp)
-            env.getConfigManager()
-                .getConsensusManager()
-                .read(checkTemplateSettablePlan)
-                .getDataset();
+    TemplateInfoResp resp;
+    try {
+      resp =
+          (TemplateInfoResp)
+              env.getConfigManager().getConsensusManager().read(checkTemplateSettablePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      resp = new TemplateInfoResp();
+      resp.setStatus(res);
+    }
     if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       setNextState(SetTemplateState.PRE_SET);
     } else {
@@ -221,9 +227,18 @@ public class SetTemplateProcedure
 
   private Template getTemplate(ConfigNodeProcedureEnv env) {
     GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName);
-    TemplateInfoResp templateResp =
-        (TemplateInfoResp)
-            env.getConfigManager().getConsensusManager().read(getSchemaTemplatePlan).getDataset();
+    TemplateInfoResp templateResp;
+    try {
+      templateResp =
+          (TemplateInfoResp)
+              env.getConfigManager().getConsensusManager().read(getSchemaTemplatePlan);
+    } catch (ConsensusException e) {
+      LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e);
+      TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      templateResp = new TemplateInfoResp();
+      templateResp.setStatus(res);
+    }
     if (templateResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       setFailure(
           new ProcedureException(
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 8b95ae0d4d0..39eab9c5dd7 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.iot.util.TestEntry;
 import org.apache.iotdb.consensus.iot.util.TestStateMachine;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -157,7 +158,8 @@ public class ReplicateTest {
    * The three nodes use the requests in the queue to replicate the requests to the other two nodes.
    */
   @Test
-  public void replicateUsingQueueTest() throws IOException, InterruptedException {
+  public void replicateUsingQueueTest()
+      throws IOException, InterruptedException, ConsensusException {
     logger.info("Start ReplicateUsingQueueTest");
     servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
@@ -235,7 +237,7 @@ public class ReplicateTest {
    * nodes finally consistent.
    */
   @Test
-  public void replicateUsingWALTest() throws IOException, InterruptedException {
+  public void replicateUsingWALTest() throws IOException, InterruptedException, ConsensusException {
     logger.info("Start ReplicateUsingWALTest");
     servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers());
     servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers());
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index d9cd902538a..8da2d467248 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.ratis;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -26,10 +27,10 @@ import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
@@ -220,11 +221,15 @@ public class RatisConsensusTest {
           () -> {
             ByteBufferConsensusRequest incrReq = TestUtils.TestRequest.incrRequest();
 
-            ConsensusWriteResponse response = consensus.write(gid, incrReq);
-            if (response.getException() != null) {
-              response.getException().printStackTrace(System.out);
+            TSStatus response;
+            try {
+              response = consensus.write(gid, incrReq);
+            } catch (ConsensusException e) {
+              response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+              response.setMessage(e.getMessage());
+              e.printStackTrace(System.out);
             }
-            Assert.assertEquals(200, response.getStatus().getCode());
+            Assert.assertEquals(200, response.getCode());
             latch.countDown();
           });
     }
@@ -253,8 +258,7 @@ public class RatisConsensusTest {
     Assert.assertNotNull(leader);
 
     // Check we reached a consensus
-    ConsensusReadResponse response = leader.read(gid, getReq);
-    TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset();
+    TestUtils.TestDataSet result = (TestUtils.TestDataSet) leader.read(gid, getReq);
     Assert.assertEquals(target, result.getNumber());
   }
 }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index c90d5d9952d..a4f7c2a1ecb 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -20,10 +20,11 @@ package org.apache.iotdb.consensus.ratis;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.config.RatisConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
 
 import org.apache.ratis.util.TimeDuration;
@@ -171,20 +172,20 @@ public class RecoverReadTest {
 
     // try max 3 minutes
     final long startTs = System.currentTimeMillis();
-    ConsensusReadResponse resp = TestUtils.doRead(miniCluster.getServer(0), gid);
-    while (!resp.isSuccess()) {
+    DataSet resp;
+    try {
+      resp = TestUtils.doRead(miniCluster.getServer(0), gid);
+    } catch (ConsensusException e) {
       final long timeElapsed = System.currentTimeMillis() - startTs;
       if (timeElapsed > 1000 * 60 * 3) { // 3 min
-        Assert.fail(
-            "Linearizable read failed after 3 minutes, last exception seen: "
-                + resp.getException());
+        Assert.fail("Linearizable read failed after 3 minutes, last exception seen: " + e);
       }
       Thread.sleep(100);
-      logger.info("linearizable read failed  when restart, retrying: ", resp.getException());
+      logger.info("linearizable read failed  when restart, retrying: ", e);
       resp = TestUtils.doRead(miniCluster.getServer(0), gid);
     }
 
-    Assert.assertEquals(10, ((TestUtils.TestDataSet) resp.getDataset()).getNumber());
+    Assert.assertEquals(10, ((TestUtils.TestDataSet) resp).getNumber());
   }
 
   @Test
@@ -209,8 +210,11 @@ public class RecoverReadTest {
     miniCluster.restart();
 
     // query during redo: get exception that ratis is under recovery
-    final ConsensusReadResponse readResponse = TestUtils.doRead(miniCluster.getServer(0), gid);
-    Assert.assertTrue(readResponse.getException() instanceof RatisUnderRecoveryException);
+    try {
+      TestUtils.doRead(miniCluster.getServer(0), gid);
+    } catch (ConsensusException e) {
+      Assert.assertTrue(e instanceof RatisUnderRecoveryException);
+    }
   }
 
   @Test
@@ -238,7 +242,10 @@ public class RecoverReadTest {
     miniCluster.waitUntilActiveLeader();
 
     // query during redo: get exception that ratis is under recovery
-    final ConsensusReadResponse readResponse = TestUtils.doRead(miniCluster.getServer(0), gid);
-    Assert.assertTrue(readResponse.getException() instanceof RatisUnderRecoveryException);
+    try {
+      TestUtils.doRead(miniCluster.getServer(0), gid);
+    } catch (ConsensusException e) {
+      Assert.assertTrue(e instanceof RatisUnderRecoveryException);
+    }
   }
 }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index 604d6afcec0..75e0426aa5e 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -33,8 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
@@ -379,24 +377,22 @@ public class TestUtils {
     }
   }
 
-  static void write(IConsensus consensus, ConsensusGroupId gid, int count) {
+  static void write(IConsensus consensus, ConsensusGroupId gid, int count)
+      throws ConsensusException {
     for (int i = 0; i < count; i++) {
       final ByteBufferConsensusRequest increment = TestRequest.incrRequest();
-      final ConsensusWriteResponse response = consensus.write(gid, increment);
-      Assert.assertEquals(200, response.getStatus().getCode());
+      final TSStatus response = consensus.write(gid, increment);
+      Assert.assertEquals(200, response.getCode());
     }
   }
 
   static int read(IConsensus consensus, ConsensusGroupId gid) throws ConsensusException {
-    final ConsensusReadResponse response = doRead(consensus, gid);
-    if (!response.isSuccess()) {
-      throw response.getException();
-    }
-    final TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset();
+    final DataSet response = doRead(consensus, gid);
+    final TestUtils.TestDataSet result = (TestUtils.TestDataSet) response;
     return result.getNumber();
   }
 
-  static ConsensusReadResponse doRead(IConsensus consensus, ConsensusGroupId gid) {
+  static DataSet doRead(IConsensus consensus, ConsensusGroupId gid) throws ConsensusException {
     final ByteBufferConsensusRequest getReq = TestUtils.TestRequest.getRequest();
     return consensus.read(gid, getReq);
   }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
index 5cd64edf167..e70c1a6b24e 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java
@@ -37,10 +37,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
-import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
-import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
-import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
-import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import org.apache.iotdb.consensus.exception.*;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 8c2d1be433c..6c150b8fc70 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -944,7 +944,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
 
       return exceptionMessages.isEmpty()
           ? new TPushPipeMetaResp()
-          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
+              .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
           : new TPushPipeMetaResp()
               .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
               .setExceptionMessages(exceptionMessages);
@@ -1802,16 +1802,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     // kill the datanode process 20 seconds later
     // because datanode process cannot exit normally for the reason of InterruptedException
     new Thread(
-        () -> {
-          try {
-            TimeUnit.SECONDS.sleep(20);
-          } catch (InterruptedException e) {
-            LOGGER.warn("Meets InterruptedException in stopDataNode RPC method");
-          } finally {
-            LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
-            System.exit(0);
-          }
-        })
+            () -> {
+              try {
+                TimeUnit.SECONDS.sleep(20);
+              } catch (InterruptedException e) {
+                LOGGER.warn("Meets InterruptedException in stopDataNode RPC method");
+              } finally {
+                LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
+                System.exit(0);
+              }
+            })
         .start();
 
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 4e2506a2d3f..9ac38701dac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
-import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -69,7 +69,7 @@ public class RegionReadExecutor {
   public RegionExecutionResult execute(
       ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
-    ConsensusReadResponse readResponse;
+    DataSet readResponse;
     try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) {
       if (groupId instanceof DataRegionId) {
         readResponse = dataRegionConsensus.read(groupId, fragmentInstance);
@@ -81,20 +81,8 @@ public class RegionReadExecutor {
         LOGGER.error(RESPONSE_NULL_ERROR_MSG);
         resp.setAccepted(false);
         resp.setMessage(RESPONSE_NULL_ERROR_MSG);
-      } else if (!readResponse.isSuccess()) {
-        LOGGER.error(
-            "Execute FragmentInstance in ConsensusGroup {} failed.",
-            groupId,
-            readResponse.getException());
-        resp.setAccepted(false);
-        resp.setMessage(
-            String.format(
-                ERROR_MSG_FORMAT,
-                readResponse.getException() == null
-                    ? ""
-                    : readResponse.getException().getMessage()));
       } else {
-        FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
+        FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
         resp.setAccepted(!info.getState().isFailed());
         resp.setMessage(info.getMessage());
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index d89e9bb0480..79f703e8e65 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -128,15 +128,11 @@ public class DataNode implements DataNodeMBean {
 
   private final TEndPoint thisNode = new TEndPoint();
 
-  /**
-   * Hold the information of trigger, udf......
-   */
+  /** Hold the information of trigger, udf...... */
   private final ResourcesInformationHolder resourcesInformationHolder =
       new ResourcesInformationHolder();
 
-  /**
-   * Responsible for keeping trigger information up to date.
-   */
+  /** Responsible for keeping trigger information up to date. */
   private final TriggerInformationUpdater triggerInformationUpdater =
       new TriggerInformationUpdater();
 
@@ -207,9 +203,7 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /**
-   * Prepare cluster IoTDB-DataNode
-   */
+  /** Prepare cluster IoTDB-DataNode */
   private boolean prepareDataNode() throws StartupException, IOException {
     // Set cluster mode
     config.setClusterMode(true);
@@ -354,7 +348,7 @@ public class DataNode implements DataNodeMBean {
    * Register this DataNode into cluster.
    *
    * @throws StartupException if register failed.
-   * @throws IOException      if serialize cluster name and dataNode Id failed.
+   * @throws IOException if serialize cluster name and dataNode Id failed.
    */
   private void sendRegisterRequestToConfigNode() throws StartupException, IOException {
     logger.info("Sending register request to ConfigNode-leader...");
@@ -558,9 +552,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(PipeAgent.runtime());
   }
 
-  /**
-   * Set up RPC and protocols after DataNode is available
-   */
+  /** Set up RPC and protocols after DataNode is available */
   private void setUpRPCService() throws StartupException {
     // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling
     registerManager.register(DataNodeInternalRPCService.getInstance());
@@ -705,9 +697,7 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /**
-   * Generate a list for UDFs that do not have jar on this node.
-   */
+  /** Generate a list for UDFs that do not have jar on this node. */
   private List<UDFInformation> getJarListForUDF() {
     List<UDFInformation> res = new ArrayList<>();
     for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) {
@@ -820,9 +810,7 @@ public class DataNode implements DataNodeMBean {
     }
   }
 
-  /**
-   * Generate a list for triggers that do not have jar on this node.
-   */
+  /** Generate a list for triggers that do not have jar on this node. */
   private List<TriggerInformation> getJarListForTrigger() {
     List<TriggerInformation> res = new ArrayList<>();
     for (TriggerInformation triggerInformation :
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
index 8cd6673f3c0..4a6d2eb5044 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.service;
 
-import java.io.IOException;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -43,6 +42,8 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class IoTDBShutdownHook extends Thread {
 
   private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
index 9ae205aebef..426a5a729fd 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -46,7 +47,7 @@ import static org.junit.Assert.assertTrue;
 public class RegionReadExecutorTest {
 
   @Test
-  public void testSuccessfulExecute() {
+  public void testSuccessfulExecute() throws ConsensusException {
 
     // data query
     ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
@@ -70,7 +71,7 @@ public class RegionReadExecutorTest {
     Mockito.when(fragmentInstanceInfo.getMessage()).thenReturn("data-success");
 
     Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance))
-        .thenReturn(readResponse);
+        .thenReturn((DataSet) readResponse);
 
     RegionExecutionResult res = executor.execute(dataRegionGroupId, fragmentInstance);
 
@@ -86,7 +87,7 @@ public class RegionReadExecutorTest {
     Mockito.when(fragmentInstanceInfo.getMessage()).thenReturn("schema-success");
 
     Mockito.when(schemaRegionConsensus.read(schemaRegionGroupId, fragmentInstance))
-        .thenReturn(readResponse);
+        .thenReturn((DataSet) readResponse);
 
     res = executor.execute(schemaRegionGroupId, fragmentInstance);
 
@@ -95,7 +96,7 @@ public class RegionReadExecutorTest {
   }
 
   @Test
-  public void testResponseNullExecute() {
+  public void testResponseNullExecute() throws ConsensusException {
 
     // data query
     ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
@@ -131,7 +132,7 @@ public class RegionReadExecutorTest {
   }
 
   @Test
-  public void testFailedExecute() {
+  public void testFailedExecute() throws ConsensusException {
 
     // data query
     ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
@@ -152,7 +153,7 @@ public class RegionReadExecutorTest {
     Mockito.when(readResponse.getException()).thenReturn(new ConsensusException("data-exception"));
 
     Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance))
-        .thenReturn(readResponse);
+        .thenReturn((DataSet) readResponse);
 
     RegionExecutionResult res = executor.execute(dataRegionGroupId, fragmentInstance);
 
@@ -166,7 +167,7 @@ public class RegionReadExecutorTest {
     Mockito.when(readResponse.getException()).thenReturn(null);
 
     Mockito.when(schemaRegionConsensus.read(schemaRegionGroupId, fragmentInstance))
-        .thenReturn(readResponse);
+        .thenReturn((DataSet) readResponse);
 
     res = executor.execute(schemaRegionGroupId, fragmentInstance);
 
@@ -175,7 +176,7 @@ public class RegionReadExecutorTest {
   }
 
   @Test
-  public void testExceptionHappened() {
+  public void testExceptionHappened() throws ConsensusException {
 
     ConsensusGroupId dataRegionGroupId = new DataRegionId(1);
     FragmentInstanceId fragmentInstanceId =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java
index 6e8eafd851c..59eefb601b3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -49,7 +50,7 @@ import static org.junit.Assert.fail;
 public class RegionWriteExecutorTest {
 
   @Test
-  public void testInsertRowNode() {
+  public void testInsertRowNode() throws ConsensusException {
 
     IConsensus dataRegionConsensus = Mockito.mock(IConsensus.class);
     IConsensus schemaRegionConsensus = Mockito.mock(IConsensus.class);

Reply via email to