This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a889412bdfecb0d805797aa06ccc342eeb9dffb Author: Xiangpeng Hu <[email protected]> AuthorDate: Thu Jul 25 09:43:10 2024 +0800 [Region Migration] Add retry when the read region does not exist (#13001) * add retry * use TSStatus * use TSStatus * modify if judgment * merge exception (cherry picked from commit 97ce3e8a84acd332c92d046f4548cdaabc7f2f5c) --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 +- .../exception/ConsensusGroupNotExistException.java | 5 ++ .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../execution/executor/RegionReadExecutor.java | 7 +++ .../scheduler/FragmentInstanceDispatcherImpl.java | 65 +++++++++++++++------- .../src/main/thrift/datanode.thrift | 1 + 6 files changed, 61 insertions(+), 22 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 6ecbb3061b2..abbe1425a22 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -270,13 +270,15 @@ public enum TSStatusCode { ALTER_CONSUMER_ERROR(2102), CONSUMER_PUSH_META_ERROR(2103), - // Pipe Consensus + // Consensus Exception PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200), PIPE_CONSENSUS_VERSION_ERROR(2201), PIPE_CONSENSUS_DEPRECATED_REQUEST(2202), PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET(2203), PIPE_CONSENSUS_TRANSFER_FILE_ERROR(2204), PIPE_CONSENSUS_TYPE_ERROR(2205), + CONSENSUS_GROUP_NOT_EXIST(2206), + RATIS_READ_UNAVAILABLE(2207), ; private final int statusCode; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java index 1354772d3be..d80df6a615c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java @@ -25,6 +25,11 @@ public class ConsensusGroupNotExistException extends ConsensusException { private final transient ConsensusGroupId groupId; + public ConsensusGroupNotExistException(String cause) { + super(cause); + this.groupId = null; + } + public ConsensusGroupNotExistException(ConsensusGroupId groupId) { super(String.format("The consensus group %s doesn't exist", groupId)); this.groupId = groupId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 2203842addd..4a25a2a7611 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -366,6 +366,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface resp.setAccepted(executionResult.isAccepted()); resp.setMessage(executionResult.getMessage()); resp.setNeedRetry(executionResult.isNeedRetry()); + resp.setStatus(executionResult.getStatus()); return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index 37c6639fb2c..016c3ed21fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -19,11 +19,13 @@ package org.apache.iotdb.db.queryengine.execution.executor; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.DataSet; +import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; @@ -31,6 +33,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion; import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.exceptions.ReadException; @@ -100,6 +103,10 @@ public class RegionReadExecutor { || t instanceof NotLeaderException || t instanceof ServerNotReadyException) { resp.setNeedRetry(true); + resp.setStatus(new TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode())); + } else if (t instanceof ConsensusGroupNotExistException) { + resp.setNeedRetry(true); + resp.setStatus(new TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode())); } return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 7e8de27460e..41063b49571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.RatisReadUnavailableException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException; @@ -308,7 +309,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { throws FragmentInstanceDispatchException, TException, ClientManagerException, - RatisReadUnavailableException { + RatisReadUnavailableException, + ConsensusGroupNotExistException { try (final SyncDataNodeInternalServiceClient client = syncInternalServiceClientManager.borrowClient(endPoint)) { switch (instance.getType()) { @@ -324,13 +326,19 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (!sendFragmentInstanceResp.accepted) { LOGGER.warn(sendFragmentInstanceResp.message); if (sendFragmentInstanceResp.isSetNeedRetry() - && sendFragmentInstanceResp.isNeedRetry()) { - throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); - } else { - throw new FragmentInstanceDispatchException( - RpcUtils.getStatus( - TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); + && sendFragmentInstanceResp.isNeedRetry() + && sendFragmentInstanceResp.status != null) { + if (sendFragmentInstanceResp.status.getCode() + == TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()) { + throw new RatisReadUnavailableException(sendFragmentInstanceResp.message); + } else if (sendFragmentInstanceResp.status.getCode() + == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) { + throw new ConsensusGroupNotExistException(sendFragmentInstanceResp.message); + } } + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus( + TSStatusCode.EXECUTE_STATEMENT_ERROR, sendFragmentInstanceResp.message)); } break; case WRITE: @@ -378,6 +386,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } } + private void dispatchRemoteFailed(TEndPoint endPoint, Exception e) + throws FragmentInstanceDispatchException { + LOGGER.warn( + "can't execute request on node {} in second try, error msg is {}.", + endPoint, + ExceptionUtils.getRootCause(e).toString()); + TSStatus status = new TSStatus(); + status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); + status.setMessage("can't connect to node " + endPoint); + // If the DataNode cannot be connected, its endPoint will be put into black list + // so that the following retry will avoid dispatching instance towards this DataNode. + queryContext.addFailedEndPoint(endPoint); + throw new FragmentInstanceDispatchException(status); + } + private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint) throws FragmentInstanceDispatchException { @@ -391,19 +414,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { // we just retry once to clear stale connection for a restart node. try { dispatchRemoteHelper(instance, endPoint); - } catch (ClientManagerException | TException | RatisReadUnavailableException e1) { - LOGGER.warn( - "can't execute request on node {} in second try, error msg is {}.", - endPoint, - ExceptionUtils.getRootCause(e1).toString()); - TSStatus status = new TSStatus(); - status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode()); - status.setMessage("can't connect to node " + endPoint); - // If the DataNode cannot be connected, its endPoint will be put into black list - // so that the following retry will avoid dispatching instance towards this DataNode. - queryContext.addFailedEndPoint(endPoint); - throw new FragmentInstanceDispatchException(status); + } catch (ClientManagerException + | TException + | RatisReadUnavailableException + | ConsensusGroupNotExistException e1) { + dispatchRemoteFailed(endPoint, e1); } + } catch (ConsensusGroupNotExistException e) { + dispatchRemoteFailed(endPoint, e); } } @@ -434,8 +452,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { : readExecutor.execute(groupId, instance); if (!readResult.isAccepted()) { LOGGER.warn(readResult.getMessage()); - throw new FragmentInstanceDispatchException( - RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage())); + if (readResult.isNeedRetry()) { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, readResult.getMessage())); + } else { + throw new FragmentInstanceDispatchException( + RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, readResult.getMessage())); + } } break; case WRITE: diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 7165d20a5e7..e63a6f77622 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -133,6 +133,7 @@ struct TSendFragmentInstanceResp { 1: required bool accepted 2: optional string message 3: optional bool needRetry + 4: optional common.TSStatus status } struct TSendSinglePlanNodeReq {
