This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/AllowRetry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4bc006f1fba9ed8b7dfb89f2b1b3f5c16b93fa89
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Aug 13 14:55:18 2024 +0800

    Retry queries that fail with TOO_MANY_CONCURRENT_QUERIES_ERROR
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  1 +
 .../execution/executor/RegionReadExecutor.java     |  7 ++++
 .../fragment/FragmentInstanceContext.java          | 29 +++++++++++++++--
 .../fragment/FragmentInstanceExecution.java        | 22 ++++++++++---
 .../execution/fragment/FragmentInstanceInfo.java   | 20 ++++++++++++
 .../fragment/FragmentInstanceManager.java          | 38 ++++++++++++++++++----
 .../scheduler/AbstractFragInsStateTracker.java     |  3 +-
 .../plan/scheduler/ClusterScheduler.java           |  4 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    |  6 ++++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  8 +++++
 .../src/main/thrift/datanode.thrift                |  1 +
 11 files changed, 123 insertions(+), 16 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index cf082082796..9a2201075d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -406,6 +406,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
           failureInfoList.add(failureInfo.serialize());
         }
         resp.setFailureInfoList(failureInfoList);
+        // Forward the structured failure status, if any, so the caller can inspect its code.
+        info.getErrorCode().ifPresent(resp::setErrorCode);
         return resp;
       } catch (IOException e) {
         return resp;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 3b6a0f0adb0..8e9a16e9b14 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -92,6 +93,13 @@ public class RegionReadExecutor {
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
         resp.setAccepted(!info.getState().isFailed());
         resp.setMessage(info.getMessage());
+        // Surface the failure status; the dispatcher only forwards it when needRetry is set.
+        info.getErrorCode()
+            .ifPresent(
+                errorStatus -> {
+                  resp.setStatus(errorStatus);
+                  resp.setNeedRetry(StatusUtils.needRetryHelper(errorStatus));
+                });
       }
       return resp;
     } catch (ConsensusGroupNotExistException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 4e8928deebb..9648b1701fb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.IFullPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -312,6 +314,23 @@ public class FragmentInstanceContext extends QueryContext {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Returns a status with the error code and message of the first {@link IoTDBException} in the
+   * failure causes, or an empty {@link Optional} if there is none.
+   */
+  public Optional<TSStatus> getErrorCode() {
+    return stateMachine.getFailureCauses().stream()
+        .filter(IoTDBException.class::isInstance)
+        .map(IoTDBException.class::cast)
+        .findFirst()
+        .map(
+            e -> {
+              TSStatus status = new TSStatus(e.getErrorCode());
+              status.setMessage(e.getMessage());
+              return status;
+            });
+  }
+
   public void finished() {
     stateMachine.finished();
   }
@@ -350,8 +369,13 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(
-        stateMachine.getState(), getEndTime(), getFailedCause(), getFailureInfoList());
+    // Pass null when there is no structured error status.
+    return new FragmentInstanceInfo(
+        stateMachine.getState(),
+        getEndTime(),
+        getFailedCause(),
+        getFailureInfoList(),
+        getErrorCode().orElse(null));
   }
 
   public FragmentInstanceStateMachine getStateMachine() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f631ea8a246..c42834f5190 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -117,11 +117,13 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
+    // Pass null when the context has no structured error status.
     return new FragmentInstanceInfo(
         stateMachine.getState(),
         context.getEndTime(),
         context.getFailedCause(),
-        context.getFailureInfoList());
+        context.getFailureInfoList(),
+        context.getErrorCode().orElse(null));
   }
 
   public long getStartTime() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
index 9c67d67de64..4717a23f279 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 
 import java.util.List;
