This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/RetryTooManyQueries
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 227074be91c7d6fccdd41949842aa09501fbff32
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 13 17:47:03 2024 +0800

    resolve conflicts
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../metadata/AliasAlreadyExistException.java       |  4 +-
 .../metadata/AlignedTimeseriesException.java       |  7 +--
 .../metadata/DatabaseNotSetException.java          | 12 +++--
 .../metadata/IllegalParameterOfPathException.java  |  7 +--
 .../metadata/MeasurementAlreadyExistException.java |  4 +-
 .../metadata/PathAlreadyExistException.java        |  4 +-
 .../template/TemplateIncompatibleException.java    | 11 ++---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  1 +
 .../execution/executor/RegionReadExecutor.java     |  7 +++
 .../fragment/FragmentInstanceContext.java          | 29 +++++++++++-
 .../fragment/FragmentInstanceExecution.java        | 22 +++++++--
 .../execution/fragment/FragmentInstanceInfo.java   | 20 ++++++++
 .../fragment/FragmentInstanceManager.java          | 55 ++++++++++++++++++----
 .../schedule/queue/IndexedBlockingQueue.java       |  5 +-
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  7 +--
 .../scheduler/AbstractFragInsStateTracker.java     |  3 +-
 .../plan/scheduler/ClusterScheduler.java           |  4 +-
 .../scheduler/FixedRateFragInsStateTracker.java    |  6 +++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  8 ++++
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |  8 +++-
 .../commons/exception/IllegalPathException.java    | 14 +++---
 .../exception/IllegalPrivilegeException.java       | 11 ++---
 .../iotdb/commons/exception/IoTDBException.java    |  7 ++-
 .../apache/iotdb/commons/utils/StatusUtils.java    |  1 +
 .../src/main/thrift/datanode.thrift                |  1 +
 26 files changed, 198 insertions(+), 61 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 80e31e2469d..7ce87a64886 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -116,6 +116,7 @@ public enum TSStatusCode {
   NO_SUCH_QUERY(714),
   QUERY_WAS_KILLED(715),
   EXPLAIN_ANALYZE_FETCH_ERROR(716),
+  TOO_MANY_CONCURRENT_QUERIES_ERROR(717),
 
   // Authentication
   INIT_AUTH_ERROR(800),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
index 2df35dead3d..9b8dcd7ca69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
@@ -30,7 +30,7 @@ public class AliasAlreadyExistException extends 
MetadataException {
   public AliasAlreadyExistException(String path, String alias) {
     super(
         String.format("Alias [%s] for Path [%s] already exist", alias, path),
-        TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode());
-    this.isUserException = true;
+        TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode(),
+        true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
index 1594d6b203b..f11c986afdc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
@@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
 public class AlignedTimeseriesException extends MetadataException {
 
   public AlignedTimeseriesException(String message, String path) {
-    super(String.format("%s (Path: %s)", message, path));
-    errorCode = TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode();
-    this.isUserException = true;
+    super(
+        String.format("%s (Path: %s)", message, path),
+        TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
+        true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
index ef395f5de48..7e700c117e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
@@ -27,14 +27,16 @@ public class DatabaseNotSetException extends 
MetadataException {
   private static final long serialVersionUID = 3739300272099030533L;
 
   public DatabaseNotSetException(String path) {
-    super(String.format("Database is not set for current seriesPath: [%s]", 
path));
-    this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode();
+    super(
+        String.format("Database is not set for current seriesPath: [%s]", 
path),
+        TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
   }
 
   public DatabaseNotSetException(String path, boolean isUserException) {
-    super(String.format("Database is not set for current seriesPath: [%s]", 
path));
-    this.isUserException = isUserException;
-    this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode();
+    super(
+        String.format("Database is not set for current seriesPath: [%s]", 
path),
+        TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+        isUserException);
   }
 
   public DatabaseNotSetException(String path, String reason) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
index febe7b000b9..9ae6e79f406 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
@@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
 public class IllegalParameterOfPathException extends MetadataException {
 
   public IllegalParameterOfPathException(String msg, String path) {
-    super(String.format("%s. Failed to create timeseries for path %s", msg, 
path));
-    errorCode = TSStatusCode.ILLEGAL_PARAMETER.getStatusCode();
-    this.isUserException = true;
+    super(
+        String.format("%s. Failed to create timeseries for path %s", msg, 
path),
+        TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(),
+        true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
index 3b63c01eeff..a3d5adabac8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
@@ -31,8 +31,8 @@ public class MeasurementAlreadyExistException extends 
MetadataException {
   public MeasurementAlreadyExistException(String path, MeasurementPath 
measurementPath) {
     super(
         String.format("Path [%s] already exist", path),
-        TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode());
-    this.isUserException = true;
+        TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode(),
+        true);
     this.measurementPath = measurementPath;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
index d91ea75dba0..5da3c957cba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
@@ -30,7 +30,7 @@ public class PathAlreadyExistException extends 
MetadataException {
   public PathAlreadyExistException(String path) {
     super(
         String.format("Path [%s] already exist", path),
-        TSStatusCode.PATH_ALREADY_EXIST.getStatusCode());
-    this.isUserException = true;
+        TSStatusCode.PATH_ALREADY_EXIST.getStatusCode(),
+        true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
index bb1d0c1a997..cc39a3b50b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
@@ -32,8 +32,8 @@ public class TemplateIncompatibleException extends 
MetadataException {
         String.format(
             "Cannot create timeseries [%s] since device template [%s] already 
set on path [%s].",
             path, templateName, templateSetPath),
-        TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
-    this.isUserException = true;
+        TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(),
+        true);
   }
 
   public TemplateIncompatibleException(String templateName, PartialPath 
templateSetPath) {
@@ -42,12 +42,11 @@ public class TemplateIncompatibleException extends 
MetadataException {
             "Cannot set device template [%s] to path [%s] "
                 + "since there's timeseries under path [%s].",
             templateName, templateSetPath, templateSetPath),
-        TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
-    this.isUserException = true;
+        TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(),
+        true);
   }
 
   public TemplateIncompatibleException(String reason) {
-    super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
-    this.isUserException = true;
+    super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(), true);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ac2bf85ad75..870aed465e4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -398,6 +398,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           failureInfoList.add(failureInfo.serialize());
         }
         resp.setFailureInfoList(failureInfoList);
+        info.getErrorCode().ifPresent(resp::setErrorCode);
         return resp;
       } catch (IOException e) {
         return resp;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 3b6a0f0adb0..8e9a16e9b14 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -92,6 +93,12 @@ public class RegionReadExecutor {
         FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
         resp.setAccepted(!info.getState().isFailed());
         resp.setMessage(info.getMessage());
+        info.getErrorCode()
+            .ifPresent(
+                s -> {
+                  resp.setStatus(s);
+                  resp.setNeedRetry(StatusUtils.needRetryHelper(s));
+                });
       }
       return resp;
     } catch (ConsensusGroupNotExistException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 7302e7f0b45..92ea44735c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -310,6 +312,18 @@ public class FragmentInstanceContext extends QueryContext {
         .collect(Collectors.toList());
   }
 
+  public Optional<TSStatus> getErrorCode() {
+    return stateMachine.getFailureCauses().stream()
+        .filter(IoTDBException.class::isInstance)
+        .findFirst()
+        .flatMap(
+            t -> {
+              TSStatus status = new TSStatus(((IoTDBException) 
t).getErrorCode());
+              status.setMessage(t.getMessage());
+              return Optional.of(status);
+            });
+  }
+
   public void finished() {
     stateMachine.finished();
   }
@@ -348,8 +362,19 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(
-        stateMachine.getState(), getEndTime(), getFailedCause(), 
getFailureInfoList());
+    return getErrorCode()
+        .map(
+            s ->
+                new FragmentInstanceInfo(
+                    stateMachine.getState(),
+                    getEndTime(),
+                    getFailedCause(),
+                    getFailureInfoList(),
+                    s))
+        .orElseGet(
+            () ->
+                new FragmentInstanceInfo(
+                    stateMachine.getState(), getEndTime(), getFailedCause(), 
getFailureInfoList()));
   }
 
   public FragmentInstanceStateMachine getStateMachine() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f631ea8a246..c42834f5190 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -117,11 +117,23 @@ public class FragmentInstanceExecution {
   }
 
   public FragmentInstanceInfo getInstanceInfo() {
-    return new FragmentInstanceInfo(
-        stateMachine.getState(),
-        context.getEndTime(),
-        context.getFailedCause(),
-        context.getFailureInfoList());
+    return context
+        .getErrorCode()
+        .map(
+            s ->
+                new FragmentInstanceInfo(
+                    stateMachine.getState(),
+                    context.getEndTime(),
+                    context.getFailedCause(),
+                    context.getFailureInfoList(),
+                    s))
+        .orElseGet(
+            () ->
+                new FragmentInstanceInfo(
+                    stateMachine.getState(),
+                    context.getEndTime(),
+                    context.getFailedCause(),
+                    context.getFailureInfoList()));
   }
 
   public long getStartTime() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
index 9c67d67de64..4717a23f279 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 
 import java.util.List;
+import java.util.Optional;
 
 public class FragmentInstanceInfo implements DataSet {
   private final FragmentInstanceState state;
@@ -30,6 +32,8 @@ public class FragmentInstanceInfo implements DataSet {
 
   private List<FragmentInstanceFailureInfo> failureInfoList;
 
+  private TSStatus errorCode;
+
   public FragmentInstanceInfo(FragmentInstanceState state) {
     this.state = state;
   }
@@ -49,6 +53,18 @@ public class FragmentInstanceInfo implements DataSet {
     this.failureInfoList = failureInfoList;
   }
 
+  public FragmentInstanceInfo(
+      FragmentInstanceState state,
+      long endTime,
+      String message,
+      List<FragmentInstanceFailureInfo> failureInfoList,
+      TSStatus errorStatus) {
+    this(state, endTime);
+    this.message = message;
+    this.failureInfoList = failureInfoList;
+    this.errorCode = errorStatus;
+  }
+
   public FragmentInstanceState getState() {
     return state;
   }
@@ -61,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet {
     return message;
   }
 
+  public Optional<TSStatus> getErrorCode() {
+    return Optional.ofNullable(errorCode);
+  }
+
   public List<FragmentInstanceFailureInfo> getFailureInfoList() {
     return failureInfoList;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 900865dc887..40a120da725 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -19,9 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -48,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +61,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static 
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
+import static 
org.apache.iotdb.rpc.TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR;
 
 @SuppressWarnings("squid:S6548")
 public class FragmentInstanceManager {
@@ -178,8 +183,18 @@ public class FragmentInstanceManager {
                       exchangeManager);
                 } catch (Throwable t) {
                   clearFIRelatedResources(instanceId);
-                  logger.warn("error when create FragmentInstanceExecution.", 
t);
-                  stateMachine.failed(t);
+                  // deal with
+                  if (t instanceof IllegalStateException
+                      && 
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
+                    logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
+                    stateMachine.failed(
+                        new IoTDBException(
+                            TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG,
+                            
TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()));
+                  } else {
+                    logger.warn("error when create 
FragmentInstanceExecution.", t);
+                    stateMachine.failed(t);
+                  }
                   return null;
                 }
               });
@@ -259,8 +274,18 @@ public class FragmentInstanceManager {
                     exchangeManager);
               } catch (Throwable t) {
                 clearFIRelatedResources(instanceId);
-                logger.warn("Execute error caused by ", t);
-                stateMachine.failed(t);
+                // deal with
+                if (t instanceof IllegalStateException
+                    && 
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
+                  logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
+                  stateMachine.failed(
+                      new IoTDBException(
+                          TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG,
+                          TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()));
+                } else {
+                  logger.warn("Execute error caused by ", t);
+                  stateMachine.failed(t);
+                }
                 return null;
               }
             });
@@ -350,11 +375,23 @@ public class FragmentInstanceManager {
 
   private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId 
instanceId) {
     FragmentInstanceContext context = instanceContext.get(instanceId);
-    return new FragmentInstanceInfo(
-        FragmentInstanceState.FAILED,
-        context.getEndTime(),
-        context.getFailedCause(),
-        context.getFailureInfoList());
+    Optional<TSStatus> errorCode = context.getErrorCode();
+    return errorCode
+        .map(
+            tsStatus ->
+                new FragmentInstanceInfo(
+                    FragmentInstanceState.FAILED,
+                    context.getEndTime(),
+                    context.getFailedCause(),
+                    context.getFailureInfoList(),
+                    tsStatus))
+        .orElseGet(
+            () ->
+                new FragmentInstanceInfo(
+                    FragmentInstanceState.FAILED,
+                    context.getEndTime(),
+                    context.getFailedCause(),
+                    context.getFailureInfoList()));
   }
 
   private void removeOldInstances() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
index 1a63f01ffff..632b5b48ed6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
@@ -38,6 +38,9 @@ import com.google.common.base.Preconditions;
  */
 public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
 
+  public static final String TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG =
+      "The system can't allow more queries.";
+
   protected final int capacity;
   protected final E queryHolder;
   protected int size;
@@ -87,7 +90,7 @@ public abstract class IndexedBlockingQueue<E extends 
IDIndexedAccessible> {
     if (element == null) {
       throw new NullPointerException("pushed element is null");
     }
-    Preconditions.checkState(size < capacity, "The system can't allow more 
queries.");
+    Preconditions.checkState(size < capacity, 
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
     pushToQueue(element);
     size++;
     this.notifyAll();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 64fe751c377..d5b6b27f0a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -244,9 +244,10 @@ class ClusterSchemaFetchExecutor {
       ExecutionResult executionResult = executionStatement(queryId, 
fetchStatement, context);
       if (executionResult.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         throw new RuntimeException(
-            String.format(
-                "cannot fetch schema, status is: %s, msg is: %s",
-                executionResult.status.getCode(), 
executionResult.status.getMessage()));
+            new IoTDBException(
+                String.format(
+                    "Fetch Schema failed, because %s", 
executionResult.status.getMessage()),
+                executionResult.status.getCode()));
       }
       try (SetThreadName threadName = new 
SetThreadName(executionResult.queryId.getId())) {
         ClusterSchemaTree result = new ClusterSchemaTree();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
index 305c41b9aef..dd6d5945cf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
@@ -95,7 +95,8 @@ public abstract class AbstractFragInsStateTracker implements 
IFragInstanceStateT
             FragmentInstanceState.valueOf(resp.getState()),
             resp.getEndTime(),
             failedMessage,
-            failureInfoList);
+            failureInfoList,
+            resp.getErrorCode());
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 1a704fc4706..448f0829b5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -105,7 +105,9 @@ public class ClusterScheduler implements IScheduler {
   private boolean needRetry(TSStatus failureStatus) {
     return failureStatus != null
         && queryType == QueryType.READ
-        && failureStatus.getCode() == 
TSStatusCode.DISPATCH_ERROR.getStatusCode();
+        && (failureStatus.getCode() == 
TSStatusCode.DISPATCH_ERROR.getStatusCode()
+            || failureStatus.getCode()
+                == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 36e4bdc99ac..31b5742a9d0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -149,6 +150,11 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
             new RuntimeException(
                 String.format(
                     "FragmentInstance[%s] is failed. %s", instanceId, 
instanceInfo.getMessage())));
+      } else if (instanceInfo.getErrorCode().isPresent()) {
+        stateMachine.transitionToFailed(
+            new IoTDBException(
+                instanceInfo.getErrorCode().get().getMessage(),
+                instanceInfo.getErrorCode().get().getCode()));
       } else {
         
stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 41063b49571..8c4e79f5d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -334,6 +334,9 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
               } else if (sendFragmentInstanceResp.status.getCode()
                   == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
                 throw new 
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
+              } else if (sendFragmentInstanceResp.status.getCode()
+                  == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+                throw new 
FragmentInstanceDispatchException(sendFragmentInstanceResp.status);
               }
             }
             throw new FragmentInstanceDispatchException(
@@ -453,6 +456,11 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         if (!readResult.isAccepted()) {
           LOGGER.warn(readResult.getMessage());
           if (readResult.isNeedRetry()) {
+            if (readResult.getStatus() != null
+                && readResult.getStatus().getCode()
+                    == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+              throw new 
FragmentInstanceDispatchException(readResult.getStatus());
+            }
             throw new FragmentInstanceDispatchException(
                 RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, 
readResult.getMessage()));
           } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index e698ba8487e..2ac49494acd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -97,8 +97,12 @@ public class ErrorHandlingUtils {
       if (status.getCode() != 
TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode()) {
         String message =
             String.format(
-                "Status code: %s, Query Statement: %s failed", 
status.getCode(), operation);
-        if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()) {
+                "Status code: %s, Query Statement: %s failed because %s",
+                status.getCode(), operation, status.getMessage());
+        if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()
+            || status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()
+            || status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()
+            || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) {
           LOGGER.warn(message);
         } else {
           LOGGER.warn(message, e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
index f1c46742cc5..1332284ec0e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
@@ -25,14 +25,16 @@ public class IllegalPathException extends MetadataException 
{
   private static final long serialVersionUID = 2693272249167539978L;
 
   public IllegalPathException(String path) {
-    super(String.format("%s is not a legal path", path));
-    errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
-    this.isUserException = true;
+    super(
+        String.format("%s is not a legal path", path),
+        TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+        true);
   }
 
   public IllegalPathException(String path, String reason) {
-    super(String.format("%s is not a legal path, because %s", path, reason));
-    errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
-    this.isUserException = true;
+    super(
+        String.format("%s is not a legal path, because %s", path, reason),
+        TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+        true);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
index 475edefaaee..da8a4ae7815 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
@@ -26,14 +26,13 @@ public class IllegalPrivilegeException extends 
MetadataException {
   private static final long serialVersionUID = 2693272249167539978L;
 
   public IllegalPrivilegeException(Integer priv) {
-    super(String.format("%s is not a legal privilege", 
PrivilegeType.values()[priv].toString()));
-    errorCode = TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode();
-    this.isUserException = true;
+    super(
+        String.format("%s is not a legal privilege", 
PrivilegeType.values()[priv].toString()),
+        TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode(),
+        true);
   }
 
   public IllegalPrivilegeException(String reason) {
-    super(String.format("%s", reason));
-    errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
-    this.isUserException = true;
+    super(String.format("%s", reason), 
TSStatusCode.ILLEGAL_PATH.getStatusCode(), true);
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
index 0a93d8054cd..3fca9bc6210 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
@@ -23,17 +23,18 @@ package org.apache.iotdb.commons.exception;
 public class IoTDBException extends Exception {
 
   private static final long serialVersionUID = 8480450962311247736L;
-  protected int errorCode;
+  private final int errorCode;
 
   /**
    * This kind of exception is caused by users' wrong sql, and there is no 
need for server to print
    * the full stack of the exception
    */
-  protected boolean isUserException = false;
+  private final boolean isUserException;
 
   public IoTDBException(String message, int errorCode) {
     super(message);
     this.errorCode = errorCode;
+    this.isUserException = false;
   }
 
   public IoTDBException(String message, int errorCode, boolean 
isUserException) {
@@ -45,11 +46,13 @@ public class IoTDBException extends Exception {
   public IoTDBException(String message, Throwable cause, int errorCode) {
     super(message, cause);
     this.errorCode = errorCode;
+    this.isUserException = false;
   }
 
   public IoTDBException(Throwable cause, int errorCode) {
     super(cause);
     this.errorCode = errorCode;
+    this.isUserException = false;
   }
 
   public IoTDBException(Throwable cause, int errorCode, boolean 
isUserException) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 97811bc76a9..ca7e7cd75cd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -64,6 +64,7 @@ public class StatusUtils {
     NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
     NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode());
     NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+    
NEED_RETRY.add(TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode());
   }
 
   /**
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index cb39f4937bc..90ee7818435 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -165,6 +165,7 @@ struct TFragmentInstanceInfoResp {
   2: optional i64 endTime
   3: optional list<string> failedMessages
   4: optional list<binary> failureInfoList
+  5: optional common.TSStatus errorCode
 }
 
 struct TCancelQueryReq {


Reply via email to