This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 97ce3e8a84a [Region Migration] Add retry when the read region does not exist (#13001)
97ce3e8a84a is described below
commit 97ce3e8a84acd332c92d046f4548cdaabc7f2f5c
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Jul 25 09:43:10 2024 +0800
[Region Migration] Add retry when the read region does not exist (#13001)
* add retry
* use TSStatus
* use TSStatus
* modify if judgment
* merge exception
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 4 +-
.../exception/ConsensusGroupNotExistException.java | 5 ++
.../impl/DataNodeInternalRPCServiceImpl.java | 1 +
.../execution/executor/RegionReadExecutor.java | 7 +++
.../scheduler/FragmentInstanceDispatcherImpl.java | 65 +++++++++++++++-------
.../src/main/thrift/datanode.thrift | 1 +
6 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 4b8a5fee813..20fba208d4f 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -257,13 +257,15 @@ public enum TSStatusCode {
ALTER_CONSUMER_ERROR(2102),
CONSUMER_PUSH_META_ERROR(2103),
- // Pipe Consensus
+ // Consensus Exception
PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200),
PIPE_CONSENSUS_VERSION_ERROR(2201),
PIPE_CONSENSUS_DEPRECATED_REQUEST(2202),
PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET(2203),
PIPE_CONSENSUS_TRANSFER_FILE_ERROR(2204),
PIPE_CONSENSUS_TYPE_ERROR(2205),
+ CONSENSUS_GROUP_NOT_EXIST(2206),
+ RATIS_READ_UNAVAILABLE(2207),
;
private final int statusCode;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
index 1354772d3be..d80df6a615c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
@@ -25,6 +25,11 @@ public class ConsensusGroupNotExistException extends ConsensusException {
private final transient ConsensusGroupId groupId;
+ public ConsensusGroupNotExistException(String cause) {
+ super(cause);
+ this.groupId = null;
+ }
+
public ConsensusGroupNotExistException(ConsensusGroupId groupId) {
super(String.format("The consensus group %s doesn't exist", groupId));
this.groupId = groupId;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index d5c894c13ad..ac2bf85ad75 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -358,6 +358,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
resp.setNeedRetry(executionResult.isNeedRetry());
+ resp.setStatus(executionResult.getStatus());
return resp;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 37c6639fb2c..016c3ed21fb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.db.queryengine.execution.executor;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.ReadException;
@@ -100,6 +103,10 @@ public class RegionReadExecutor {
|| t instanceof NotLeaderException
|| t instanceof ServerNotReadyException) {
resp.setNeedRetry(true);
+ resp.setStatus(new
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
+ } else if (t instanceof ConsensusGroupNotExistException) {
+ resp.setNeedRetry(true);
+ resp.setStatus(new
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
}
return resp;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 7e8de27460e..41063b49571 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -308,7 +309,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
throws FragmentInstanceDispatchException,
TException,
ClientManagerException,
- RatisReadUnavailableException {
+ RatisReadUnavailableException,
+ ConsensusGroupNotExistException {
try (final SyncDataNodeInternalServiceClient client =
syncInternalServiceClientManager.borrowClient(endPoint)) {
switch (instance.getType()) {
@@ -324,13 +326,19 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
if (!sendFragmentInstanceResp.accepted) {
LOGGER.warn(sendFragmentInstanceResp.message);
if (sendFragmentInstanceResp.isSetNeedRetry()
- && sendFragmentInstanceResp.isNeedRetry()) {
- throw new
RatisReadUnavailableException(sendFragmentInstanceResp.message);
- } else {
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR,
sendFragmentInstanceResp.message));
+ && sendFragmentInstanceResp.isNeedRetry()
+ && sendFragmentInstanceResp.status != null) {
+ if (sendFragmentInstanceResp.status.getCode()
+ == TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()) {
+ throw new
RatisReadUnavailableException(sendFragmentInstanceResp.message);
+ } else if (sendFragmentInstanceResp.status.getCode()
+ == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
+ throw new
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
+ }
}
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
sendFragmentInstanceResp.message));
}
break;
case WRITE:
@@ -378,6 +386,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
}
}
+ private void dispatchRemoteFailed(TEndPoint endPoint, Exception e)
+ throws FragmentInstanceDispatchException {
+ LOGGER.warn(
+ "can't execute request on node {} in second try, error msg is {}.",
+ endPoint,
+ ExceptionUtils.getRootCause(e).toString());
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage("can't connect to node " + endPoint);
+ // If the DataNode cannot be connected, its endPoint will be put into black list
+ // so that the following retry will avoid dispatching instance towards this DataNode.
+ queryContext.addFailedEndPoint(endPoint);
+ throw new FragmentInstanceDispatchException(status);
+ }
+
private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
throws FragmentInstanceDispatchException {
@@ -391,19 +414,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
// we just retry once to clear stale connection for a restart node.
try {
dispatchRemoteHelper(instance, endPoint);
- } catch (ClientManagerException | TException |
RatisReadUnavailableException e1) {
- LOGGER.warn(
- "can't execute request on node {} in second try, error msg is
{}.",
- endPoint,
- ExceptionUtils.getRootCause(e1).toString());
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage("can't connect to node " + endPoint);
- // If the DataNode cannot be connected, its endPoint will be put into black list
- // so that the following retry will avoid dispatching instance towards this DataNode.
- queryContext.addFailedEndPoint(endPoint);
- throw new FragmentInstanceDispatchException(status);
+ } catch (ClientManagerException
+ | TException
+ | RatisReadUnavailableException
+ | ConsensusGroupNotExistException e1) {
+ dispatchRemoteFailed(endPoint, e1);
}
+ } catch (ConsensusGroupNotExistException e) {
+ dispatchRemoteFailed(endPoint, e);
}
}
@@ -434,8 +452,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
: readExecutor.execute(groupId, instance);
if (!readResult.isAccepted()) {
LOGGER.warn(readResult.getMessage());
- throw new FragmentInstanceDispatchException(
- RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
readResult.getMessage()));
+ if (readResult.isNeedRetry()) {
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR,
readResult.getMessage()));
+ } else {
+ throw new FragmentInstanceDispatchException(
+ RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
readResult.getMessage()));
+ }
}
break;
case WRITE:
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 854e3cf0b29..cb39f4937bc 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -133,6 +133,7 @@ struct TSendFragmentInstanceResp {
1: required bool accepted
2: optional string message
3: optional bool needRetry
+ 4: optional common.TSStatus status
}
struct TSendSinglePlanNodeReq {