+import java.util.Optional;
 
 public class FragmentInstanceInfo implements DataSet {
   private final FragmentInstanceState state;
@@ -30,6 +32,9 @@ public class FragmentInstanceInfo implements DataSet {
 
   private List<FragmentInstanceFailureInfo> failureInfoList;
 
+  /** Structured status of the failure, or {@code null} if none was provided. */
+  private TSStatus errorCode;
+
   public FragmentInstanceInfo(FragmentInstanceState state) {
     this.state = state;
   }
@@ -49,6 +54,23 @@ public class FragmentInstanceInfo implements DataSet {
     this.failureInfoList = failureInfoList;
   }
 
+  /**
+   * Creates an instance info that also carries the structured status of the failure.
+   *
+   * @param errorStatus the failure status, or {@code null} if there is none
+   */
+  public FragmentInstanceInfo(
+      FragmentInstanceState state,
+      long endTime,
+      String message,
+      List<FragmentInstanceFailureInfo> failureInfoList,
+      TSStatus errorStatus) {
+    this(state, endTime);
+    this.message = message;
+    this.failureInfoList = failureInfoList;
+    this.errorCode = errorStatus;
+  }
+
   public FragmentInstanceState getState() {
     return state;
   }
@@ -61,6 +83,11 @@ public class FragmentInstanceInfo implements DataSet {
     return message;
   }
 
+  /** Returns the structured status of the failure, if one was provided. */
+  public Optional<TSStatus> getErrorCode() {
+    return Optional.ofNullable(errorCode);
+  }
+
   public List<FragmentInstanceFailureInfo> getFailureInfoList() {
     return failureInfoList;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 0886cb406bb..1a034eda478 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -181,8 +181,19 @@ public class FragmentInstanceManager {
                       exchangeManager);
                 } catch (Throwable t) {
                   clearFIRelatedResources(instanceId);
-                  logger.warn("error when create FragmentInstanceExecution.", t);
-                  stateMachine.failed(t);
+                  // Too many concurrent queries: fail with a dedicated status code so that the
+                  // query scheduler can recognize the failure as retryable.
+                  if (t instanceof IllegalStateException
+                      && TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
+                    logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
+                    stateMachine.failed(
+                        new IoTDBException(
+                            TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG,
+                            TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()));
+                  } else {
+                    logger.warn("error when create FragmentInstanceExecution.", t);
+                    stateMachine.failed(t);
+                  }
                   return null;
                 }
               });
@@ -367,11 +378,13 @@ public class FragmentInstanceManager {
 
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
     FragmentInstanceContext context = instanceContext.get(instanceId);
+    // Pass null when the context has no structured error status.
     return new FragmentInstanceInfo(
         FragmentInstanceState.FAILED,
         context.getEndTime(),
         context.getFailedCause(),
-        context.getFailureInfoList());
+        context.getFailureInfoList(),
+        context.getErrorCode().orElse(null));
   }
 
   private void removeOldInstances() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
index 305c41b9aef..dd6d5945cf6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
@@ -95,7 +95,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
             FragmentInstanceState.valueOf(resp.getState()),
             resp.getEndTime(),
             failedMessage,
-            failureInfoList);
+            failureInfoList,
+            resp.getErrorCode());
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 1a704fc4706..448f0829b5a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -105,7 +105,9 @@ public class ClusterScheduler implements IScheduler {
   private boolean needRetry(TSStatus failureStatus) {
     return failureStatus != null
         && queryType == QueryType.READ
-        && failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode();
+        && (failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode()
+            || failureStatus.getCode()
+                == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode());
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 36e4bdc99ac..31b5742a9d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -149,6 +150,14 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
             new RuntimeException(
                 String.format(
                     "FragmentInstance[%s] is failed. %s", instanceId, instanceInfo.getMessage())));
+      } else if (instanceInfo.getErrorCode().isPresent()) {
+        // Keep the remote status code (e.g. TOO_MANY_CONCURRENT_QUERIES_ERROR) instead of
+        // reporting a generic failure.
+        stateMachine.transitionToFailed(
+            instanceInfo
+                .getErrorCode()
+                .map(errorStatus -> new IoTDBException(errorStatus.getMessage(), errorStatus.getCode()))
+                .get());
       } else {
         stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 41063b49571..8c4e79f5d44 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -334,6 +334,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
               } else if (sendFragmentInstanceResp.status.getCode()
                   == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
                 throw new ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
+              } else if (sendFragmentInstanceResp.status.getCode()
+                  == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+                throw new FragmentInstanceDispatchException(sendFragmentInstanceResp.status);
               }
             }
             throw new FragmentInstanceDispatchException(
@@ -453,6 +456,13 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
         if (!readResult.isAccepted()) {
           LOGGER.warn(readResult.getMessage());
           if (readResult.isNeedRetry()) {
+            // Propagate TOO_MANY_CONCURRENT_QUERIES_ERROR unchanged; other retryable failures
+            // are reported as DISPATCH_ERROR.
+            if (readResult.getStatus() != null
+                && readResult.getStatus().getCode()
+                    == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+              throw new FragmentInstanceDispatchException(readResult.getStatus());
+            }
             throw new FragmentInstanceDispatchException(
                 RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, readResult.getMessage()));
           } else {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index e63a6f77622..cd45a72e46b 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -165,6 +165,8 @@ struct TFragmentInstanceInfoResp {
   2: optional i64 endTime
   3: optional list<string> failedMessages
   4: optional list<binary> failureInfoList
+  // Structured status of the failure, if any (e.g. TOO_MANY_CONCURRENT_QUERIES_ERROR)
+  5: optional common.TSStatus errorCode
 }
 
 struct TCancelQueryReq {

Reply via email to