This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch consensus_module_refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 72f9d25a609a065f58a00bc66c85b44b14631c90 Author: BUAAserein <[email protected]> AuthorDate: Thu Aug 17 12:48:23 2023 +0800 refactor read --- .../iotdb/confignode/manager/ConfigManager.java | 6 +- .../iotdb/confignode/manager/ModelManager.java | 40 +++-- .../confignode/manager/PermissionManager.java | 11 +- .../iotdb/confignode/manager/TriggerManager.java | 18 ++- .../iotdb/confignode/manager/UDFManager.java | 21 ++- .../iotdb/confignode/manager/cq/CQManager.java | 27 ++-- .../iotdb/confignode/manager/node/NodeManager.java | 11 +- .../manager/partition/PartitionManager.java | 142 ++++++++++++------ .../manager/pipe/plugin/PipePluginCoordinator.java | 23 ++- .../manager/pipe/task/PipeTaskCoordinator.java | 14 +- .../manager/schema/ClusterSchemaManager.java | 162 +++++++++++++++++---- .../impl/schema/SetTemplateProcedure.java | 33 +++-- .../apache/iotdb/consensus/iot/ReplicateTest.java | 6 +- .../iotdb/consensus/ratis/RatisConsensusTest.java | 20 ++- .../iotdb/consensus/ratis/RecoverReadTest.java | 31 ++-- .../apache/iotdb/consensus/ratis/TestUtils.java | 18 +-- .../consensus/simple/SimpleConsensusTest.java | 5 +- .../impl/DataNodeInternalRPCServiceImpl.java | 22 +-- .../execution/executor/RegionReadExecutor.java | 18 +-- .../java/org/apache/iotdb/db/service/DataNode.java | 26 +--- .../apache/iotdb/db/service/IoTDBShutdownHook.java | 3 +- .../execution/executor/RegionReadExecutorTest.java | 17 ++- .../executor/RegionWriteExecutorTest.java | 3 +- 23 files changed, 435 insertions(+), 242 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index f1f6cbdef2f..bbfaff13330 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -664,8 +664,7 @@ public class ConfigManager implements IManager { new GetSchemaPartitionPlan( partitionSlotsMap.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new ArrayList<>(e.getValue())))); - SchemaPartitionResp queryResult = - (SchemaPartitionResp) partitionManager.getSchemaPartition(getSchemaPartitionPlan); + SchemaPartitionResp queryResult = partitionManager.getSchemaPartition(getSchemaPartitionPlan); resp = queryResult.convertToRpcSchemaPartitionTableResp(); LOGGER.debug("GetSchemaPartition receive paths: {}, return: {}", relatedPaths, resp); @@ -796,8 +795,7 @@ public class ConfigManager implements IManager { if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return resp.setStatus(status); } - DataPartitionResp queryResult = - (DataPartitionResp) partitionManager.getDataPartition(getDataPartitionPlan); + DataPartitionResp queryResult = partitionManager.getDataPartition(getDataPartitionPlan); resp = queryResult.convertToTDataPartitionTableResp(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java index 7fd18b5efca..97b2e8cf845 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java @@ -36,7 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTrailReq; import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; @@ -104,17 +104,14 @@ public class ModelManager { public TShowModelResp showModel(TShowModelReq req) { try { - ConsensusReadResponse response = - configManager.getConsensusManager().read(new ShowModelPlan(req)); - if (response.getDataset() != null) { - return ((ModelTableResp) response.getDataset()).convertToThriftResponse(); - } else { - LOGGER.warn("Unexpected error happened while showing model: ", response.getException()); - // consensus layer related errors - TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getException().toString()); - return new TShowModelResp(res, Collections.emptyList()); - } + DataSet response = configManager.getConsensusManager().read(new ShowModelPlan(req)); + return ((ModelTableResp) response).convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + // consensus layer related errors + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new TShowModelResp(res, Collections.emptyList()); } catch (IOException e) { LOGGER.error("Fail to get ModelTable", e); return new TShowModelResp( @@ -126,17 +123,14 @@ public class ModelManager { public TShowTrailResp showTrail(TShowTrailReq req) { try { - ConsensusReadResponse response = - configManager.getConsensusManager().read(new ShowTrailPlan(req)); - if (response.getDataset() != null) { - return ((TrailTableResp) response.getDataset()).convertToThriftResponse(); - } else { - LOGGER.warn("Unexpected error happened while showing trail: ", response.getException()); - // consensus layer related errors - TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getException().toString()); - return new TShowTrailResp(res, Collections.emptyList()); - } + DataSet response = configManager.getConsensusManager().read(new ShowTrailPlan(req)); + return ((TrailTableResp) response).convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + // consensus layer related errors + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new TShowTrailResp(res, Collections.emptyList()); } catch (IOException e) { LOGGER.error("Fail to get TrailTable", e); return new TShowTrailResp( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index 1f5c5a999d8..e7213f9a7ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -88,7 +88,16 @@ public class PermissionManager { * @return PermissionInfoResp */ public PermissionInfoResp queryPermission(AuthorPlan authorPlan) { - return (PermissionInfoResp) getConsensusManager().read(authorPlan).getDataset(); + try { + return (PermissionInfoResp) getConsensusManager().read(authorPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + PermissionInfoResp response = new PermissionInfoResp(); + response.setStatus(res); + return response; + } } private ConsensusManager getConsensusManager() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java index e42a0fb029e..49ed4256346 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java @@ -152,7 +152,7 @@ public class TriggerManager { configManager.getConsensusManager().read(new GetTriggerLocationPlan(triggerName))) .convertToThriftResponse(); } catch (ConsensusException e) { - LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); return new TGetLocationForTriggerResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) .setMessage(e.getMessage())); @@ -160,12 +160,16 @@ public class TriggerManager { } public TGetJarInListResp getTriggerJar(TGetJarInListReq req) { - return ((JarResp) - configManager - .getConsensusManager() - .read(new GetTriggerJarPlan(req.getJarNameList())) - .getDataset()) - .convertToThriftResponse(); + try { + return ((JarResp) + configManager.getConsensusManager().read(new GetTriggerJarPlan(req.getJarNameList()))) + .convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new JarResp(res, Collections.emptyList()).convertToThriftResponse(); + } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index f2b747104c1..877c6c8bca2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq; import org.apache.iotdb.rpc.RpcUtils; @@ -168,9 +169,9 @@ public class UDFManager { public TGetUDFTableResp getUDFTable() { try { return ((FunctionTableResp) - configManager.getConsensusManager().read(new GetFunctionTablePlan()).getDataset()) + configManager.getConsensusManager().read(new GetFunctionTablePlan())) .convertToThriftResponse(); - } catch (IOException e) { + } catch (IOException | ConsensusException e) { LOGGER.error("Fail to get TriggerTable", e); return new TGetUDFTableResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -180,11 +181,15 @@ public class UDFManager { } public TGetJarInListResp getUDFJar(TGetJarInListReq req) { - return ((JarResp) - configManager - .getConsensusManager() - .read(new GetUDFJarPlan(req.getJarNameList())) - .getDataset()) - .convertToThriftResponse(); + try { + return ((JarResp) + configManager.getConsensusManager().read(new GetUDFJarPlan(req.getJarNameList()))) + .convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new JarResp(res, Collections.emptyList()).convertToThriftResponse(); + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index 43878c16c96..63b5db0624a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -33,7 +33,7 @@ import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; @@ -90,14 +90,14 @@ public class CQManager { } public TShowCQResp showCQ() { - ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan()); - if (response.getDataset() != null) { - return ((ShowCQResp) response.getDataset()).convertToRpcShowCQResp(); - } else { - LOGGER.warn("Unexpected error happened while showing cq: ", response.getException()); + try { + DataSet response = configManager.getConsensusManager().read(new ShowCQPlan()); + return ((ShowCQResp) response).convertToRpcShowCQResp(); + } catch (ConsensusException e) { + LOGGER.warn("Unexpected error happened while showing cq: ", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getException().toString()); + res.setMessage(e.getMessage()); return new TShowCQResp(res, Collections.emptyList()); } } @@ -155,16 +155,15 @@ public class CQManager { } // keep fetching until we get all CQEntries if this node is still leader while (needFetch(allCQs)) { - ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan()); - if (response.getDataset() != null) { - allCQs = ((ShowCQResp) response.getDataset()).getCqList(); - } else { + try { + DataSet response = configManager.getConsensusManager().read(new ShowCQPlan()); + allCQs = ((ShowCQResp) response).getCqList(); + } catch (ConsensusException e) { // consensus layer related errors - LOGGER.warn( - "Unexpected error happened while fetching cq list: ", response.getException()); + LOGGER.warn("Unexpected error happened while fetching cq list: ", e); try { Thread.sleep(500); - } catch (InterruptedException e) { + } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index bf59d5c2b77..646c7381104 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -396,7 +396,16 @@ public class NodeManager { * GetDataNodeConfigurationPlan is -1 */ public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) { - return (DataNodeConfigurationResp) getConsensusManager().read(req).getDataset(); + try { + return (DataNodeConfigurationResp) getConsensusManager().read(req); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + DataNodeConfigurationResp response = new DataNodeConfigurationResp(); + response.setStatus(res); + return response; + } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 18c1c4f44e2..c633c567d7c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -84,8 +84,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq; import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; -import org.apache.iotdb.consensus.common.DataSet; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; @@ -97,6 +95,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -165,8 +164,15 @@ public class PartitionManager { * @param req SchemaPartitionPlan with partitionSlotsMap * @return SchemaPartitionDataSet that contains only existing SchemaPartition */ - public DataSet getSchemaPartition(GetSchemaPartitionPlan req) { - return getConsensusManager().read(req).getDataset(); + public SchemaPartitionResp getSchemaPartition(GetSchemaPartitionPlan req) { + try { + return (SchemaPartitionResp) getConsensusManager().read(req); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new SchemaPartitionResp(res, false, Collections.emptyMap()); + } } /** @@ -176,8 +182,15 @@ public class PartitionManager { * TTimeSlotList>> * @return DataPartitionDataSet that contains only existing DataPartition */ - public DataSet getDataPartition(GetDataPartitionPlan req) { - return getConsensusManager().read(req).getDataset(); + public DataPartitionResp getDataPartition(GetDataPartitionPlan req) { + try { + return (DataPartitionResp) getConsensusManager().read(req); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new DataPartitionResp(res, false, Collections.emptyMap()); + } } /** @@ -205,7 +218,7 @@ public class PartitionManager { // After all the SchemaPartitions are allocated, // all the read requests about SchemaPartitionTable are parallel. - SchemaPartitionResp resp = (SchemaPartitionResp) getSchemaPartition(req); + SchemaPartitionResp resp = getSchemaPartition(req); if (resp.isAllPartitionsExist()) { return resp; } @@ -218,7 +231,7 @@ public class PartitionManager { synchronized (this) { // Here we should check again if the SchemaPartition // has been created by other threads to improve concurrent performance - resp = (SchemaPartitionResp) getSchemaPartition(req); + resp = getSchemaPartition(req); if (resp.isAllPartitionsExist()) { return resp; } @@ -276,7 +289,7 @@ public class PartitionManager { } } - resp = (SchemaPartitionResp) getSchemaPartition(req); + resp = getSchemaPartition(req); if (!resp.isAllPartitionsExist()) { // Count the fail rate AtomicInteger totalSlotNum = new AtomicInteger(); @@ -328,7 +341,7 @@ public class PartitionManager { // After all the DataPartitions are allocated, // all the read requests about DataPartitionTable are parallel. - DataPartitionResp resp = (DataPartitionResp) getDataPartition(req); + DataPartitionResp resp = getDataPartition(req); if (resp.isAllPartitionsExist()) { return resp; } @@ -341,7 +354,7 @@ public class PartitionManager { synchronized (this) { // Here we should check again if the DataPartition // has been created by other threads to improve concurrent performance - resp = (DataPartitionResp) getDataPartition(req); + resp = getDataPartition(req); if (resp.isAllPartitionsExist()) { return resp; } @@ -399,7 +412,7 @@ public class PartitionManager { } } - resp = (DataPartitionResp) getDataPartition(req); + resp = getDataPartition(req); if (!resp.isAllPartitionsExist()) { // Count the fail rate AtomicInteger totalSlotNum = new AtomicInteger(); @@ -886,10 +899,16 @@ public class PartitionManager { * SchemaPartition and matched child paths aboveMTree */ public SchemaNodeManagementResp getNodePathsPartition(GetNodePathsPartitionPlan physicalPlan) { - SchemaNodeManagementResp schemaNodeManagementResp; - ConsensusReadResponse consensusReadResponse = getConsensusManager().read(physicalPlan); - schemaNodeManagementResp = (SchemaNodeManagementResp) consensusReadResponse.getDataset(); - return schemaNodeManagementResp; + try { + return (SchemaNodeManagementResp) getConsensusManager().read(physicalPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + SchemaNodeManagementResp resp = new SchemaNodeManagementResp(); + resp.setStatus(res); + return resp; + } } public void preDeleteDatabase( @@ -918,30 +937,39 @@ public class PartitionManager { } public RegionInfoListResp getRegionInfoList(GetRegionInfoListPlan req) { - // Get static result - RegionInfoListResp regionInfoListResp = - (RegionInfoListResp) getConsensusManager().read(req).getDataset(); - - // Get cached result - Map<TConsensusGroupId, Integer> allLeadership = getLoadManager().getRegionLeaderMap(); - regionInfoListResp - .getRegionInfoList() - .forEach( - regionInfo -> { - regionInfo.setStatus( - getLoadManager() - .getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId()) - .getStatus()); - - String regionType = - regionInfo.getDataNodeId() - == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1) - ? RegionRoleType.Leader.toString() - : RegionRoleType.Follower.toString(); - regionInfo.setRoleType(regionType); - }); + try { + // Get static result + RegionInfoListResp regionInfoListResp = (RegionInfoListResp) getConsensusManager().read(req); + // Get cached result + Map<TConsensusGroupId, Integer> allLeadership = getLoadManager().getRegionLeaderMap(); + regionInfoListResp + .getRegionInfoList() + .forEach( + regionInfo -> { + regionInfo.setStatus( + getLoadManager() + .getRegionStatus( + regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId()) + .getStatus()); + + String regionType = + regionInfo.getDataNodeId() + == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1) + ? RegionRoleType.Leader.toString() + : RegionRoleType.Follower.toString(); + regionInfo.setRoleType(regionType); + }); + + return regionInfoListResp; - return regionInfoListResp; + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + RegionInfoListResp resp = new RegionInfoListResp(); + resp.setStatus(res); + return resp; + } } /** @@ -988,7 +1016,14 @@ public class PartitionManager { new TTimePartitionSlot( req.getTimeStamp() - req.getTimeStamp() % COMMON_CONFIG.getTimePartitionInterval())); } - return (GetRegionIdResp) getConsensusManager().read(plan).getDataset(); + try { + return (GetRegionIdResp) getConsensusManager().read(plan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new GetRegionIdResp(res, Collections.emptyList()); + } } public GetTimeSlotListResp getTimeSlotList(TGetTimeSlotListReq req) { @@ -1008,7 +1043,14 @@ public class PartitionManager { plan.setRegionId( new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) req.getRegionId())); } - return (GetTimeSlotListResp) getConsensusManager().read(plan).getDataset(); + try { + return (GetTimeSlotListResp) getConsensusManager().read(plan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new GetTimeSlotListResp(res, Collections.emptyList()); + } } public CountTimeSlotListResp countTimeSlotList(TCountTimeSlotListReq req) { @@ -1028,12 +1070,26 @@ public class PartitionManager { plan.setRegionId( new TConsensusGroupId(TConsensusGroupType.DataRegion, (int) req.getRegionId())); } - return (CountTimeSlotListResp) getConsensusManager().read(plan).getDataset(); + try { + return (CountTimeSlotListResp) getConsensusManager().read(plan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new CountTimeSlotListResp(res, 0); + } } public GetSeriesSlotListResp getSeriesSlotList(TGetSeriesSlotListReq req) { GetSeriesSlotListPlan plan = new GetSeriesSlotListPlan(req.getDatabase(), req.getType()); - return (GetSeriesSlotListResp) getConsensusManager().read(plan).getDataset(); + try { + return (GetSeriesSlotListResp) getConsensusManager().read(plan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new GetSeriesSlotListResp(res, Collections.emptyList()); + } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java index 6fb9e93a0d8..e13d6a6df01 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/plugin/PipePluginCoordinator.java @@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -81,9 +82,9 @@ public class PipePluginCoordinator { public TGetPipePluginTableResp getPipePluginTable() { try { return ((PipePluginTableResp) - configManager.getConsensusManager().read(new GetPipePluginTablePlan()).getDataset()) + configManager.getConsensusManager().read(new GetPipePluginTablePlan())) .convertToThriftResponse(); - } catch (IOException e) { + } catch (IOException | ConsensusException e) { LOGGER.error("Fail to get PipePluginTable", e); return new TGetPipePluginTableResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -93,11 +94,17 @@ public class PipePluginCoordinator { } public TGetJarInListResp getPipePluginJar(TGetJarInListReq req) { - return ((JarResp) - configManager - .getConsensusManager() - .read(new GetPipePluginJarPlan(req.getJarNameList())) - .getDataset()) - .convertToThriftResponse(); + try { + return ((JarResp) + configManager + .getConsensusManager() + .read(new GetPipePluginJarPlan(req.getJarNameList()))) + .convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new JarResp(res, Collections.emptyList()).convertToThriftResponse(); + } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java index 886a728916b..65dc59b1147 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java @@ -28,6 +28,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -141,10 +142,14 @@ public class PipeTaskCoordinator { public TShowPipeResp showPipes(TShowPipeReq req) { lock(); try { - return ((PipeTableResp) - configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) + return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2())) .filter(req.whereClause, req.pipeName) .convertToTShowPipeResp(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return new PipeTableResp(res, Collections.emptyList()).convertToTShowPipeResp(); } finally { unlock(); } @@ -153,10 +158,9 @@ public class PipeTaskCoordinator { public TGetAllPipeInfoResp getAllPipeInfo() { lock(); try { - return ((PipeTableResp) - configManager.getConsensusManager().read(new ShowPipePlanV2()).getDataset()) + return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2())) .convertToTGetAllPipeInfoResp(); - } catch (IOException e) { + } catch (IOException | ConsensusException e) { LOGGER.warn("Failed to get all pipe info.", e); return new TGetAllPipeInfoResp( new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index 8ef9a1b64f3..638df19d499 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -259,7 +259,16 @@ public class ClusterSchemaManager { * @return CountDatabaseResp */ public CountDatabaseResp countMatchedDatabases(CountDatabasePlan countDatabasePlan) { - return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan).getDataset(); + try { + return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + CountDatabaseResp response = new CountDatabaseResp(); + response.setStatus(res); + return response; + } } /** @@ -270,8 +279,16 @@ public class ClusterSchemaManager { * @return DatabaseSchemaResp */ public DatabaseSchemaResp getMatchedDatabaseSchema(GetDatabasePlan getStorageGroupPlan) { - DatabaseSchemaResp resp = - (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset(); + DatabaseSchemaResp resp; + try { + resp = (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + resp = new DatabaseSchemaResp(); + resp.setStatus(res); + } List<String> preDeletedDatabaseList = new ArrayList<>(); for (String database : resp.getSchemaMap().keySet()) { if (getPartitionManager().isDatabasePreDeleted(database)) { @@ -286,8 +303,16 @@ public class ClusterSchemaManager { /** Only used in cluster tool show Databases. */ public TShowDatabaseResp showDatabase(GetDatabasePlan getStorageGroupPlan) { - DatabaseSchemaResp databaseSchemaResp = - (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset(); + DatabaseSchemaResp databaseSchemaResp; + try { + databaseSchemaResp = (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + databaseSchemaResp = new DatabaseSchemaResp(); + databaseSchemaResp.setStatus(res); + } if (databaseSchemaResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // Return immediately if some Database doesn't exist return new TShowDatabaseResp().setStatus(databaseSchemaResp.getStatus()); @@ -683,7 +708,14 @@ public class ClusterSchemaManager { * @return TSStatus */ public TSStatus createTemplate(CreateSchemaTemplatePlan createSchemaTemplatePlan) { - return getConsensusManager().write(createSchemaTemplatePlan).getStatus(); + try { + return getConsensusManager().write(createSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; + } } /** @@ -693,8 +725,16 @@ public class ClusterSchemaManager { */ public TGetAllTemplatesResp getAllTemplates() { GetAllSchemaTemplatePlan getAllSchemaTemplatePlan = new GetAllSchemaTemplatePlan(); - TemplateInfoResp templateResp = - (TemplateInfoResp) getConsensusManager().read(getAllSchemaTemplatePlan).getDataset(); + TemplateInfoResp templateResp; + try { + templateResp = (TemplateInfoResp) getConsensusManager().read(getAllSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + templateResp = new TemplateInfoResp(); + templateResp.setStatus(res); + } TGetAllTemplatesResp resp = new TGetAllTemplatesResp(); resp.setStatus(templateResp.getStatus()); if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() @@ -709,8 +749,16 @@ public class ClusterSchemaManager { /** show nodes in schemaengine template */ public TGetTemplateResp getTemplate(String req) { GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(req); - TemplateInfoResp templateResp = - (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset(); + TemplateInfoResp templateResp; + try { + templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + templateResp = new TemplateInfoResp(); + templateResp.setStatus(res); + } TGetTemplateResp resp = new TGetTemplateResp(); if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && templateResp.getTemplateList() != null @@ -725,8 +773,16 @@ public class ClusterSchemaManager { /** show path set template xx */ public TGetPathsSetTemplatesResp getPathsSetTemplate(String templateName) { GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName); - PathInfoResp pathInfoResp = - (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset(); + PathInfoResp pathInfoResp; + try { + pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + pathInfoResp = new PathInfoResp(); + pathInfoResp.setStatus(res); + } if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { TGetPathsSetTemplatesResp resp = new TGetPathsSetTemplatesResp(); resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); @@ -742,21 +798,42 @@ public class ClusterSchemaManager { * template info won't be taken */ public byte[] getAllTemplateSetInfo() { - AllTemplateSetInfoResp resp = - (AllTemplateSetInfoResp) - getConsensusManager().read(new GetAllTemplateSetInfoPlan()).getDataset(); - return resp.getTemplateInfo(); + try { + AllTemplateSetInfoResp resp = + (AllTemplateSetInfoResp) getConsensusManager().read(new GetAllTemplateSetInfoPlan()); + return resp.getTemplateInfo(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + return null; + } } public TemplateSetInfoResp getTemplateSetInfo(List<PartialPath> patternList) { - return (TemplateSetInfoResp) - getConsensusManager().read(new GetTemplateSetInfoPlan(patternList)).getDataset(); + try { + return (TemplateSetInfoResp) + getConsensusManager().read(new GetTemplateSetInfoPlan(patternList)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + TemplateSetInfoResp response = new TemplateSetInfoResp(); + response.setStatus(res); + return response; + } } public Pair<TSStatus, Template> checkIsTemplateSetOnPath(String templateName, String path) { GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName); - TemplateInfoResp templateResp = - (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset(); + TemplateInfoResp templateResp; + try { + templateResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + templateResp = new TemplateInfoResp(); + templateResp.setStatus(res); + } if (templateResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { if (templateResp.getTemplateList() == null || templateResp.getTemplateList().isEmpty()) { return new Pair<>( @@ -770,8 +847,16 @@ public class ClusterSchemaManager { } GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName); - PathInfoResp pathInfoResp = - (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset(); + PathInfoResp pathInfoResp; + try { + pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + pathInfoResp = new PathInfoResp(); + pathInfoResp.setStatus(res); + } if (pathInfoResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { List<String> templateSetPathList = pathInfoResp.getPathList(); if (templateSetPathList == null @@ -827,8 +912,16 @@ public class ClusterSchemaManager { // check template existence GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName); - TemplateInfoResp templateInfoResp = - (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan).getDataset(); + TemplateInfoResp templateInfoResp; + try { + templateInfoResp = (TemplateInfoResp) getConsensusManager().read(getSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + templateInfoResp = new TemplateInfoResp(); + templateInfoResp.setStatus(res); + } if (templateInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return templateInfoResp.getStatus(); } else if (templateInfoResp.getTemplateList() == null @@ -840,8 +933,16 @@ public class ClusterSchemaManager { // check is template set on some path, block all template set operation GetPathsSetTemplatePlan getPathsSetTemplatePlan = new GetPathsSetTemplatePlan(templateName); - PathInfoResp pathInfoResp = - (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan).getDataset(); + PathInfoResp pathInfoResp; + try { + pathInfoResp = (PathInfoResp) getConsensusManager().read(getPathsSetTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + pathInfoResp = new PathInfoResp(); + pathInfoResp.setStatus(res); + } if (pathInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return pathInfoResp.getStatus(); } else if (pathInfoResp.getPathList() != null && !pathInfoResp.getPathList().isEmpty()) { @@ -899,7 +1000,14 @@ public class ClusterSchemaManager { ExtendSchemaTemplatePlan extendSchemaTemplatePlan = new ExtendSchemaTemplatePlan(templateExtendInfo); - TSStatus status = getConsensusManager().write(extendSchemaTemplatePlan).getStatus(); + TSStatus status; + try { + status = getConsensusManager().write(extendSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + } if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return status; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java index a8be16f171e..c12c4447c2b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java @@ -149,12 +149,18 @@ public class SetTemplateProcedure // check whether the template can be set on given path CheckTemplateSettablePlan checkTemplateSettablePlan = new CheckTemplateSettablePlan(templateName, templateSetPath); - TemplateInfoResp resp = - (TemplateInfoResp) - env.getConfigManager() - .getConsensusManager() - .read(checkTemplateSettablePlan) - .getDataset(); + TemplateInfoResp resp; + try { + resp = + (TemplateInfoResp) + env.getConfigManager().getConsensusManager().read(checkTemplateSettablePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + resp = new TemplateInfoResp(); + resp.setStatus(res); + } if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setNextState(SetTemplateState.PRE_SET); } else { @@ -221,9 +227,18 @@ public class SetTemplateProcedure private Template getTemplate(ConfigNodeProcedureEnv env) { GetSchemaTemplatePlan getSchemaTemplatePlan = new GetSchemaTemplatePlan(templateName); - TemplateInfoResp templateResp = - (TemplateInfoResp) - env.getConfigManager().getConsensusManager().read(getSchemaTemplatePlan).getDataset(); + TemplateInfoResp templateResp; + try { + templateResp = + (TemplateInfoResp) + env.getConfigManager().getConsensusManager().read(getSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's read API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + templateResp = new TemplateInfoResp(); + templateResp.setStatus(res); + } if (templateResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setFailure( new ProcedureException( diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java index 8b95ae0d4d0..39eab9c5dd7 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java @@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.common.ConsensusGroup; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.iot.util.TestEntry; import org.apache.iotdb.consensus.iot.util.TestStateMachine; import org.apache.iotdb.tsfile.utils.PublicBAOS; @@ -157,7 +158,8 @@ public class ReplicateTest { * The three nodes use the requests in the queue to replicate the requests to the other two nodes. */ @Test - public void replicateUsingQueueTest() throws IOException, InterruptedException { + public void replicateUsingQueueTest() + throws IOException, InterruptedException, ConsensusException { logger.info("Start ReplicateUsingQueueTest"); servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); @@ -235,7 +237,7 @@ public class ReplicateTest { * nodes finally consistent. */ @Test - public void replicateUsingWALTest() throws IOException, InterruptedException { + public void replicateUsingWALTest() throws IOException, InterruptedException, ConsensusException { logger.info("Start ReplicateUsingWALTest"); servers.get(0).createLocalPeer(group.getGroupId(), group.getPeers()); servers.get(1).createLocalPeer(group.getGroupId(), group.getPeers()); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index d9cd902538a..8da2d467248 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.consensus.ratis; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.IStateMachine; @@ -26,10 +27,10 @@ import org.apache.iotdb.consensus.common.ConsensusGroup; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.RatisConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.RatisRequestFailedException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.ratis.util.TimeDuration; import org.junit.After; @@ -220,11 +221,15 @@ public class RatisConsensusTest { () -> { ByteBufferConsensusRequest incrReq = TestUtils.TestRequest.incrRequest(); - ConsensusWriteResponse response = consensus.write(gid, incrReq); - if (response.getException() != null) { - response.getException().printStackTrace(System.out); + TSStatus response; + try { + response = consensus.write(gid, incrReq); + } catch (ConsensusException e) { + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + e.printStackTrace(System.out); } - Assert.assertEquals(200, response.getStatus().getCode()); + Assert.assertEquals(200, response.getCode()); latch.countDown(); }); } @@ -253,8 +258,7 @@ public class RatisConsensusTest { Assert.assertNotNull(leader); // Check we reached a consensus - ConsensusReadResponse response = leader.read(gid, getReq); - TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset(); + TestUtils.TestDataSet result = (TestUtils.TestDataSet) leader.read(gid, getReq); Assert.assertEquals(target, result.getNumber()); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java index c90d5d9952d..a4f7c2a1ecb 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java @@ -20,10 +20,11 @@ package org.apache.iotdb.consensus.ratis; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.config.RatisConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException; import org.apache.ratis.util.TimeDuration; @@ -171,20 +172,20 @@ public class RecoverReadTest { // try max 3 minutes final long startTs = System.currentTimeMillis(); - ConsensusReadResponse resp = TestUtils.doRead(miniCluster.getServer(0), gid); - while (!resp.isSuccess()) { + DataSet resp; + try { + resp = TestUtils.doRead(miniCluster.getServer(0), gid); + } catch (ConsensusException e) { final long timeElapsed = System.currentTimeMillis() - startTs; if (timeElapsed > 1000 * 60 * 3) { // 3 min - Assert.fail( - "Linearizable read failed after 3 minutes, last exception seen: " - + resp.getException()); + Assert.fail("Linearizable read failed after 3 minutes, last exception seen: " + e); } Thread.sleep(100); - logger.info("linearizable read failed when restart, retrying: ", resp.getException()); + logger.info("linearizable read failed when restart, retrying: ", e); resp = TestUtils.doRead(miniCluster.getServer(0), gid); } - Assert.assertEquals(10, ((TestUtils.TestDataSet) resp.getDataset()).getNumber()); + Assert.assertEquals(10, ((TestUtils.TestDataSet) resp).getNumber()); } @Test @@ -209,8 +210,11 @@ public class RecoverReadTest { miniCluster.restart(); // query during redo: get exception that ratis is under recovery - final ConsensusReadResponse readResponse = TestUtils.doRead(miniCluster.getServer(0), gid); - Assert.assertTrue(readResponse.getException() instanceof RatisUnderRecoveryException); + try { + TestUtils.doRead(miniCluster.getServer(0), gid); + } catch (ConsensusException e) { + Assert.assertTrue(e instanceof RatisUnderRecoveryException); + } } @Test @@ -238,7 +242,10 @@ public class RecoverReadTest { miniCluster.waitUntilActiveLeader(); // query during redo: get exception that ratis is under recovery - final ConsensusReadResponse readResponse = TestUtils.doRead(miniCluster.getServer(0), gid); - Assert.assertTrue(readResponse.getException() instanceof RatisUnderRecoveryException); + try { + TestUtils.doRead(miniCluster.getServer(0), gid); + } catch (ConsensusException e) { + Assert.assertTrue(e instanceof RatisUnderRecoveryException); + } } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 604d6afcec0..75e0426aa5e 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -33,8 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -379,24 +377,22 @@ public class TestUtils { } } - static void write(IConsensus consensus, ConsensusGroupId gid, int count) { + static void write(IConsensus consensus, ConsensusGroupId gid, int count) + throws ConsensusException { for (int i = 0; i < count; i++) { final ByteBufferConsensusRequest increment = TestRequest.incrRequest(); - final ConsensusWriteResponse response = consensus.write(gid, increment); - Assert.assertEquals(200, response.getStatus().getCode()); + final TSStatus response = consensus.write(gid, increment); + Assert.assertEquals(200, response.getCode()); } } static int read(IConsensus consensus, ConsensusGroupId gid) throws ConsensusException { - final ConsensusReadResponse response = doRead(consensus, gid); - if (!response.isSuccess()) { - throw response.getException(); - } - final TestUtils.TestDataSet result = (TestUtils.TestDataSet) response.getDataset(); + final DataSet response = doRead(consensus, gid); + final TestUtils.TestDataSet result = (TestUtils.TestDataSet) response; return result.getNumber(); } - static ConsensusReadResponse doRead(IConsensus consensus, ConsensusGroupId gid) { + static DataSet doRead(IConsensus consensus, ConsensusGroupId gid) throws ConsensusException { final ByteBufferConsensusRequest getReq = TestUtils.TestRequest.getRequest(); return consensus.read(gid, getReq); } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index 5cd64edf167..e70c1a6b24e 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -37,10 +37,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; -import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; -import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; -import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; -import org.apache.iotdb.consensus.exception.IllegalPeerNumException; +import org.apache.iotdb.consensus.exception.*; import org.apache.ratis.util.FileUtils; import org.junit.After; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 8c2d1be433c..6c150b8fc70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -944,7 +944,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface return exceptionMessages.isEmpty() ? new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())) + .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())) : new TPushPipeMetaResp() .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) .setExceptionMessages(exceptionMessages); @@ -1802,16 +1802,16 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface // kill the datanode process 20 seconds later // because datanode process cannot exit normally for the reason of InterruptedException new Thread( - () -> { - try { - TimeUnit.SECONDS.sleep(20); - } catch (InterruptedException e) { - LOGGER.warn("Meets InterruptedException in stopDataNode RPC method"); - } finally { - LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds"); - System.exit(0); - } - }) + () -> { + try { + TimeUnit.SECONDS.sleep(20); + } catch (InterruptedException e) { + LOGGER.warn("Meets InterruptedException in stopDataNode RPC method"); + } finally { + LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds"); + System.exit(0); + } + }) .start(); try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index 4e2506a2d3f..9ac38701dac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -23,7 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; @@ -69,7 +69,7 @@ public class RegionReadExecutor { public RegionExecutionResult execute( ConsensusGroupId groupId, FragmentInstance fragmentInstance) { // execute fragment instance in state machine - ConsensusReadResponse readResponse; + DataSet readResponse; try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) { if (groupId instanceof DataRegionId) { readResponse = dataRegionConsensus.read(groupId, fragmentInstance); @@ -81,20 +81,8 @@ public class RegionReadExecutor { LOGGER.error(RESPONSE_NULL_ERROR_MSG); resp.setAccepted(false); resp.setMessage(RESPONSE_NULL_ERROR_MSG); - } else if (!readResponse.isSuccess()) { - LOGGER.error( - "Execute FragmentInstance in ConsensusGroup {} failed.", - groupId, - readResponse.getException()); - resp.setAccepted(false); - resp.setMessage( - String.format( - ERROR_MSG_FORMAT, - readResponse.getException() == null - ? "" - : readResponse.getException().getMessage())); } else { - FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset(); + FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse; resp.setAccepted(!info.getState().isFailed()); resp.setMessage(info.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index d89e9bb0480..79f703e8e65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -128,15 +128,11 @@ public class DataNode implements DataNodeMBean { private final TEndPoint thisNode = new TEndPoint(); - /** - * Hold the information of trigger, udf...... - */ + /** Hold the information of trigger, udf...... */ private final ResourcesInformationHolder resourcesInformationHolder = new ResourcesInformationHolder(); - /** - * Responsible for keeping trigger information up to date. - */ + /** Responsible for keeping trigger information up to date. */ private final TriggerInformationUpdater triggerInformationUpdater = new TriggerInformationUpdater(); @@ -207,9 +203,7 @@ public class DataNode implements DataNodeMBean { } } - /** - * Prepare cluster IoTDB-DataNode - */ + /** Prepare cluster IoTDB-DataNode */ private boolean prepareDataNode() throws StartupException, IOException { // Set cluster mode config.setClusterMode(true); @@ -354,7 +348,7 @@ public class DataNode implements DataNodeMBean { * Register this DataNode into cluster. * * @throws StartupException if register failed. - * @throws IOException if serialize cluster name and dataNode Id failed. + * @throws IOException if serialize cluster name and dataNode Id failed. */ private void sendRegisterRequestToConfigNode() throws StartupException, IOException { logger.info("Sending register request to ConfigNode-leader..."); @@ -558,9 +552,7 @@ public class DataNode implements DataNodeMBean { registerManager.register(PipeAgent.runtime()); } - /** - * Set up RPC and protocols after DataNode is available - */ + /** Set up RPC and protocols after DataNode is available */ private void setUpRPCService() throws StartupException { // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling registerManager.register(DataNodeInternalRPCService.getInstance()); @@ -705,9 +697,7 @@ public class DataNode implements DataNodeMBean { } } - /** - * Generate a list for UDFs that do not have jar on this node. - */ + /** Generate a list for UDFs that do not have jar on this node. */ private List<UDFInformation> getJarListForUDF() { List<UDFInformation> res = new ArrayList<>(); for (UDFInformation udfInformation : resourcesInformationHolder.getUDFInformationList()) { @@ -820,9 +810,7 @@ public class DataNode implements DataNodeMBean { } } - /** - * Generate a list for triggers that do not have jar on this node. - */ + /** Generate a list for triggers that do not have jar on this node. */ private List<TriggerInformation> getJarListForTrigger() { List<TriggerInformation> res = new ArrayList<>(); for (TriggerInformation triggerInformation : diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java index 8cd6673f3c0..4a6d2eb5044 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/IoTDBShutdownHook.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.service; -import java.io.IOException; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -43,6 +42,8 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + public class IoTDBShutdownHook extends Thread { private static final Logger logger = LoggerFactory.getLogger(IoTDBShutdownHook.class); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java index 9ae205aebef..426a5a729fd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutorTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@ -46,7 +47,7 @@ import static org.junit.Assert.assertTrue; public class RegionReadExecutorTest { @Test - public void testSuccessfulExecute() { + public void testSuccessfulExecute() throws ConsensusException { // data query ConsensusGroupId dataRegionGroupId = new DataRegionId(1); @@ -70,7 +71,7 @@ public class RegionReadExecutorTest { Mockito.when(fragmentInstanceInfo.getMessage()).thenReturn("data-success"); Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance)) - .thenReturn(readResponse); + .thenReturn((DataSet) readResponse); RegionExecutionResult res = executor.execute(dataRegionGroupId, fragmentInstance); @@ -86,7 +87,7 @@ public class RegionReadExecutorTest { Mockito.when(fragmentInstanceInfo.getMessage()).thenReturn("schema-success"); Mockito.when(schemaRegionConsensus.read(schemaRegionGroupId, fragmentInstance)) - .thenReturn(readResponse); + .thenReturn((DataSet) readResponse); res = executor.execute(schemaRegionGroupId, fragmentInstance); @@ -95,7 +96,7 @@ public class RegionReadExecutorTest { } @Test - public void testResponseNullExecute() { + public void testResponseNullExecute() throws ConsensusException { // data query ConsensusGroupId dataRegionGroupId = new DataRegionId(1); @@ -131,7 +132,7 @@ public class RegionReadExecutorTest { } @Test - public void testFailedExecute() { + public void testFailedExecute() throws ConsensusException { // data query ConsensusGroupId dataRegionGroupId = new DataRegionId(1); @@ -152,7 +153,7 @@ public class RegionReadExecutorTest { Mockito.when(readResponse.getException()).thenReturn(new ConsensusException("data-exception")); Mockito.when(dataRegionConsensus.read(dataRegionGroupId, fragmentInstance)) - .thenReturn(readResponse); + .thenReturn((DataSet) readResponse); RegionExecutionResult res = executor.execute(dataRegionGroupId, fragmentInstance); @@ -166,7 +167,7 @@ public class RegionReadExecutorTest { Mockito.when(readResponse.getException()).thenReturn(null); Mockito.when(schemaRegionConsensus.read(schemaRegionGroupId, fragmentInstance)) - .thenReturn(readResponse); + .thenReturn((DataSet) readResponse); res = executor.execute(schemaRegionGroupId, fragmentInstance); @@ -175,7 +176,7 @@ public class RegionReadExecutorTest { } @Test - public void testExceptionHappened() { + public void testExceptionHappened() throws ConsensusException { ConsensusGroupId dataRegionGroupId = new DataRegionId(1); FragmentInstanceId fragmentInstanceId = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java index 6e8eafd851c..59eefb601b3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutorTest.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -49,7 +50,7 @@ import static org.junit.Assert.fail; public class RegionWriteExecutorTest { @Test - public void testInsertRowNode() { + public void testInsertRowNode() throws ConsensusException { IConsensus dataRegionConsensus = Mockito.mock(IConsensus.class); IConsensus schemaRegionConsensus = Mockito.mock(IConsensus.class);
