This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a889412bdfecb0d805797aa06ccc342eeb9dffb
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Jul 25 09:43:10 2024 +0800

    [Region Migration] Add retry when the read region does not exist (#13001)
    
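    * add retry when the read region (consensus group) does not exist
    
    * return the failure reason as a TSStatus in TSendFragmentInstanceResp
    
    * use distinct TSStatus codes for RATIS_READ_UNAVAILABLE and CONSENSUS_GROUP_NOT_EXIST
    
    * refine the retry condition when handling a rejected fragment instance
    
    * merge the duplicated dispatch-failure handling into dispatchRemoteFailed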
    
    (cherry picked from commit 97ce3e8a84acd332c92d046f4548cdaabc7f2f5c)
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  4 +-
 .../exception/ConsensusGroupNotExistException.java |  5 ++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  1 +
 .../execution/executor/RegionReadExecutor.java     |  7 +++
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 65 +++++++++++++++-------
 .../src/main/thrift/datanode.thrift                |  1 +
 6 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 6ecbb3061b2..abbe1425a22 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -270,13 +270,15 @@ public enum TSStatusCode {
   ALTER_CONSUMER_ERROR(2102),
   CONSUMER_PUSH_META_ERROR(2103),
 
-  // Pipe Consensus
+  // Consensus Exception
   PIPE_CONSENSUS_CONNECTOR_RESTART_ERROR(2200),
   PIPE_CONSENSUS_VERSION_ERROR(2201),
   PIPE_CONSENSUS_DEPRECATED_REQUEST(2202),
   PIPE_CONSENSUS_TRANSFER_FILE_OFFSET_RESET(2203),
   PIPE_CONSENSUS_TRANSFER_FILE_ERROR(2204),
   PIPE_CONSENSUS_TYPE_ERROR(2205),
+  CONSENSUS_GROUP_NOT_EXIST(2206),
+  RATIS_READ_UNAVAILABLE(2207),
   ;
 
   private final int statusCode;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
index 1354772d3be..d80df6a615c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
@@ -25,6 +25,11 @@ public class ConsensusGroupNotExistException extends ConsensusException {
 
   private final transient ConsensusGroupId groupId;
 
+  public ConsensusGroupNotExistException(String cause) {
+    super(cause);
+    this.groupId = null;
+  }
+
   public ConsensusGroupNotExistException(ConsensusGroupId groupId) {
     super(String.format("The consensus group %s doesn't exist", groupId));
     this.groupId = groupId;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 2203842addd..4a25a2a7611 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -366,6 +366,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
     resp.setAccepted(executionResult.isAccepted());
     resp.setMessage(executionResult.getMessage());
     resp.setNeedRetry(executionResult.isNeedRetry());
+    resp.setStatus(executionResult.getStatus());
     return resp;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 37c6639fb2c..016c3ed21fb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.db.queryengine.execution.executor;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -31,6 +33,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.ReadException;
@@ -100,6 +103,10 @@ public class RegionReadExecutor {
           || t instanceof NotLeaderException
           || t instanceof ServerNotReadyException) {
         resp.setNeedRetry(true);
+        resp.setStatus(new 
TSStatus(TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()));
+      } else if (t instanceof ConsensusGroupNotExistException) {
+        resp.setNeedRetry(true);
+        resp.setStatus(new 
TSStatus(TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()));
       }
       return resp;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 7e8de27460e..41063b49571 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.RatisReadUnavailableException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -308,7 +309,8 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       throws FragmentInstanceDispatchException,
           TException,
           ClientManagerException,
-          RatisReadUnavailableException {
+          RatisReadUnavailableException,
+          ConsensusGroupNotExistException {
     try (final SyncDataNodeInternalServiceClient client =
         syncInternalServiceClientManager.borrowClient(endPoint)) {
       switch (instance.getType()) {
@@ -324,13 +326,19 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
           if (!sendFragmentInstanceResp.accepted) {
             LOGGER.warn(sendFragmentInstanceResp.message);
             if (sendFragmentInstanceResp.isSetNeedRetry()
-                && sendFragmentInstanceResp.isNeedRetry()) {
-              throw new 
RatisReadUnavailableException(sendFragmentInstanceResp.message);
-            } else {
-              throw new FragmentInstanceDispatchException(
-                  RpcUtils.getStatus(
-                      TSStatusCode.EXECUTE_STATEMENT_ERROR, 
sendFragmentInstanceResp.message));
+                && sendFragmentInstanceResp.isNeedRetry()
+                && sendFragmentInstanceResp.status != null) {
+              if (sendFragmentInstanceResp.status.getCode()
+                  == TSStatusCode.RATIS_READ_UNAVAILABLE.getStatusCode()) {
+                throw new 
RatisReadUnavailableException(sendFragmentInstanceResp.message);
+              } else if (sendFragmentInstanceResp.status.getCode()
+                  == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
+                throw new 
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
+              }
             }
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(
+                    TSStatusCode.EXECUTE_STATEMENT_ERROR, 
sendFragmentInstanceResp.message));
           }
           break;
         case WRITE:
@@ -378,6 +386,21 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
     }
   }
 
+  private void dispatchRemoteFailed(TEndPoint endPoint, Exception e)
+      throws FragmentInstanceDispatchException {
+    LOGGER.warn(
+        "can't execute request on node  {} in second try, error msg is {}.",
+        endPoint,
+        ExceptionUtils.getRootCause(e).toString());
+    TSStatus status = new TSStatus();
+    status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+    status.setMessage("can't connect to node " + endPoint);
+    // If the DataNode cannot be connected, its endPoint will be put into 
black list
+    // so that the following retry will avoid dispatching instance towards 
this DataNode.
+    queryContext.addFailedEndPoint(endPoint);
+    throw new FragmentInstanceDispatchException(status);
+  }
+
   private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
       throws FragmentInstanceDispatchException {
 
@@ -391,19 +414,14 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       // we just retry once to clear stale connection for a restart node.
       try {
         dispatchRemoteHelper(instance, endPoint);
-      } catch (ClientManagerException | TException | 
RatisReadUnavailableException e1) {
-        LOGGER.warn(
-            "can't execute request on node  {} in second try, error msg is 
{}.",
-            endPoint,
-            ExceptionUtils.getRootCause(e1).toString());
-        TSStatus status = new TSStatus();
-        status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-        status.setMessage("can't connect to node " + endPoint);
-        // If the DataNode cannot be connected, its endPoint will be put into 
black list
-        // so that the following retry will avoid dispatching instance towards 
this DataNode.
-        queryContext.addFailedEndPoint(endPoint);
-        throw new FragmentInstanceDispatchException(status);
+      } catch (ClientManagerException
+          | TException
+          | RatisReadUnavailableException
+          | ConsensusGroupNotExistException e1) {
+        dispatchRemoteFailed(endPoint, e1);
       }
+    } catch (ConsensusGroupNotExistException e) {
+      dispatchRemoteFailed(endPoint, e);
     }
   }
 
@@ -434,8 +452,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
                 : readExecutor.execute(groupId, instance);
         if (!readResult.isAccepted()) {
           LOGGER.warn(readResult.getMessage());
-          throw new FragmentInstanceDispatchException(
-              RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
readResult.getMessage()));
+          if (readResult.isNeedRetry()) {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, 
readResult.getMessage()));
+          } else {
+            throw new FragmentInstanceDispatchException(
+                RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, 
readResult.getMessage()));
+          }
         }
         break;
       case WRITE:
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 7165d20a5e7..e63a6f77622 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -133,6 +133,7 @@ struct TSendFragmentInstanceResp {
   1: required bool accepted
   2: optional string message
   3: optional bool needRetry
+  4: optional common.TSStatus status
 }
 
 struct TSendSinglePlanNodeReq {

Reply via email to