This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch consensus_module_refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8b9d90ff1e6fea9d1ed1b98533f4e1fd006a484f Author: BUAAserein <[email protected]> AuthorDate: Wed Aug 16 22:37:48 2023 +0800 refactor write --- .../confignode/manager/ClusterQuotaManager.java | 48 ++++----- .../iotdb/confignode/manager/ModelManager.java | 32 +++--- .../confignode/manager/PermissionManager.java | 29 ++++-- .../iotdb/confignode/manager/TriggerManager.java | 41 ++++---- .../iotdb/confignode/manager/UDFManager.java | 7 +- .../manager/consensus/ConsensusManager.java | 8 +- .../iotdb/confignode/manager/cq/CQManager.java | 15 ++- .../confignode/manager/cq/CQScheduleTask.java | 27 ++--- .../iotdb/confignode/manager/node/NodeManager.java | 44 +++++++-- .../manager/partition/PartitionManager.java | 38 ++++--- .../manager/schema/ClusterSchemaManager.java | 107 +++++++++++++++++--- .../procedure/env/ConfigNodeProcedureEnv.java | 18 +++- .../procedure/env/DataNodeRemoveHandler.java | 7 +- .../procedure/impl/cq/CreateCQProcedure.java | 110 ++++++++++----------- .../procedure/impl/model/CreateModelProcedure.java | 18 ++-- .../procedure/impl/model/DropModelProcedure.java | 7 +- .../pipe/plugin/CreatePipePluginProcedure.java | 27 +++-- .../impl/pipe/plugin/DropPipePluginProcedure.java | 13 ++- .../runtime/PipeHandleLeaderChangeProcedure.java | 19 ++-- .../runtime/PipeHandleMetaChangeProcedure.java | 23 +++-- .../impl/pipe/task/CreatePipeProcedureV2.java | 42 +++++--- .../impl/pipe/task/DropPipeProcedureV2.java | 18 +++- .../impl/pipe/task/StartPipeProcedureV2.java | 42 +++++--- .../impl/pipe/task/StopPipeProcedureV2.java | 42 +++++--- .../impl/schema/DeleteDatabaseProcedure.java | 3 +- .../impl/schema/SetTemplateProcedure.java | 41 ++++++-- .../statemachine/CreateRegionGroupsProcedure.java | 7 +- .../impl/trigger/CreateTriggerProcedure.java | 32 ++++-- .../procedure/store/ConfigProcedureStore.java | 13 ++- .../org/apache/iotdb/consensus/IConsensus.java | 44 ++++----- .../apache/iotdb/consensus/iot/IoTConsensus.java | 88 +++++++++-------- .../iotdb/consensus/ratis/RatisConsensus.java | 60 +++++------ .../iotdb/consensus/simple/SimpleConsensus.java | 1 - .../execution/executor/RegionWriteExecutor.java | 47 ++++----- 34 files changed, 683 insertions(+), 435 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java index e013d1f6d46..01ee76691bf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterQuotaManager.java @@ -36,7 +36,7 @@ import org.apache.iotdb.confignode.persistence.quota.QuotaInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowThrottleReq; import org.apache.iotdb.confignode.rpc.thrift.TSpaceQuotaResp; import org.apache.iotdb.confignode.rpc.thrift.TThrottleQuotaResp; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -79,12 +79,12 @@ public class ClusterQuotaManager { "The used quota exceeds the preset quota. Please set a larger value."); } // TODO: Datanode failed to receive rpc - ConsensusWriteResponse response = - configManager - .getConsensusManager() - .write(new SetSpaceQuotaPlan(req.getDatabase(), req.getSpaceLimit())); - if (response.getStatus() != null) { - if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + try { + TSStatus response = + configManager + .getConsensusManager() + .write(new SetSpaceQuotaPlan(req.getDatabase(), req.getSpaceLimit())); + if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { Map<Integer, TDataNodeLocation> dataNodeLocationMap = configManager.getNodeManager().getRegisteredDataNodeLocations(); AsyncClientHandler<TSetSpaceQuotaReq, TSStatus> clientHandler = @@ -92,15 +92,12 @@ public class ClusterQuotaManager { AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); return RpcUtils.squashResponseStatusList(clientHandler.getResponseList()); } - return response.getStatus(); - } else { - LOGGER.warn( - "Unexpected error happened while setting space quota on {}: ", - req.getDatabase().toString(), - response.getException()); + return response; + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getErrorMessage()); + res.setMessage(e.getMessage()); return res; } } @@ -183,12 +180,12 @@ public class ClusterQuotaManager { } public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) { - ConsensusWriteResponse response = - configManager - .getConsensusManager() - .write(new SetThrottleQuotaPlan(req.getUserName(), req.getThrottleQuota())); - if (response.getStatus() != null) { - if (response.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + try { + TSStatus response = + configManager + .getConsensusManager() + .write(new SetThrottleQuotaPlan(req.getUserName(), req.getThrottleQuota())); + if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { Map<Integer, TDataNodeLocation> dataNodeLocationMap = configManager.getNodeManager().getRegisteredDataNodeLocations(); AsyncClientHandler<TSetThrottleQuotaReq, TSStatus> clientHandler = @@ -197,15 +194,12 @@ public class ClusterQuotaManager { AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); return RpcUtils.squashResponseStatusList(clientHandler.getResponseList()); } - return response.getStatus(); - } else { - LOGGER.warn( - "Unexpected error happened while setting throttle quota on user: {}: ", - req.getUserName(), - response.getException()); + return response; + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getErrorMessage()); + res.setMessage(e.getMessage()); return res; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java index 425bafbd4c0..7fd18b5efca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java @@ -37,7 +37,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTrailResp; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelInfoReq; import org.apache.iotdb.confignode.rpc.thrift.TUpdateModelStateReq; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -79,35 +79,25 @@ public class ModelManager { } public TSStatus updateModelInfo(TUpdateModelInfoReq req) { - ConsensusWriteResponse response = - configManager.getConsensusManager().write(new UpdateModelInfoPlan(req)); - if (response.getStatus() != null) { - return response.getStatus(); - } else { - LOGGER.warn( - "Unexpected error happened while updating model {}: ", - req.getModelId(), - response.getException()); + try { + return configManager.getConsensusManager().write(new UpdateModelInfoPlan(req)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getErrorMessage()); + res.setMessage(e.getMessage()); return res; } } public TSStatus updateModelState(TUpdateModelStateReq req) { - ConsensusWriteResponse response = - configManager.getConsensusManager().write(new UpdateModelStatePlan(req)); - if (response.getStatus() != null) { - return response.getStatus(); - } else { - LOGGER.warn( - "Unexpected error happened while updating state of model {}: ", - req.getModelId(), - response.getException()); + try { + return configManager.getConsensusManager().write(new UpdateModelStatePlan(req)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getErrorMessage()); + res.setMessage(e.getMessage()); return res; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index bfe0c3925fc..1f5c5a999d8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -30,15 +30,21 @@ import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.persistence.AuthorInfo; import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; /** Manager permission read and operation. */ public class PermissionManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PermissionManager.class); + private final ConfigManager configManager; private final AuthorInfo authorInfo; @@ -56,16 +62,23 @@ public class PermissionManager { public TSStatus operatePermission(AuthorPlan authorPlan) { TSStatus tsStatus; // If the permissions change, clear the cache content affected by the operation - if (authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateUser - || authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateRole) { - tsStatus = getConsensusManager().write(authorPlan).getStatus(); - } else { - tsStatus = invalidateCache(authorPlan.getUserName(), authorPlan.getRoleName()); - if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - tsStatus = getConsensusManager().write(authorPlan).getStatus(); + try { + if (authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateUser + || authorPlan.getAuthorType() == ConfigPhysicalPlanType.CreateRole) { + tsStatus = getConsensusManager().write(authorPlan); + } else { + tsStatus = invalidateCache(authorPlan.getUserName(), authorPlan.getRoleName()); + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + tsStatus = getConsensusManager().write(authorPlan); + } } + return tsStatus; + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; } - return tsStatus; } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java index 1cb38042600..e42a0fb029e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TriggerManager.java @@ -47,6 +47,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp; import org.apache.iotdb.confignode.rpc.thrift.TGetLocationForTriggerResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp; import org.apache.iotdb.confignode.rpc.thrift.TTriggerState; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -58,7 +59,6 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -135,12 +135,9 @@ public class TriggerManager { public TGetTriggerTableResp getTriggerTable(boolean onlyStateful) { try { return ((TriggerTableResp) - configManager - .getConsensusManager() - .read(new GetTriggerTablePlan(onlyStateful)) - .getDataset()) + configManager.getConsensusManager().read(new GetTriggerTablePlan(onlyStateful))) .convertToThriftResponse(); - } catch (IOException e) { + } catch (Exception e) { LOGGER.error("Fail to get TriggerTable", e); return new TGetTriggerTableResp( new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -150,12 +147,16 @@ public class TriggerManager { } public TGetLocationForTriggerResp getLocationOfStatefulTrigger(String triggerName) { - return ((TriggerLocationResp) - configManager - .getConsensusManager() - .read(new GetTriggerLocationPlan(triggerName)) - .getDataset()) - .convertToThriftResponse(); + try { + return ((TriggerLocationResp) + configManager.getConsensusManager().read(new GetTriggerLocationPlan(triggerName))) + .convertToThriftResponse(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + return new TGetLocationForTriggerResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage())); + } } public TGetJarInListResp getTriggerJar(TGetJarInListReq req) { @@ -191,16 +192,13 @@ public class TriggerManager { NodeManager nodeManager = configManager.getNodeManager(); transferResult = - consensusManager - .write(new UpdateTriggersOnTransferNodesPlan(newUnknownDataNodeList)) - .getStatus(); + consensusManager.write(new UpdateTriggersOnTransferNodesPlan(newUnknownDataNodeList)); if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return transferResult; } List<String> transferringTriggers = - ((TransferringTriggersResp) - consensusManager.read(new GetTransferringTriggersPlan()).getDataset()) + ((TransferringTriggersResp) consensusManager.read(new GetTransferringTriggersPlan())) .getTransferringTriggers(); for (String trigger : transferringTriggers) { @@ -215,13 +213,16 @@ public class TriggerManager { } transferResult = - consensusManager - .write(new UpdateTriggerLocationPlan(trigger, newDataNodeLocation)) - .getStatus(); + consensusManager.write(new UpdateTriggerLocationPlan(trigger, newDataNodeLocation)); if (transferResult.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { return transferResult; } } + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; } finally { triggerInfo.releaseTriggerTableLock(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java index a7f5ef245b8..f2b747104c1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/UDFManager.java @@ -110,7 +110,7 @@ public class UDFManager { LOGGER.info("Start to add UDF [{}] in UDF_Table on Config Nodes", udfName); - return configManager.getConsensusManager().write(createFunctionPlan).getStatus(); + return configManager.getConsensusManager().write(createFunctionPlan); } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) @@ -143,10 +143,7 @@ public class UDFManager { return result; } - return configManager - .getConsensusManager() - .write(new DropFunctionPlan(functionName)) - .getStatus(); + return configManager.getConsensusManager().write(new DropFunctionPlan(functionName)); } catch (Exception e) { LOGGER.warn(e.getMessage(), e); return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 95182ce9ab1..5d0d0c6551f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -36,11 +36,11 @@ import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.ratis.util.SizeInBytes; @@ -294,12 +294,12 @@ public class ConsensusManager { } /** Transmit PhysicalPlan to confignode.consensus.statemachine */ - public ConsensusWriteResponse write(ConfigPhysicalPlan plan) { + public TSStatus write(ConfigPhysicalPlan plan) throws ConsensusException { return consensusImpl.write(DEFAULT_CONSENSUS_GROUP_ID, plan); } /** Transmit PhysicalPlan to confignode.consensus.statemachine */ - public ConsensusReadResponse read(ConfigPhysicalPlan plan) { + public DataSet read(ConfigPhysicalPlan plan) throws ConsensusException { return consensusImpl.read(DEFAULT_CONSENSUS_GROUP_ID, plan); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index bbbae4f3b2e..43878c16c96 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -78,16 +78,13 @@ public class CQManager { } public TSStatus dropCQ(TDropCQReq req) { - ConsensusWriteResponse response = - configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); - if (response.getStatus() != null) { - return response.getStatus(); - } else { - LOGGER.warn( - "Unexpected error happened while dropping cq {}: ", req.cqId, response.getException()); + try { + return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); - res.setMessage(response.getErrorMessage()); + res.setMessage(e.getMessage()); return res; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index 5fa60c834c8..beb97355496 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -29,7 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTi import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ; import org.apache.iotdb.rpc.TSStatusCode; @@ -245,24 +245,29 @@ public class CQScheduleTask implements Runnable { startTime, endTime, System.currentTimeMillis() * FACTOR); - - ConsensusWriteResponse result = - configManager - .getConsensusManager() - .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); + TSStatus result; + try { + result = + configManager + .getConsensusManager() + .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + } // while leadership changed, the update last exec time operation for CQTasks in new leader // may still update failed because stale CQTask in old leader may update it in advance - if (!result.isSuccessful()) { + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Failed to update the last execution time {} of CQ {}, because {}", executionTime, cqId, - result.getErrorMessage()); + result.getMessage()); // no such cq, we don't need to submit it again - if (result.getStatus() != null - && result.getStatus().code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { - LOGGER.info("Stop submitting CQ {} because {}", cqId, result.getStatus().message); + if (result.getCode() == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { + LOGGER.info("Stop submitting CQ {} because {}", cqId, result.getMessage()); return; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 1ed797f6fdd..bf59d5c2b77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -77,7 +77,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -250,12 +250,20 @@ public class NodeManager { new RegisterDataNodePlan(req.getDataNodeConfiguration()); // Register new DataNode registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(dataNodeId); - getConsensusManager().write(registerDataNodePlan); + try { + getConsensusManager().write(registerDataNodePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } // update datanode's versionInfo UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), dataNodeId); - getConsensusManager().write(updateVersionInfoPlan); + try { + getConsensusManager().write(updateVersionInfoPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } // Bind DataNode metrics PartitionMetrics.bindDataNodePartitionMetrics( @@ -281,14 +289,22 @@ public class NodeManager { // Update DataNodeConfiguration when modified during restart UpdateDataNodePlan updateDataNodePlan = new UpdateDataNodePlan(req.getDataNodeConfiguration()); - getConsensusManager().write(updateDataNodePlan); + try { + getConsensusManager().write(updateDataNodePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); if (!req.getVersionInfo().equals(versionInfo)) { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); - getConsensusManager().write(updateVersionInfoPlan); + try { + getConsensusManager().write(updateVersionInfoPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } TDataNodeRestartResp resp = new TDataNodeRestartResp(); @@ -363,11 +379,11 @@ public class NodeManager { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateConfigNodePlan = new UpdateVersionInfoPlan(versionInfo, configNodeId); - ConsensusWriteResponse result = getConsensusManager().write(updateConfigNodePlan); - if (result.getException() != null) { + try { + return getConsensusManager().write(updateConfigNodePlan); + } catch (ConsensusException e) { return new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode()); } - return result.getStatus(); } return ClusterNodeStartUtils.ACCEPT_NODE_RESTART; } @@ -526,10 +542,18 @@ public class NodeManager { public void applyConfigNode( TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) { ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); - getConsensusManager().write(applyConfigNodePlan); + try { + getConsensusManager().write(applyConfigNodePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId()); - getConsensusManager().write(updateVersionInfoPlan); + try { + getConsensusManager().write(updateVersionInfoPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index ac0df2916a9..18c1c4f44e2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -86,7 +86,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.rpc.RpcUtils; @@ -438,15 +438,15 @@ public class PartitionManager { // since the RegionGroup creating process might take some time return status; } - - ConsensusWriteResponse writeResp = getConsensusManager().write(plan); - if (!writeResp.isSuccessful()) { + try { + return getConsensusManager().write(plan); + } catch (ConsensusException e) { // The allocation might fail due to consensus error - status = writeResp.getStatus(); - status.setMessage(writeResp.getErrorMessage()); LOGGER.error("Write DataPartition allocation result failed because: {}", status); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; } - return status; } // ====================================================== @@ -896,7 +896,11 @@ public class PartitionManager { String database, PreDeleteDatabasePlan.PreDeleteType preDeleteType) { final PreDeleteDatabasePlan preDeleteDatabasePlan = new PreDeleteDatabasePlan(database, preDeleteType); - getConsensusManager().write(preDeleteDatabasePlan); + try { + getConsensusManager().write(preDeleteDatabasePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } public boolean isDatabasePreDeleted(String database) { @@ -956,7 +960,14 @@ public class PartitionManager { * @return TSStatus */ public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) { - return getConsensusManager().write(req).getStatus(); + try { + return getConsensusManager().write(req); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; + } } public GetRegionIdResp getRegionId(TGetRegionIdReq req) { @@ -1233,8 +1244,13 @@ public class PartitionManager { } // Poll the head entry if success - getConsensusManager() - .write(new PollSpecificRegionMaintainTaskPlan(successfulTask)); + try { + getConsensusManager() + .write(new PollSpecificRegionMaintainTaskPlan(successfulTask)); + } catch (ConsensusException e) { + LOGGER.warn( + "Something wrong happened while calling consensus layer's write API.", e); + } if (successfulTask.size() < selectedRegionMaintainTask.size()) { // Here we just break and wait until next schedule task diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java index 808627fd82c..8ef9a1b64f3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java @@ -72,6 +72,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException; import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; @@ -152,12 +153,16 @@ public class ClusterSchemaManager { clusterSchemaInfo.checkDatabaseLimit(); } // Cache DatabaseSchema - result = getConsensusManager().write(databaseSchemaPlan).getStatus(); + result = getConsensusManager().write(databaseSchemaPlan); // Bind Database metrics PartitionMetrics.bindDatabasePartitionMetrics( MetricService.getInstance(), configManager, databaseSchemaPlan.getSchema().getName()); // Adjust the maximum RegionGroup number of each Database adjustMaxRegionGroupNum(); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); } catch (MetadataException metadataException) { // Reject if StorageGroup already set if (metadataException instanceof IllegalPathException) { @@ -220,12 +225,26 @@ public class ClusterSchemaManager { } // Alter DatabaseSchema - return getConsensusManager().write(databaseSchemaPlan).getStatus(); + try { + return getConsensusManager().write(databaseSchemaPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } /** Delete DatabaseSchema. */ public TSStatus deleteDatabase(DeleteDatabasePlan deleteDatabasePlan) { - TSStatus result = getConsensusManager().write(deleteDatabasePlan).getStatus(); + TSStatus result; + try { + result = getConsensusManager().write(deleteDatabasePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + } if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { adjustMaxRegionGroupNum(); } @@ -374,25 +393,53 @@ public class ClusterSchemaManager { // TODO: Check response AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); - return getConsensusManager().write(setTTLPlan).getStatus(); + try { + return getConsensusManager().write(setTTLPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public TSStatus setSchemaReplicationFactor( SetSchemaReplicationFactorPlan setSchemaReplicationFactorPlan) { // TODO: Inform DataNodes - return getConsensusManager().write(setSchemaReplicationFactorPlan).getStatus(); + try { + return getConsensusManager().write(setSchemaReplicationFactorPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public TSStatus setDataReplicationFactor( SetDataReplicationFactorPlan setDataReplicationFactorPlan) { // TODO: Inform DataNodes - return getConsensusManager().write(setDataReplicationFactorPlan).getStatus(); + try { + return getConsensusManager().write(setDataReplicationFactorPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public TSStatus setTimePartitionInterval( SetTimePartitionIntervalPlan setTimePartitionIntervalPlan) { // TODO: Inform DataNodes - return getConsensusManager().write(setTimePartitionIntervalPlan).getStatus(); + try { + return getConsensusManager().write(setTimePartitionIntervalPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } /** @@ -478,7 +525,11 @@ public class ClusterSchemaManager { LOGGER.warn("Adjust maxRegionGroupNum failed because StorageGroup doesn't exist", e); } } - getConsensusManager().write(adjustMaxRegionGroupNumPlan); + try { + getConsensusManager().write(adjustMaxRegionGroupNumPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } public static int calcMaxRegionGroupNum( @@ -740,19 +791,36 @@ public class ClusterSchemaManager { } public TSStatus preUnsetSchemaTemplate(int templateId, PartialPath path) { - return getConsensusManager() - .write(new PreUnsetSchemaTemplatePlan(templateId, path)) - .getStatus(); + try { + return getConsensusManager().write(new PreUnsetSchemaTemplatePlan(templateId, path)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public TSStatus rollbackPreUnsetSchemaTemplate(int templateId, PartialPath path) { - return getConsensusManager() - .write(new RollbackPreUnsetSchemaTemplatePlan(templateId, path)) - .getStatus(); + try { + return getConsensusManager().write(new RollbackPreUnsetSchemaTemplatePlan(templateId, path)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public TSStatus unsetSchemaTemplateInBlackList(int templateId, PartialPath path) { - return getConsensusManager().write(new UnsetSchemaTemplatePlan(templateId, path)).getStatus(); + try { + return getConsensusManager().write(new UnsetSchemaTemplatePlan(templateId, path)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public synchronized TSStatus dropSchemaTemplate(String templateName) { @@ -784,7 +852,14 @@ public class ClusterSchemaManager { } // execute drop template - return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus(); + try { + return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + TSStatus result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + return result; + } } public synchronized TSStatus extendSchemaTemplate(TemplateExtendInfo templateExtendInfo) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 371243bc197..996d44a301d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -58,6 +58,7 @@ import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq; @@ -263,14 +264,19 @@ public class ConfigNodeProcedureEnv { try { // Execute removePeer if (getConsensusManager().removeConfigNodePeer(tConfigNodeLocation)) { - tsStatus = - getConsensusManager().write(new RemoveConfigNodePlan(tConfigNodeLocation)).getStatus(); + tsStatus = getConsensusManager().write(new RemoveConfigNodePlan(tConfigNodeLocation)); } else { tsStatus = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()) .setMessage( "Remove ConfigNode failed because update ConsensusGroup peer information failed."); } + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + tsStatus = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + tsStatus.setMessage(e.getMessage()); + } + try { if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ProcedureException(tsStatus.getMessage()); } @@ -363,7 +369,7 @@ public class ConfigNodeProcedureEnv { new TUpdateConfigNodeGroupReq(registeredConfigNodes), registeredDataNodes); - if (registeredDataNodes.size() > 0) { + if (!registeredDataNodes.isEmpty()) { LOG.info( "Begin to broadcast the latest configNodeGroup to DataNodes, ConfigNodeGroups: {}, DataNodes: {}", registeredConfigNodes, @@ -536,7 +542,11 @@ public class ConfigNodeProcedureEnv { public void persistRegionGroup(CreateRegionGroupsPlan createRegionGroupsPlan) { // Persist the allocation result - getConsensusManager().write(createRegionGroupsPlan); + try { + getConsensusManager().write(createRegionGroupsPlan); + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + } } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index d4cde051a84..e15aa30c2e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -38,6 +38,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.node.NodeInfo; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; @@ -560,7 +561,11 @@ public class DataNodeRemoveHandler { public void removeDataNodePersistence(TDataNodeLocation dataNodeLocation) { // Remove consensus record List<TDataNodeLocation> removeDataNodes = Collections.singletonList(dataNodeLocation); - configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes)); + try { + configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } // Adjust maxRegionGroupNum configManager.getClusterSchemaManager().adjustMaxRegionGroupNum(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index 480fca9abbd..ec7eb850f59 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -34,7 +34,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.cq.CreateCQState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -119,51 +119,47 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { } private void addCQ(ConfigNodeProcedureEnv env) { - ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new AddCQPlan(req, md5, firstExecutionTime)); - TSStatus res = response.getStatus(); - if (res != null) { - if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.debug("Finish init CQ {} successfully", req.cqId); - setNextState(INACTIVE); - } else if (res.code == TSStatusCode.CQ_ALREADY_EXIST.getStatusCode()) { - LOGGER.info("Failed to init CQ {} because such cq already exists", req.cqId); - setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); - } else { - LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", req.cqId, res); - setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); - } + TSStatus res; + try { + res = + env.getConfigManager() + .getConsensusManager() + .write(new AddCQPlan(req, md5, firstExecutionTime)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + } + if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.debug("Finish init CQ {} successfully", req.cqId); + setNextState(INACTIVE); + } else if (res.code == TSStatusCode.CQ_ALREADY_EXIST.getStatusCode()) { + LOGGER.info("Failed to init CQ {} because such cq already exists", req.cqId); + setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); } else { - LOGGER.warn( - "Failed to init CQ {} because of unexpected exception: ", - req.cqId, - response.getException()); - setFailure(new ProcedureException(response.getException())); + LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", req.cqId, res); + setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); } } private void activeCQ(ConfigNodeProcedureEnv env) { - ConsensusWriteResponse response = - env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); - TSStatus res = response.getStatus(); - if (res != null) { - if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId); - } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { - LOGGER.warn("Failed to active CQ {} because of no such cq: {}", req.cqId, res.message); - } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) { - LOGGER.warn("Failed to active CQ {} because this cq has already been active", req.cqId); - } else { - LOGGER.warn( - "Failed to active CQ {} successfully because of unknown reasons {}", req.cqId, res); - } + TSStatus res; + try { + res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + } + if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId); + } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { + LOGGER.warn("Failed to active CQ {} because of no such cq: {}", req.cqId, res.message); + } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) { + LOGGER.warn("Failed to active CQ {} because this cq has already been active", req.cqId); } else { LOGGER.warn( - "Failed to active CQ {} successfully because of unexpected exception: ", - req.cqId, - response.getException()); + "Failed to active CQ {} successfully because of unknown reasons {}", req.cqId, res); } } @@ -177,28 +173,26 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { break; case INACTIVE: LOGGER.info("Start [INACTIVE] rollback of CQ {}", req.cqId); - ConsensusWriteResponse response = - env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); - TSStatus res = response.getStatus(); - if (res != null) { - if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", req.cqId); - } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { - LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} because of no such cq: {}", - req.cqId, - res.message); - } else { - LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} because of unknown reasons {}", - req.cqId, - res); - } + TSStatus res; + try { + res = env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + } + if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", req.cqId); + } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { + LOGGER.warn( + "Failed to do [INACTIVE] rollback of CQ {} because of no such cq: {}", + req.cqId, + res.message); } else { LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} because of unexpected exception: ", + "Failed to do [INACTIVE] rollback of CQ {} because of unknown reasons {}", req.cqId, - response.getException()); + res); } break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java index be189bd5ce1..f4e0dbb989c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java @@ -31,7 +31,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.model.CreateModelState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.protocol.client.MLNodeClient; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -91,13 +91,13 @@ public class CreateModelProcedure extends AbstractNodeProcedure<CreateModelState LOGGER.info("Start to add model [{}] in ModelTable on Config Nodes", modelId); - ConsensusWriteResponse response = + TSStatus response = configManager.getConsensusManager().write(new CreateModelPlan(modelInformation)); - if (!response.isSuccessful()) { + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ModelManagementException( String.format( "Failed to add model [%s] in ModelTable on Config Nodes: %s", - modelId, response.getErrorMessage())); + modelId, response.getMessage())); } setNextState(CreateModelState.CONFIG_NODE_ACTIVE); @@ -160,9 +160,13 @@ public class CreateModelProcedure extends AbstractNodeProcedure<CreateModelState case VALIDATED: LOGGER.info("Start [VALIDATED] rollback of model [{}]", modelInformation.getModelId()); - env.getConfigManager() - .getConsensusManager() - .write(new DropModelPlan(modelInformation.getModelId())); + try { + env.getConfigManager() + .getConsensusManager() + .write(new DropModelPlan(modelInformation.getModelId())); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } break; default: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java index 23ef71e898e..6c1f92c2d4c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java @@ -31,7 +31,6 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.model.DropModelState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.db.protocol.client.MLNodeClient; import org.apache.iotdb.mpp.rpc.thrift.TDeleteModelMetricsReq; import org.apache.iotdb.rpc.TSStatusCode; @@ -131,13 +130,13 @@ public class DropModelProcedure extends AbstractNodeProcedure<DropModelState> { case ML_NODE_DROPPED: LOGGER.info("Start to drop model [{}] on Config Nodes", modelId); - ConsensusWriteResponse response = + TSStatus response = env.getConfigManager().getConsensusManager().write(new DropModelPlan(modelId)); - if (!response.isSuccessful()) { + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new ModelManagementException( String.format( "Failed to drop model [%s], fail to drop model on Config Nodes: %s", - modelId, response.getErrorMessage())); + modelId, response.getMessage())); } setNextState(DropModelState.CONFIG_NODE_DROPPED); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index c6d68587493..b0fd11933e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.plugin; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan; @@ -35,7 +36,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure; import org.apache.iotdb.confignode.procedure.state.pipe.plugin.CreatePipePluginState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -163,10 +164,16 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP final CreatePipePluginPlan createPluginPlan = new CreatePipePluginPlan(pipePluginMeta, needToSaveJar ? new Binary(jarFile) : null); - final ConsensusWriteResponse response = - configNodeManager.getConsensusManager().write(createPluginPlan); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = configNodeManager.getConsensusManager().write(createPluginPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } setNextState(CreatePipePluginState.CREATE_ON_DATA_NODES); @@ -226,9 +233,13 @@ public class CreatePipePluginProcedure extends AbstractNodeProcedure<CreatePipeP "CreatePipePluginProcedure: rollbackFromCreateOnConfigNodes({})", pipePluginMeta.getPluginName()); - env.getConfigManager() - .getConsensusManager() - .write(new DropPipePluginPlan(pipePluginMeta.getPluginName())); + try { + env.getConfigManager() + .getConsensusManager() + .write(new DropPipePluginPlan(pipePluginMeta.getPluginName())); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } } private void rollbackFromCreateOnDataNodes(ConfigNodeProcedureEnv env) throws ProcedureException { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 676d127e27c..f60ff784043 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure; import org.apache.iotdb.confignode.procedure.state.pipe.plugin.DropPipePluginState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -118,7 +119,11 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi return Flow.NO_MORE_STATE; } - env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName)); + try { + env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } setNextState(DropPipePluginState.DROP_ON_DATA_NODES); return Flow.HAS_MORE_STATE; @@ -141,7 +146,11 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) { LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName); - env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName)); + try { + env.getConfigManager().getConsensusManager().write(new DropPipePluginPlan(pluginName)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } setNextState(DropPipePluginState.UNLOCK); return Flow.HAS_MORE_STATE; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 5d8fcf96e23..7536b822793 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -21,13 +21,15 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -91,11 +93,16 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final PipeHandleLeaderChangePlan pipeHandleLeaderChangePlan = new PipeHandleLeaderChangePlan(newDataRegionGroupIdToLeaderDataRegionIdMap); - - final ConsensusWriteResponse response = - env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index 7d12e89b8d8..eb32e30281b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -19,14 +19,16 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeMeta; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -89,12 +91,19 @@ public class PipeHandleMetaChangeProcedure extends AbstractOperatePipeProcedureV pipeMetaList.add(pipeMeta); } - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new PipeHandleMetaChangePlan(pipeMetaList)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new PipeHandleMetaChangePlan(pipeMetaList)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 5aefe33c222..12db604916a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta; @@ -33,9 +34,10 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProced import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -127,12 +129,19 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { "CreatePipeProcedureV2: executeFromWriteConfigNodeConsensus({})", createPipeRequest.getPipeName()); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } @@ -175,12 +184,19 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { "CreatePipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", createPipeRequest.getPipeName()); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new DropPipePlanV2(createPipeRequest.getPipeName())); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new DropPipePlanV2(createPipeRequest.getPipeName())); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 510d87d2763..19a8af9cd1b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -19,13 +19,15 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -73,10 +75,16 @@ public class DropPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { throws PipeException { LOGGER.info("DropPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); - final ConsensusWriteResponse response = - env.getConfigManager().getConsensusManager().write(new DropPipePlanV2(pipeName)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = env.getConfigManager().getConsensusManager().write(new DropPipePlanV2(pipeName)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index eae446df24b..070bc20058b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -19,14 +19,16 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -74,12 +76,19 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { throws PipeException { LOGGER.info("StartPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } @@ -116,12 +125,19 @@ public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { LOGGER.info("StartPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index a3a85c8364e..c624c36a42f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -19,14 +19,16 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.task.meta.PipeStatus; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -74,12 +76,19 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { throws PipeException { LOGGER.info("StopPipeProcedureV2: executeFromWriteConfigNodeConsensus({})", pipeName); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.STOPPED)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } @@ -112,12 +121,19 @@ public class StopPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { LOGGER.info("StopPipeProcedureV2: rollbackFromWriteConfigNodeConsensus({})", pipeName); - final ConsensusWriteResponse response = - env.getConfigManager() - .getConsensusManager() - .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); - if (!response.isSuccessful()) { - throw new PipeException(response.getErrorMessage()); + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new SetPipeStatusPlanV2(pipeName, PipeStatus.RUNNING)); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 7ea774cc33c..7fd0a1af2bb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -42,6 +42,7 @@ import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProce import org.apache.iotdb.confignode.procedure.state.schema.DeleteStorageGroupState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -215,7 +216,7 @@ public class DeleteDatabaseProcedure new ProcedureException("[DeleteDatabaseProcedure] Delete DatabaseSchema failed")); } } - } catch (TException | IOException e) { + } catch (ConsensusException | TException | IOException e) { if (isRollbackSupported(state)) { setFailure( new ProcedureException("[DeleteDatabaseProcedure] Delete Database failed " + state)); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java index 78e04c750d6..a8be16f171e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTemplateProcedure.java @@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure; import org.apache.iotdb.confignode.procedure.state.schema.SetTemplateState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.exception.metadata.template.TemplateIncompatibleException; import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException; import org.apache.iotdb.db.schemaengine.template.Template; @@ -166,8 +167,14 @@ public class SetTemplateProcedure private void preSetTemplate(ConfigNodeProcedureEnv env) { PreSetSchemaTemplatePlan preSetSchemaTemplatePlan = new PreSetSchemaTemplatePlan(templateName, templateSetPath); - TSStatus status = - env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan).getStatus(); + TSStatus status; + try { + status = env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + } if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setNextState(SetTemplateState.PRE_RELEASE); } else { @@ -317,8 +324,14 @@ public class SetTemplateProcedure private void commitSetTemplate(ConfigNodeProcedureEnv env) { CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan = new CommitSetSchemaTemplatePlan(templateName, templateSetPath); - TSStatus status = - env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan).getStatus(); + TSStatus status = null; + try { + status = env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + } if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { setNextState(SetTemplateState.COMMIT_RELEASE); } else { @@ -414,8 +427,14 @@ public class SetTemplateProcedure private void rollbackPreSet(ConfigNodeProcedureEnv env) { PreSetSchemaTemplatePlan preSetSchemaTemplatePlan = new PreSetSchemaTemplatePlan(templateName, templateSetPath, true); - TSStatus status = - env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan).getStatus(); + TSStatus status = null; + try { + status = env.getConfigManager().getConsensusManager().write(preSetSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + } if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Failed to rollback pre set template {} on path {} due to {}", @@ -465,8 +484,14 @@ public class SetTemplateProcedure private void rollbackCommitSet(ConfigNodeProcedureEnv env) { CommitSetSchemaTemplatePlan commitSetSchemaTemplatePlan = new CommitSetSchemaTemplatePlan(templateName, templateSetPath, true); - TSStatus status = - env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan).getStatus(); + TSStatus status = null; + try { + status = env.getConfigManager().getConsensusManager().write(commitSetSchemaTemplatePlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + status = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + status.setMessage(e.getMessage()); + } if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Failed to rollback commit set template {} on path {} due to {}", diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java index 2dab4c451aa..b709a1f00bd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java @@ -33,6 +33,7 @@ import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDelete import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,7 +162,11 @@ public class CreateRegionGroupsProcedure })); env.persistRegionGroup(persistPlan); - env.getConfigManager().getConsensusManager().write(offerPlan); + try { + env.getConfigManager().getConsensusManager().write(offerPlan); + } catch (ConsensusException e) { + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); + } setNextState(CreateRegionGroupsState.ACTIVATE_REGION_GROUPS); break; case ACTIVATE_REGION_GROUPS: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java index f6ed578c3ca..53e9faae943 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.procedure.impl.trigger; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.trigger.TriggerInformation; import org.apache.iotdb.commons.trigger.exception.TriggerManagementException; import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan; @@ -32,7 +33,7 @@ import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; import org.apache.iotdb.confignode.procedure.state.CreateTriggerState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TTriggerState; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Binary; @@ -90,12 +91,19 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS triggerInformation.getTriggerName(), jarFile != null); - ConsensusWriteResponse response = - configManager - .getConsensusManager() - .write(new AddTriggerInTablePlan(triggerInformation, jarFile)); - if (!response.isSuccessful()) { - throw new TriggerManagementException(response.getErrorMessage()); + TSStatus response; + try { + response = + configManager + .getConsensusManager() + .write(new AddTriggerInTablePlan(triggerInformation, jarFile)); + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new TriggerManagementException(response.getMessage()); } setNextState(CreateTriggerState.CONFIG_NODE_INACTIVE); @@ -189,9 +197,13 @@ public class CreateTriggerProcedure extends AbstractNodeProcedure<CreateTriggerS case VALIDATED: LOG.info("Start [VALIDATED] rollback of trigger [{}]", triggerInformation.getTriggerName()); - env.getConfigManager() - .getConsensusManager() - .write(new DeleteTriggerInTablePlan(triggerInformation.getTriggerName())); + try { + env.getConfigManager() + .getConsensusManager() + .write(new DeleteTriggerInTablePlan(triggerInformation.getTriggerName())); + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + } break; case CONFIG_NODE_INACTIVE: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java index 0d209a5f9e1..ba535ff11fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java @@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.Procedure; +import org.apache.iotdb.consensus.exception.ConsensusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +81,11 @@ public class ConfigProcedureStore implements IProcedureStore { if (procedureType != null) { updateProcedurePlan.setProcedure(procedure); } - getConsensusManager().write(updateProcedurePlan); + try { + getConsensusManager().write(updateProcedurePlan); + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + } } @Override @@ -94,7 +99,11 @@ public class ConfigProcedureStore implements IProcedureStore { public void delete(long procId) { DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan(); deleteProcedurePlan.setProcId(procId); - getConsensusManager().write(deleteProcedurePlan); + try { + getConsensusManager().write(deleteProcedurePlan); + } catch (ConsensusException e) { + LOG.warn("Something wrong happened while calling consensus layer's write API.", e); + } } @Override diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 1f51c09c743..39839d118ef 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.exception.ConsensusException; import javax.annotation.concurrent.ThreadSafe; @@ -32,20 +31,14 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; import java.util.List; -/** - * Consensus module base interface. - */ +/** Consensus module base interface. */ @ThreadSafe public interface IConsensus { - /** - * Start the consensus module - */ + /** Start the consensus module */ void start() throws IOException; - /** - * Stop the consensus module - */ + /** Stop the consensus module */ void stop() throws IOException; /** @@ -69,14 +62,13 @@ public interface IConsensus { /** * Require the <em>local node</em> to create a Peer and become a member of the given consensus * group. This node will prepare and initialize local statemachine {@link IStateMachine} and other - * data structures. After this method returns, we can call - * {@link #addRemotePeer(ConsensusGroupId, Peer)} to notify original group that this new Peer is - * prepared to be added into the latest configuration. createLocalPeer should be called on a node - * that does not contain any peer of the consensus group, to avoid one node having more than one - * replica. + * data structures. After this method returns, we can call {@link #addRemotePeer(ConsensusGroupId, + * Peer)} to notify original group that this new Peer is prepared to be added into the latest + * configuration. createLocalPeer should be called on a node that does not contain any peer of the + * consensus group, to avoid one node having more than one replica. * * @param groupId the consensus group this Peer belongs - * @param peers other known peers in this group + * @param peers other known peers in this group */ void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers) throws ConsensusException; @@ -84,8 +76,8 @@ public interface IConsensus { * When the <em>local node</em> is no longer a member of the given consensus group, call this * method to do cleanup works. This method will close local statemachine {@link IStateMachine}, * delete local data and do other cleanup works. deleteLocalPeer should be called after - * successfully removing this peer from current consensus group configuration (by calling - * {@link #removeRemotePeer(ConsensusGroupId, Peer)}). + * successfully removing this peer from current consensus group configuration (by calling {@link + * #removeRemotePeer(ConsensusGroupId, Peer)}). * * @param groupId the consensus group this Peer used to belong */ @@ -94,15 +86,15 @@ public interface IConsensus { // single consensus group API /** - * Tell the group that a new Peer is prepared to be added into this group. Call - * {@link #createLocalPeer(ConsensusGroupId, List)} on the new Peer before calling this method. - * When this method returns, the group data should be already transmitted to the new Peer. That - * is, the new peer is available to answer client requests by the time this method successfully - * returns. addRemotePeer should be called on a living peer of the consensus group. For example: - * We'd like to add a peer D to (A, B, C) group. We need to execute addPeer in A, B or C. + * Tell the group that a new Peer is prepared to be added into this group. Call {@link + * #createLocalPeer(ConsensusGroupId, List)} on the new Peer before calling this method. When this + * method returns, the group data should be already transmitted to the new Peer. That is, the new + * peer is available to answer client requests by the time this method successfully returns. + * addRemotePeer should be called on a living peer of the consensus group. For example: We'd like + * to add a peer D to (A, B, C) group. We need to execute addPeer in A, B or C. * * @param groupId the consensus group this peer belongs - * @param peer the newly added peer + * @param peer the newly added peer */ void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; @@ -114,7 +106,7 @@ public interface IConsensus { * to remove C, in case C is dead, the removePeer should be sent to A or B. * * @param groupId the consensus group this peer belongs - * @param peer the peer to be removed + * @param peer the peer to be removed */ void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index aee5a970ec3..3d4fa742d25 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -19,12 +19,10 @@ package org.apache.iotdb.consensus.iot; -import java.util.Optional; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.consensus.ConsensusGroupId; -import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; @@ -35,9 +33,6 @@ import org.apache.iotdb.consensus.IStateMachine.Registry; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; -import org.apache.iotdb.consensus.common.response.ConsensusReadResponse; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.exception.ConsensusException; @@ -54,11 +49,9 @@ import org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient; import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager; import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService; import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor; -import org.apache.iotdb.consensus.simple.SimpleConsensusServerImpl; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.ratis.protocol.GroupManagementRequest.Op; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +63,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -161,12 +155,14 @@ public class IoTConsensus implements IConsensus { @Override public TSStatus write(ConsensusGroupId groupId, IConsensusRequest request) throws ConsensusException { - IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) - .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + IoTConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); if (impl.isReadOnly()) { return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY); } else if (!impl.isActive()) { - return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, + return RpcUtils.getStatus( + TSStatusCode.WRITE_PROCESS_REJECT, "peer is inactive and not ready to receive sync log request."); } else { return impl.write(request); @@ -193,31 +189,35 @@ public class IoTConsensus implements IConsensus { throw new IllegalPeerEndpointException(thisNode, peers); } AtomicBoolean exist = new AtomicBoolean(true); - Optional.ofNullable(stateMachineMap.computeIfAbsent( - groupId, - k -> { - exist.set(false); - - String path = buildPeerDir(storageDir, groupId); - File file = new File(path); - if (!file.mkdirs()) { - logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); - return null; - } - - IoTConsensusServerImpl impl = - new IoTConsensusServerImpl( - path, - new Peer(groupId, thisNodeId, thisNode), - peers, - registry.apply(groupId), - clientManager, - syncClientManager, - config); - impl.start(); - return impl; - })).orElseThrow(() -> new ConsensusException( - String.format("Unable to create consensus dir for group %s", groupId))); + Optional.ofNullable( + stateMachineMap.computeIfAbsent( + groupId, + k -> { + exist.set(false); + + String path = buildPeerDir(storageDir, groupId); + File file = new File(path); + if (!file.mkdirs()) { + logger.warn("Unable to create consensus dir for group {} at {}", groupId, path); + return null; + } + + IoTConsensusServerImpl impl = + new IoTConsensusServerImpl( + path, + new Peer(groupId, thisNodeId, thisNode), + peers, + registry.apply(groupId), + clientManager, + syncClientManager, + config); + impl.start(); + return impl; + })) + .orElseThrow( + () -> + new ConsensusException( + String.format("Unable to create consensus dir for group %s", groupId))); if (exist.get()) { throw new ConsensusGroupAlreadyExistException(groupId); } @@ -241,8 +241,9 @@ public class IoTConsensus implements IConsensus { @Override public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { - IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) - .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + IoTConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); if (impl.getConfiguration().contains(peer)) { throw new PeerAlreadyInConsensusGroupException(groupId, peer); } @@ -291,10 +292,10 @@ public class IoTConsensus implements IConsensus { } @Override - public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) - throws ConsensusException { - IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) - .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException { + IoTConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); try { // let other peers remove the sync channel with target peer @@ -322,8 +323,9 @@ public class IoTConsensus implements IConsensus { @Override public void triggerSnapshot(ConsensusGroupId groupId) throws ConsensusException { - IoTConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) - .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + IoTConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); try { impl.takeSnapshot(); } catch (ConsensusGroupModifyPeerException e) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 6d3d1e5567c..df72318f4ee 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -19,7 +19,6 @@ package org.apache.iotdb.consensus.ratis; -import java.util.Optional; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -95,6 +94,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -103,16 +103,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -/** - * A multi-raft consensus implementation based on Apache Ratis. - */ +/** A multi-raft consensus implementation based on Apache Ratis. */ class RatisConsensus implements IConsensus { private static final Logger logger = LoggerFactory.getLogger(RatisConsensus.class); - /** - * the unique net communication endpoint - */ + /** the unique net communication endpoint */ private final RaftPeer myself; private final RaftServer server; @@ -120,7 +116,8 @@ class RatisConsensus implements IConsensus { private final RaftProperties properties = new RaftProperties(); private final RaftClientRpc clientRpc; - private final IClientManager<RaftGroup, org.apache.iotdb.consensus.ratis.RatisClient> clientManager; + private final IClientManager<RaftGroup, org.apache.iotdb.consensus.ratis.RatisClient> + clientManager; private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>(); @@ -215,9 +212,7 @@ class RatisConsensus implements IConsensus { return !reply.isSuccess() && (reply.getException() instanceof ResourceUnavailableException); } - /** - * launch a consensus write with retry mechanism - */ + /** launch a consensus write with retry mechanism */ private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller) throws IOException { @@ -257,8 +252,7 @@ class RatisConsensus implements IConsensus { } private RaftClientReply writeRemotelyWithRetry( - org.apache.iotdb.consensus.ratis.RatisClient client, Message message) - throws IOException { + org.apache.iotdb.consensus.ratis.RatisClient client, Message message) throws IOException { return writeWithRetry(() -> client.getRaftClient().io().send(message)); } @@ -299,8 +293,10 @@ class RatisConsensus implements IConsensus { RatisMetricsManager.getInstance().startWriteLocallyTimer(consensusGroupType)) { RaftClientReply localServerReply = writeLocallyWithRetry(clientRequest); if (localServerReply.isSuccess()) { - org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage = (org.apache.iotdb.consensus.ratis.ResponseMessage) localServerReply.getMessage(); - return Optional.ofNullable(responseMessage.getContentHolder()).map(TSStatus.class::cast) + org.apache.iotdb.consensus.ratis.ResponseMessage responseMessage = + (org.apache.iotdb.consensus.ratis.ResponseMessage) localServerReply.getMessage(); + return Optional.ofNullable(responseMessage.getContentHolder()) + .map(TSStatus.class::cast) .orElse(null); } NotLeaderException ex = localServerReply.getNotLeaderException(); @@ -315,7 +311,7 @@ class RatisConsensus implements IConsensus { // 2. try raft client TSStatus writeResult; try (AutoCloseable ignored = - RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType); + RatisMetricsManager.getInstance().startWriteRemotelyTimer(consensusGroupType); org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(raftGroup)) { RaftClientReply reply = writeRemotelyWithRetry(client, message); if (!reply.isSuccess()) { @@ -333,9 +329,7 @@ class RatisConsensus implements IConsensus { return writeResult; } - /** - * Read directly from LOCAL COPY notice: - */ + /** Read directly from LOCAL COPY notice: */ @Override public DataSet read(ConsensusGroupId groupId, IConsensusRequest request) throws ConsensusException { @@ -367,14 +361,14 @@ class RatisConsensus implements IConsensus { throw new RatisRequestFailedException(e); } Message ret = reply.getMessage(); - org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage = (org.apache.iotdb.consensus.ratis.ResponseMessage) ret; - return Optional.ofNullable(readResponseMessage.getContentHolder()).map(DataSet.class::cast) + org.apache.iotdb.consensus.ratis.ResponseMessage readResponseMessage = + (org.apache.iotdb.consensus.ratis.ResponseMessage) ret; + return Optional.ofNullable(readResponseMessage.getContentHolder()) + .map(DataSet.class::cast) .orElse(null); } - /** - * return a success raft client reply or throw an Exception - */ + /** return a success raft client reply or throw an Exception */ private RaftClientReply doRead( RaftGroupId gid, IConsensusRequest readRequest, boolean linearizable) throws Exception { final RaftClientRequest.Type readType = @@ -412,8 +406,8 @@ class RatisConsensus implements IConsensus { RaftGroup clientGroup = group.getPeers().isEmpty() ? RaftGroup.valueOf(group.getGroupId(), myself) : group; try (org.apache.iotdb.consensus.ratis.RatisClient client = getRaftClient(clientGroup)) { - RaftClientReply reply = client.getRaftClient().getGroupManagementApi(myself.getId()) - .add(group); + RaftClientReply reply = + client.getRaftClient().getGroupManagementApi(myself.getId()).add(group); if (!reply.isSuccess()) { throw new RatisRequestFailedException(reply.getException()); } @@ -522,15 +516,14 @@ class RatisConsensus implements IConsensus { return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build(); } - /** * NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader. * transferLeader is implemented by 1. modify peer priority 2. ask current leader to step down * - * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower - * peers to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a - * peer with priority <= currentLeader.priority to becomes the leader, so we have to upgrade - * leader's priority to 1 + * <p>1. call setConfiguration to upgrade newLeader's priority to 1 and degrade all follower peers + * to 0. By default, Ratis gives every Raft Peer same priority 0. Ratis does not allow a peer with + * priority <= currentLeader.priority to becomes the leader, so we have to upgrade leader's + * priority to 1 * * <p>2. call transferLeadership to force current leader to step down and raise a new round of * election. In this election, the newLeader peer with priority 1 is guaranteed to be elected. @@ -540,8 +533,9 @@ class RatisConsensus implements IConsensus { // first fetch the newest information RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); - RaftGroup raftGroup = Optional.ofNullable(getGroupInfo(raftGroupId)) - .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + RaftGroup raftGroup = + Optional.ofNullable(getGroupInfo(raftGroupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); RaftPeer newRaftLeader = Utils.fromPeerAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index 5544bad9a53..3ea153d25fe 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -32,7 +32,6 @@ import org.apache.iotdb.consensus.IStateMachine.Registry; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.IConsensusRequest; -import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 05ef52758e8..e8d97bd8883 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -31,7 +31,6 @@ import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.IConsensus; -import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -166,25 +165,21 @@ public class RegionWriteExecutor { try { TSStatus status = executePlanNodeInConsensusLayer(context.getRegionId(), node); - response.setAccepted( - TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); + response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); response.setMessage(status.getMessage()); response.setStatus(status); } catch (ConsensusException e) { - LOGGER.error( - "Something wrong happened while calling consensus layer's write API.", - e); + LOGGER.error("Something wrong happened while calling consensus layer's write API.", e); response.setAccepted(false); response.setMessage(e.toString()); response.setStatus( - RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage())); + RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage())); } return response; } - private TSStatus executePlanNodeInConsensusLayer( - ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException { + private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId, PlanNode planNode) + throws ConsensusException { if (groupId instanceof DataRegionId) { return dataRegionConsensus.write(groupId, planNode); } else { @@ -227,32 +222,26 @@ public class RegionWriteExecutor { RegionExecutionResult response = new RegionExecutionResult(); context.getRegionWriteValidationRWLock().readLock().lock(); try { - TSStatus status = - fireTriggerAndInsert(context.getRegionId(), insertNode); - response.setAccepted( - TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); + TSStatus status = fireTriggerAndInsert(context.getRegionId(), insertNode); + response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); response.setMessage(status.message); if (!response.isAccepted()) { response.setStatus(status); } return response; } catch (ConsensusException e) { - LOGGER.warn( - "Something wrong happened while calling consensus layer's write API.", - e); + LOGGER.warn("Something wrong happened while calling consensus layer's write API.", e); response.setAccepted(false); response.setMessage(e.toString()); - response.setStatus( - RpcUtils.getStatus( - TSStatusCode.WRITE_PROCESS_ERROR, e.toString())); + response.setStatus(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, e.toString())); return response; } finally { context.getRegionWriteValidationRWLock().readLock().unlock(); } } - private TSStatus fireTriggerAndInsert( - ConsensusGroupId groupId, PlanNode planNode) throws ConsensusException { + private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, PlanNode planNode) + throws ConsensusException { long triggerCostTime = 0; TSStatus status; long startTime = System.nanoTime(); @@ -260,8 +249,10 @@ public class RegionWriteExecutor { TriggerFireResult result = triggerFireVisitor.process(planNode, TriggerEvent.BEFORE_INSERT); triggerCostTime += (System.nanoTime() - startTime); if (result.equals(TriggerFireResult.TERMINATION)) { - status = RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), - "Failed to complete the insertion because trigger error before the insertion."); + status = + RpcUtils.getStatus( + TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), + "Failed to complete the insertion because trigger error before the insertion."); } else { boolean hasFailedTriggerBeforeInsertion = result.equals(TriggerFireResult.FAILED_NO_TERMINATION); @@ -274,8 +265,10 @@ public class RegionWriteExecutor { startTime = System.nanoTime(); result = triggerFireVisitor.process(planNode, TriggerEvent.AFTER_INSERT); if (hasFailedTriggerBeforeInsertion || !result.equals(TriggerFireResult.SUCCESS)) { - status = RpcUtils.getStatus(TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), - "Meet trigger error before/after the insertion, the insertion itself is completed."); + status = + RpcUtils.getStatus( + TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), + "Meet trigger error before/after the insertion, the insertion itself is completed."); } triggerCostTime += (System.nanoTime() - startTime); } @@ -792,7 +785,7 @@ public class RegionWriteExecutor { String.format( "Template is being unsetting from prefix path of %s. Please try activating later.", new PartialPath( - Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) + Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) .getFullPath()); result.setMessage(message); result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message));
