This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new d8517b7a4c6 [To rc/1.3.3] Make fetch schema error more specific and
support retry for TOO_MANY_CONCURRENT_QUERIES_ERROR
d8517b7a4c6 is described below
commit d8517b7a4c64264c0488fbc10d3a6a369721f212
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Aug 13 20:54:19 2024 +0800
[To rc/1.3.3] Make fetch schema error more specific and support retry for
TOO_MANY_CONCURRENT_QUERIES_ERROR
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../metadata/AliasAlreadyExistException.java | 4 +-
.../metadata/AlignedTimeseriesException.java | 7 +--
.../metadata/DatabaseNotSetException.java | 12 +++--
.../metadata/IllegalParameterOfPathException.java | 7 +--
.../metadata/MeasurementAlreadyExistException.java | 4 +-
.../metadata/PathAlreadyExistException.java | 4 +-
.../template/TemplateIncompatibleException.java | 11 ++---
.../impl/DataNodeInternalRPCServiceImpl.java | 1 +
.../execution/executor/RegionReadExecutor.java | 7 +++
.../fragment/FragmentInstanceContext.java | 29 +++++++++++-
.../fragment/FragmentInstanceExecution.java | 22 +++++++--
.../execution/fragment/FragmentInstanceInfo.java | 20 ++++++++
.../fragment/FragmentInstanceManager.java | 55 ++++++++++++++++++----
.../schedule/queue/IndexedBlockingQueue.java | 5 +-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 7 +--
.../scheduler/AbstractFragInsStateTracker.java | 3 +-
.../plan/scheduler/ClusterScheduler.java | 4 +-
.../scheduler/FixedRateFragInsStateTracker.java | 6 +++
.../scheduler/FragmentInstanceDispatcherImpl.java | 8 ++++
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 8 +++-
.../commons/exception/IllegalPathException.java | 14 +++---
.../exception/IllegalPrivilegeException.java | 11 ++---
.../iotdb/commons/exception/IoTDBException.java | 7 ++-
.../apache/iotdb/commons/utils/StatusUtils.java | 1 +
.../src/main/thrift/datanode.thrift | 1 +
26 files changed, 198 insertions(+), 61 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 80e31e2469d..7ce87a64886 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -116,6 +116,7 @@ public enum TSStatusCode {
NO_SUCH_QUERY(714),
QUERY_WAS_KILLED(715),
EXPLAIN_ANALYZE_FETCH_ERROR(716),
+ TOO_MANY_CONCURRENT_QUERIES_ERROR(717),
// Authentication
INIT_AUTH_ERROR(800),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
index 2df35dead3d..9b8dcd7ca69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java
@@ -30,7 +30,7 @@ public class AliasAlreadyExistException extends
MetadataException {
public AliasAlreadyExistException(String path, String alias) {
super(
String.format("Alias [%s] for Path [%s] already exist", alias, path),
- TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode());
- this.isUserException = true;
+ TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode(),
+ true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
index 1594d6b203b..f11c986afdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java
@@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
public class AlignedTimeseriesException extends MetadataException {
public AlignedTimeseriesException(String message, String path) {
- super(String.format("%s (Path: %s)", message, path));
- errorCode = TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode();
- this.isUserException = true;
+ super(
+ String.format("%s (Path: %s)", message, path),
+ TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
+ true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
index ef395f5de48..7e700c117e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java
@@ -27,14 +27,16 @@ public class DatabaseNotSetException extends
MetadataException {
private static final long serialVersionUID = 3739300272099030533L;
public DatabaseNotSetException(String path) {
- super(String.format("Database is not set for current seriesPath: [%s]",
path));
- this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode();
+ super(
+ String.format("Database is not set for current seriesPath: [%s]",
path),
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode());
}
public DatabaseNotSetException(String path, boolean isUserException) {
- super(String.format("Database is not set for current seriesPath: [%s]",
path));
- this.isUserException = isUserException;
- this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode();
+ super(
+ String.format("Database is not set for current seriesPath: [%s]",
path),
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+ isUserException);
}
public DatabaseNotSetException(String path, String reason) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
index febe7b000b9..9ae6e79f406 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java
@@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
public class IllegalParameterOfPathException extends MetadataException {
public IllegalParameterOfPathException(String msg, String path) {
- super(String.format("%s. Failed to create timeseries for path %s", msg,
path));
- errorCode = TSStatusCode.ILLEGAL_PARAMETER.getStatusCode();
- this.isUserException = true;
+ super(
+ String.format("%s. Failed to create timeseries for path %s", msg,
path),
+ TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(),
+ true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
index 3b63c01eeff..a3d5adabac8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java
@@ -31,8 +31,8 @@ public class MeasurementAlreadyExistException extends
MetadataException {
public MeasurementAlreadyExistException(String path, MeasurementPath
measurementPath) {
super(
String.format("Path [%s] already exist", path),
- TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode());
- this.isUserException = true;
+ TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode(),
+ true);
this.measurementPath = measurementPath;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
index d91ea75dba0..5da3c957cba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java
@@ -30,7 +30,7 @@ public class PathAlreadyExistException extends
MetadataException {
public PathAlreadyExistException(String path) {
super(
String.format("Path [%s] already exist", path),
- TSStatusCode.PATH_ALREADY_EXIST.getStatusCode());
- this.isUserException = true;
+ TSStatusCode.PATH_ALREADY_EXIST.getStatusCode(),
+ true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
index bb1d0c1a997..cc39a3b50b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java
@@ -32,8 +32,8 @@ public class TemplateIncompatibleException extends
MetadataException {
String.format(
"Cannot create timeseries [%s] since device template [%s] already
set on path [%s].",
path, templateName, templateSetPath),
- TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
- this.isUserException = true;
+ TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(),
+ true);
}
public TemplateIncompatibleException(String templateName, PartialPath
templateSetPath) {
@@ -42,12 +42,11 @@ public class TemplateIncompatibleException extends
MetadataException {
"Cannot set device template [%s] to path [%s] "
+ "since there's timeseries under path [%s].",
templateName, templateSetPath, templateSetPath),
- TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
- this.isUserException = true;
+ TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(),
+ true);
}
public TemplateIncompatibleException(String reason) {
- super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode());
- this.isUserException = true;
+ super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(), true);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ac2bf85ad75..870aed465e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -398,6 +398,7 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
failureInfoList.add(failureInfo.serialize());
}
resp.setFailureInfoList(failureInfoList);
+ info.getErrorCode().ifPresent(resp::setErrorCode);
return resp;
} catch (IOException e) {
return resp;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
index 3b6a0f0adb0..8e9a16e9b14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
@@ -92,6 +93,12 @@ public class RegionReadExecutor {
FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse;
resp.setAccepted(!info.getState().isFailed());
resp.setMessage(info.getMessage());
+ info.getErrorCode()
+ .ifPresent(
+ s -> {
+ resp.setStatus(s);
+ resp.setNeedRetry(StatusUtils.needRetryHelper(s));
+ });
}
return resp;
} catch (ConsensusGroupNotExistException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 7302e7f0b45..92ea44735c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -310,6 +312,18 @@ public class FragmentInstanceContext extends QueryContext {
.collect(Collectors.toList());
}
+ public Optional<TSStatus> getErrorCode() {
+ return stateMachine.getFailureCauses().stream()
+ .filter(IoTDBException.class::isInstance)
+ .findFirst()
+ .flatMap(
+ t -> {
+ TSStatus status = new TSStatus(((IoTDBException)
t).getErrorCode());
+ status.setMessage(t.getMessage());
+ return Optional.of(status);
+ });
+ }
+
public void finished() {
stateMachine.finished();
}
@@ -348,8 +362,19 @@ public class FragmentInstanceContext extends QueryContext {
}
public FragmentInstanceInfo getInstanceInfo() {
- return new FragmentInstanceInfo(
- stateMachine.getState(), getEndTime(), getFailedCause(),
getFailureInfoList());
+ return getErrorCode()
+ .map(
+ s ->
+ new FragmentInstanceInfo(
+ stateMachine.getState(),
+ getEndTime(),
+ getFailedCause(),
+ getFailureInfoList(),
+ s))
+ .orElseGet(
+ () ->
+ new FragmentInstanceInfo(
+ stateMachine.getState(), getEndTime(), getFailedCause(),
getFailureInfoList()));
}
public FragmentInstanceStateMachine getStateMachine() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f631ea8a246..c42834f5190 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -117,11 +117,23 @@ public class FragmentInstanceExecution {
}
public FragmentInstanceInfo getInstanceInfo() {
- return new FragmentInstanceInfo(
- stateMachine.getState(),
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList());
+ return context
+ .getErrorCode()
+ .map(
+ s ->
+ new FragmentInstanceInfo(
+ stateMachine.getState(),
+ context.getEndTime(),
+ context.getFailedCause(),
+ context.getFailureInfoList(),
+ s))
+ .orElseGet(
+ () ->
+ new FragmentInstanceInfo(
+ stateMachine.getState(),
+ context.getEndTime(),
+ context.getFailedCause(),
+ context.getFailureInfoList()));
}
public long getStartTime() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
index 9c67d67de64..4717a23f279 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import java.util.List;
+import java.util.Optional;
public class FragmentInstanceInfo implements DataSet {
private final FragmentInstanceState state;
@@ -30,6 +32,8 @@ public class FragmentInstanceInfo implements DataSet {
private List<FragmentInstanceFailureInfo> failureInfoList;
+ private TSStatus errorCode;
+
public FragmentInstanceInfo(FragmentInstanceState state) {
this.state = state;
}
@@ -49,6 +53,18 @@ public class FragmentInstanceInfo implements DataSet {
this.failureInfoList = failureInfoList;
}
+ public FragmentInstanceInfo(
+ FragmentInstanceState state,
+ long endTime,
+ String message,
+ List<FragmentInstanceFailureInfo> failureInfoList,
+ TSStatus errorStatus) {
+ this(state, endTime);
+ this.message = message;
+ this.failureInfoList = failureInfoList;
+ this.errorCode = errorStatus;
+ }
+
public FragmentInstanceState getState() {
return state;
}
@@ -61,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet {
return message;
}
+ public Optional<TSStatus> getErrorCode() {
+ return Optional.ofNullable(errorCode);
+ }
+
public List<FragmentInstanceFailureInfo> getFailureInfoList() {
return failureInfoList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index 900865dc887..40a120da725 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -48,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +61,9 @@ import java.util.concurrent.atomic.AtomicLong;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
+import static
org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
+import static
org.apache.iotdb.rpc.TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR;
@SuppressWarnings("squid:S6548")
public class FragmentInstanceManager {
@@ -178,8 +183,18 @@ public class FragmentInstanceManager {
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
- logger.warn("error when create FragmentInstanceExecution.",
t);
- stateMachine.failed(t);
+ // deal with
+ if (t instanceof IllegalStateException
+ &&
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
+ logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
+ stateMachine.failed(
+ new IoTDBException(
+ TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG,
+
TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()));
+ } else {
+ logger.warn("error when create
FragmentInstanceExecution.", t);
+ stateMachine.failed(t);
+ }
return null;
}
});
@@ -259,8 +274,18 @@ public class FragmentInstanceManager {
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
- logger.warn("Execute error caused by ", t);
- stateMachine.failed(t);
+ // deal with
+ if (t instanceof IllegalStateException
+ &&
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
+ logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
+ stateMachine.failed(
+ new IoTDBException(
+ TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG,
+ TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()));
+ } else {
+ logger.warn("Execute error caused by ", t);
+ stateMachine.failed(t);
+ }
return null;
}
});
@@ -350,11 +375,23 @@ public class FragmentInstanceManager {
private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId
instanceId) {
FragmentInstanceContext context = instanceContext.get(instanceId);
- return new FragmentInstanceInfo(
- FragmentInstanceState.FAILED,
- context.getEndTime(),
- context.getFailedCause(),
- context.getFailureInfoList());
+ Optional<TSStatus> errorCode = context.getErrorCode();
+ return errorCode
+ .map(
+ tsStatus ->
+ new FragmentInstanceInfo(
+ FragmentInstanceState.FAILED,
+ context.getEndTime(),
+ context.getFailedCause(),
+ context.getFailureInfoList(),
+ tsStatus))
+ .orElseGet(
+ () ->
+ new FragmentInstanceInfo(
+ FragmentInstanceState.FAILED,
+ context.getEndTime(),
+ context.getFailedCause(),
+ context.getFailureInfoList()));
}
private void removeOldInstances() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
index 1a63f01ffff..632b5b48ed6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java
@@ -38,6 +38,9 @@ import com.google.common.base.Preconditions;
*/
public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> {
+ public static final String TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG =
+ "The system can't allow more queries.";
+
protected final int capacity;
protected final E queryHolder;
protected int size;
@@ -87,7 +90,7 @@ public abstract class IndexedBlockingQueue<E extends
IDIndexedAccessible> {
if (element == null) {
throw new NullPointerException("pushed element is null");
}
- Preconditions.checkState(size < capacity, "The system can't allow more
queries.");
+ Preconditions.checkState(size < capacity,
TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG);
pushToQueue(element);
size++;
this.notifyAll();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 64fe751c377..d5b6b27f0a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -244,9 +244,10 @@ class ClusterSchemaFetchExecutor {
ExecutionResult executionResult = executionStatement(queryId,
fetchStatement, context);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
- String.format(
- "cannot fetch schema, status is: %s, msg is: %s",
- executionResult.status.getCode(),
executionResult.status.getMessage()));
+ new IoTDBException(
+ String.format(
+ "Fetch Schema failed, because %s",
executionResult.status.getMessage()),
+ executionResult.status.getCode()));
}
try (SetThreadName threadName = new
SetThreadName(executionResult.queryId.getId())) {
ClusterSchemaTree result = new ClusterSchemaTree();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
index 305c41b9aef..dd6d5945cf6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java
@@ -95,7 +95,8 @@ public abstract class AbstractFragInsStateTracker implements
IFragInstanceStateT
FragmentInstanceState.valueOf(resp.getState()),
resp.getEndTime(),
failedMessage,
- failureInfoList);
+ failureInfoList,
+ resp.getErrorCode());
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 1a704fc4706..448f0829b5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -105,7 +105,9 @@ public class ClusterScheduler implements IScheduler {
private boolean needRetry(TSStatus failureStatus) {
return failureStatus != null
&& queryType == QueryType.READ
- && failureStatus.getCode() ==
TSStatusCode.DISPATCH_ERROR.getStatusCode();
+ && (failureStatus.getCode() ==
TSStatusCode.DISPATCH_ERROR.getStatusCode()
+ || failureStatus.getCode()
+ ==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index 36e4bdc99ac..31b5742a9d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
@@ -149,6 +150,11 @@ public class FixedRateFragInsStateTracker extends
AbstractFragInsStateTracker {
new RuntimeException(
String.format(
"FragmentInstance[%s] is failed. %s", instanceId,
instanceInfo.getMessage())));
+ } else if (instanceInfo.getErrorCode().isPresent()) {
+ stateMachine.transitionToFailed(
+ new IoTDBException(
+ instanceInfo.getErrorCode().get().getMessage(),
+ instanceInfo.getErrorCode().get().getCode()));
} else {
stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 41063b49571..8c4e79f5d44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -334,6 +334,9 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
} else if (sendFragmentInstanceResp.status.getCode()
== TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
throw new
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
+ } else if (sendFragmentInstanceResp.status.getCode()
+ ==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+ throw new
FragmentInstanceDispatchException(sendFragmentInstanceResp.status);
}
}
throw new FragmentInstanceDispatchException(
@@ -453,6 +456,11 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
if (!readResult.isAccepted()) {
LOGGER.warn(readResult.getMessage());
if (readResult.isNeedRetry()) {
+ if (readResult.getStatus() != null
+ && readResult.getStatus().getCode()
+ ==
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+ throw new
FragmentInstanceDispatchException(readResult.getStatus());
+ }
throw new FragmentInstanceDispatchException(
RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR,
readResult.getMessage()));
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index e698ba8487e..2ac49494acd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -97,8 +97,12 @@ public class ErrorHandlingUtils {
if (status.getCode() !=
TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode()) {
String message =
String.format(
- "Status code: %s, Query Statement: %s failed",
status.getCode(), operation);
- if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()) {
+ "Status code: %s, Query Statement: %s failed because %s",
+ status.getCode(), operation, status.getMessage());
+ if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()
+ || status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()
+ || status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()
+ || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) {
LOGGER.warn(message);
} else {
LOGGER.warn(message, e);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
index f1c46742cc5..1332284ec0e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java
@@ -25,14 +25,16 @@ public class IllegalPathException extends MetadataException
{
private static final long serialVersionUID = 2693272249167539978L;
public IllegalPathException(String path) {
- super(String.format("%s is not a legal path", path));
- errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
- this.isUserException = true;
+ super(
+ String.format("%s is not a legal path", path),
+ TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+ true);
}
public IllegalPathException(String path, String reason) {
- super(String.format("%s is not a legal path, because %s", path, reason));
- errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
- this.isUserException = true;
+ super(
+ String.format("%s is not a legal path, because %s", path, reason),
+ TSStatusCode.ILLEGAL_PATH.getStatusCode(),
+ true);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
index 475edefaaee..da8a4ae7815 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java
@@ -26,14 +26,13 @@ public class IllegalPrivilegeException extends
MetadataException {
private static final long serialVersionUID = 2693272249167539978L;
public IllegalPrivilegeException(Integer priv) {
- super(String.format("%s is not a legal privilege",
PrivilegeType.values()[priv].toString()));
- errorCode = TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode();
- this.isUserException = true;
+ super(
+ String.format("%s is not a legal privilege",
PrivilegeType.values()[priv].toString()),
+ TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode(),
+ true);
}
public IllegalPrivilegeException(String reason) {
- super(String.format("%s", reason));
- errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode();
- this.isUserException = true;
+ super(String.format("%s", reason),
TSStatusCode.ILLEGAL_PATH.getStatusCode(), true);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
index 0a93d8054cd..3fca9bc6210 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java
@@ -23,17 +23,18 @@ package org.apache.iotdb.commons.exception;
public class IoTDBException extends Exception {
private static final long serialVersionUID = 8480450962311247736L;
- protected int errorCode;
+ private final int errorCode;
/**
* This kind of exception is caused by users' wrong sql, and there is no
need for server to print
* the full stack of the exception
*/
- protected boolean isUserException = false;
+ private final boolean isUserException;
public IoTDBException(String message, int errorCode) {
super(message);
this.errorCode = errorCode;
+ this.isUserException = false;
}
public IoTDBException(String message, int errorCode, boolean
isUserException) {
@@ -45,11 +46,13 @@ public class IoTDBException extends Exception {
public IoTDBException(String message, Throwable cause, int errorCode) {
super(message, cause);
this.errorCode = errorCode;
+ this.isUserException = false;
}
public IoTDBException(Throwable cause, int errorCode) {
super(cause);
this.errorCode = errorCode;
+ this.isUserException = false;
}
public IoTDBException(Throwable cause, int errorCode, boolean
isUserException) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 97811bc76a9..ca7e7cd75cd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -64,6 +64,7 @@ public class StatusUtils {
NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode());
NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+
NEED_RETRY.add(TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode());
}
/**
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index cb39f4937bc..90ee7818435 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -165,6 +165,7 @@ struct TFragmentInstanceInfoResp {
2: optional i64 endTime
3: optional list<string> failedMessages
4: optional list<binary> failureInfoList
+ 5: optional common.TSStatus errorCode
}
struct TCancelQueryReq {