This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/RetryTooManyQueries in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 227074be91c7d6fccdd41949842aa09501fbff32 Author: Jackie Tien <[email protected]> AuthorDate: Tue Aug 13 17:47:03 2024 +0800 resolve conflicts --- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../metadata/AliasAlreadyExistException.java | 4 +- .../metadata/AlignedTimeseriesException.java | 7 +-- .../metadata/DatabaseNotSetException.java | 12 +++-- .../metadata/IllegalParameterOfPathException.java | 7 +-- .../metadata/MeasurementAlreadyExistException.java | 4 +- .../metadata/PathAlreadyExistException.java | 4 +- .../template/TemplateIncompatibleException.java | 11 ++--- .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../execution/executor/RegionReadExecutor.java | 7 +++ .../fragment/FragmentInstanceContext.java | 29 +++++++++++- .../fragment/FragmentInstanceExecution.java | 22 +++++++-- .../execution/fragment/FragmentInstanceInfo.java | 20 ++++++++ .../fragment/FragmentInstanceManager.java | 55 ++++++++++++++++++---- .../schedule/queue/IndexedBlockingQueue.java | 5 +- .../analyze/schema/ClusterSchemaFetchExecutor.java | 7 +-- .../scheduler/AbstractFragInsStateTracker.java | 3 +- .../plan/scheduler/ClusterScheduler.java | 4 +- .../scheduler/FixedRateFragInsStateTracker.java | 6 +++ .../scheduler/FragmentInstanceDispatcherImpl.java | 8 ++++ .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 8 +++- .../commons/exception/IllegalPathException.java | 14 +++--- .../exception/IllegalPrivilegeException.java | 11 ++--- .../iotdb/commons/exception/IoTDBException.java | 7 ++- .../apache/iotdb/commons/utils/StatusUtils.java | 1 + .../src/main/thrift/datanode.thrift | 1 + 26 files changed, 198 insertions(+), 61 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 80e31e2469d..7ce87a64886 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -116,6 +116,7 @@ public enum TSStatusCode { NO_SUCH_QUERY(714), QUERY_WAS_KILLED(715), EXPLAIN_ANALYZE_FETCH_ERROR(716), + TOO_MANY_CONCURRENT_QUERIES_ERROR(717), // Authentication INIT_AUTH_ERROR(800), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java index 2df35dead3d..9b8dcd7ca69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AliasAlreadyExistException.java @@ -30,7 +30,7 @@ public class AliasAlreadyExistException extends MetadataException { public AliasAlreadyExistException(String path, String alias) { super( String.format("Alias [%s] for Path [%s] already exist", alias, path), - TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()); - this.isUserException = true; + TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode(), + true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java index 1594d6b203b..f11c986afdc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeseriesException.java @@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode; public class AlignedTimeseriesException extends MetadataException { public AlignedTimeseriesException(String message, String path) { - super(String.format("%s (Path: %s)", message, path)); - errorCode = TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(); - this.isUserException = true; + super( + String.format("%s (Path: %s)", message, path), + TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(), + true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java index ef395f5de48..7e700c117e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseNotSetException.java @@ -27,14 +27,16 @@ public class DatabaseNotSetException extends MetadataException { private static final long serialVersionUID = 3739300272099030533L; public DatabaseNotSetException(String path) { - super(String.format("Database is not set for current seriesPath: [%s]", path)); - this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(); + super( + String.format("Database is not set for current seriesPath: [%s]", path), + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()); } public DatabaseNotSetException(String path, boolean isUserException) { - super(String.format("Database is not set for current seriesPath: [%s]", path)); - this.isUserException = isUserException; - this.errorCode = TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(); + super( + String.format("Database is not set for current seriesPath: [%s]", path), + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), + isUserException); } public DatabaseNotSetException(String path, String reason) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java index febe7b000b9..9ae6e79f406 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/IllegalParameterOfPathException.java @@ -25,8 +25,9 @@ import org.apache.iotdb.rpc.TSStatusCode; public class IllegalParameterOfPathException extends MetadataException { public IllegalParameterOfPathException(String msg, String path) { - super(String.format("%s. Failed to create timeseries for path %s", msg, path)); - errorCode = TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(); - this.isUserException = true; + super( + String.format("%s. Failed to create timeseries for path %s", msg, path), + TSStatusCode.ILLEGAL_PARAMETER.getStatusCode(), + true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java index 3b63c01eeff..a3d5adabac8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/MeasurementAlreadyExistException.java @@ -31,8 +31,8 @@ public class MeasurementAlreadyExistException extends MetadataException { public MeasurementAlreadyExistException(String path, MeasurementPath measurementPath) { super( String.format("Path [%s] already exist", path), - TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()); - this.isUserException = true; + TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode(), + true); this.measurementPath = measurementPath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java index d91ea75dba0..5da3c957cba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/PathAlreadyExistException.java @@ -30,7 +30,7 @@ public class PathAlreadyExistException extends MetadataException { public PathAlreadyExistException(String path) { super( String.format("Path [%s] already exist", path), - TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()); - this.isUserException = true; + TSStatusCode.PATH_ALREADY_EXIST.getStatusCode(), + true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java index bb1d0c1a997..cc39a3b50b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/template/TemplateIncompatibleException.java @@ -32,8 +32,8 @@ public class TemplateIncompatibleException extends MetadataException { String.format( "Cannot create timeseries [%s] since device template [%s] already set on path [%s].", path, templateName, templateSetPath), - TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()); - this.isUserException = true; + TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(), + true); } public TemplateIncompatibleException(String templateName, PartialPath templateSetPath) { @@ -42,12 +42,11 @@ public class TemplateIncompatibleException extends MetadataException { "Cannot set device template [%s] to path [%s] " + "since there's timeseries under path [%s].", templateName, templateSetPath, templateSetPath), - TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()); - this.isUserException = true; + TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(), + true); } public TemplateIncompatibleException(String reason) { - super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()); - this.isUserException = true; + super(reason, TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode(), true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index ac2bf85ad75..870aed465e4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -398,6 +398,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface failureInfoList.add(failureInfo.serialize()); } resp.setFailureInfoList(failureInfoList); + info.getErrorCode().ifPresent(resp::setErrorCode); return resp; } catch (IOException e) { return resp; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index 3b6a0f0adb0..8e9a16e9b14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.DataSet; @@ -92,6 +93,12 @@ public class RegionReadExecutor { FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse; resp.setAccepted(!info.getState().isFailed()); resp.setMessage(info.getMessage()); + info.getErrorCode() + .ifPresent( + s -> { + resp.setStatus(s); + resp.setNeedRetry(StatusUtils.needRetryHelper(s)); + }); } return resp; } catch (ConsensusGroupNotExistException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 7302e7f0b45..92ea44735c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -310,6 +312,18 @@ public class FragmentInstanceContext extends QueryContext { .collect(Collectors.toList()); } + public Optional<TSStatus> getErrorCode() { + return stateMachine.getFailureCauses().stream() + .filter(IoTDBException.class::isInstance) + .findFirst() + .flatMap( + t -> { + TSStatus status = new TSStatus(((IoTDBException) t).getErrorCode()); + status.setMessage(t.getMessage()); + return Optional.of(status); + }); + } + public void finished() { stateMachine.finished(); } @@ -348,8 +362,19 @@ public class FragmentInstanceContext extends QueryContext { } public FragmentInstanceInfo getInstanceInfo() { - return new FragmentInstanceInfo( - stateMachine.getState(), getEndTime(), getFailedCause(), getFailureInfoList()); + return getErrorCode() + .map( + s -> + new FragmentInstanceInfo( + stateMachine.getState(), + getEndTime(), + getFailedCause(), + getFailureInfoList(), + s)) + .orElseGet( + () -> + new FragmentInstanceInfo( + stateMachine.getState(), getEndTime(), getFailedCause(), getFailureInfoList())); } public FragmentInstanceStateMachine getStateMachine() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index f631ea8a246..c42834f5190 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -117,11 +117,23 @@ public class FragmentInstanceExecution { } public FragmentInstanceInfo getInstanceInfo() { - return new FragmentInstanceInfo( - stateMachine.getState(), - context.getEndTime(), - context.getFailedCause(), - context.getFailureInfoList()); + return context + .getErrorCode() + .map( + s -> + new FragmentInstanceInfo( + stateMachine.getState(), + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList(), + s)) + .orElseGet( + () -> + new FragmentInstanceInfo( + stateMachine.getState(), + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList())); } public long getStartTime() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java index 9c67d67de64..4717a23f279 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.consensus.common.DataSet; import java.util.List; +import java.util.Optional; public class FragmentInstanceInfo implements DataSet { private final FragmentInstanceState state; @@ -30,6 +32,8 @@ public class FragmentInstanceInfo implements DataSet { private List<FragmentInstanceFailureInfo> failureInfoList; + private TSStatus errorCode; + public FragmentInstanceInfo(FragmentInstanceState state) { this.state = state; } @@ -49,6 +53,18 @@ public class FragmentInstanceInfo implements DataSet { this.failureInfoList = failureInfoList; } + public FragmentInstanceInfo( + FragmentInstanceState state, + long endTime, + String message, + List<FragmentInstanceFailureInfo> failureInfoList, + TSStatus errorStatus) { + this(state, endTime); + this.message = message; + this.failureInfoList = failureInfoList; + this.errorCode = errorStatus; + } + public FragmentInstanceState getState() { return state; } @@ -61,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet { return message; } + public Optional<TSStatus> getErrorCode() { + return Optional.ofNullable(errorCode); + } + public List<FragmentInstanceFailureInfo> getFailureInfoList() { return failureInfoList; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 900865dc887..40a120da725 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -48,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -58,7 +61,9 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution; +import static org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER; +import static org.apache.iotdb.rpc.TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR; @SuppressWarnings("squid:S6548") public class FragmentInstanceManager { @@ -178,8 +183,18 @@ public class FragmentInstanceManager { exchangeManager); } catch (Throwable t) { clearFIRelatedResources(instanceId); - logger.warn("error when create FragmentInstanceExecution.", t); - stateMachine.failed(t); + // deal with + if (t instanceof IllegalStateException + && TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) { + logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + stateMachine.failed( + new IoTDBException( + TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG, + TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode())); + } else { + logger.warn("error when create FragmentInstanceExecution.", t); + stateMachine.failed(t); + } return null; } }); @@ -259,8 +274,18 @@ public class FragmentInstanceManager { exchangeManager); } catch (Throwable t) { clearFIRelatedResources(instanceId); - logger.warn("Execute error caused by ", t); - stateMachine.failed(t); + // deal with + if (t instanceof IllegalStateException + && TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) { + logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + stateMachine.failed( + new IoTDBException( + TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG, + TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode())); + } else { + logger.warn("Execute error caused by ", t); + stateMachine.failed(t); + } return null; } }); @@ -350,11 +375,23 @@ public class FragmentInstanceManager { private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) { FragmentInstanceContext context = instanceContext.get(instanceId); - return new FragmentInstanceInfo( - FragmentInstanceState.FAILED, - context.getEndTime(), - context.getFailedCause(), - context.getFailureInfoList()); + Optional<TSStatus> errorCode = context.getErrorCode(); + return errorCode + .map( + tsStatus -> + new FragmentInstanceInfo( + FragmentInstanceState.FAILED, + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList(), + tsStatus)) + .orElseGet( + () -> + new FragmentInstanceInfo( + FragmentInstanceState.FAILED, + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList())); } private void removeOldInstances() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java index 1a63f01ffff..632b5b48ed6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java @@ -38,6 +38,9 @@ import com.google.common.base.Preconditions; */ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { + public static final String TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG = + "The system can't allow more queries."; + protected final int capacity; protected final E queryHolder; protected int size; @@ -87,7 +90,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { if (element == null) { throw new NullPointerException("pushed element is null"); } - Preconditions.checkState(size < capacity, "The system can't allow more queries."); + Preconditions.checkState(size < capacity, TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); pushToQueue(element); size++; this.notifyAll(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 64fe751c377..d5b6b27f0a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -244,9 +244,10 @@ class ClusterSchemaFetchExecutor { ExecutionResult executionResult = executionStatement(queryId, fetchStatement, context); if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new RuntimeException( - String.format( - "cannot fetch schema, status is: %s, msg is: %s", - executionResult.status.getCode(), executionResult.status.getMessage())); + new IoTDBException( + String.format( + "Fetch Schema failed, because %s", executionResult.status.getMessage()), + executionResult.status.getCode())); } try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) { ClusterSchemaTree result = new ClusterSchemaTree(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java index 305c41b9aef..dd6d5945cf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java @@ -95,7 +95,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT FragmentInstanceState.valueOf(resp.getState()), resp.getEndTime(), failedMessage, - failureInfoList); + failureInfoList, + resp.getErrorCode()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java index 1a704fc4706..448f0829b5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java @@ -105,7 +105,9 @@ public class ClusterScheduler implements IScheduler { private boolean needRetry(TSStatus failureStatus) { return failureStatus != null && queryType == QueryType.READ - && failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode(); + && (failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode() + || failureStatus.getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java index 36e4bdc99ac..31b5742a9d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; @@ -149,6 +150,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { new RuntimeException( String.format( "FragmentInstance[%s] is failed. %s", instanceId, instanceInfo.getMessage()))); + } else if (instanceInfo.getErrorCode().isPresent()) { + stateMachine.transitionToFailed( + new IoTDBException( + instanceInfo.getErrorCode().get().getMessage(), + instanceInfo.getErrorCode().get().getCode())); } else { stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 41063b49571..8c4e79f5d44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -334,6 +334,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } else if (sendFragmentInstanceResp.status.getCode() == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) { throw new ConsensusGroupNotExistException(sendFragmentInstanceResp.message); + } else if (sendFragmentInstanceResp.status.getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) { + throw new FragmentInstanceDispatchException(sendFragmentInstanceResp.status); } } throw new FragmentInstanceDispatchException( @@ -453,6 +456,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (!readResult.isAccepted()) { LOGGER.warn(readResult.getMessage()); if (readResult.isNeedRetry()) { + if (readResult.getStatus() != null + && readResult.getStatus().getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) { + throw new FragmentInstanceDispatchException(readResult.getStatus()); + } throw new FragmentInstanceDispatchException( RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, readResult.getMessage())); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index e698ba8487e..2ac49494acd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -97,8 +97,12 @@ public class ErrorHandlingUtils { if (status.getCode() != TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode()) { String message = String.format( - "Status code: %s, Query Statement: %s failed", status.getCode(), operation); - if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()) { + "Status code: %s, Query Statement: %s failed because %s", + status.getCode(), operation, status.getMessage()); + if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode() + || status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode() + || status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode() + || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) { LOGGER.warn(message); } else { LOGGER.warn(message, e); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java index f1c46742cc5..1332284ec0e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPathException.java @@ -25,14 +25,16 @@ public class IllegalPathException extends MetadataException { private static final long serialVersionUID = 2693272249167539978L; public IllegalPathException(String path) { - super(String.format("%s is not a legal path", path)); - errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode(); - this.isUserException = true; + super( + String.format("%s is not a legal path", path), + TSStatusCode.ILLEGAL_PATH.getStatusCode(), + true); } public IllegalPathException(String path, String reason) { - super(String.format("%s is not a legal path, because %s", path, reason)); - errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode(); - this.isUserException = true; + super( + String.format("%s is not a legal path, because %s", path, reason), + TSStatusCode.ILLEGAL_PATH.getStatusCode(), + true); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java index 475edefaaee..da8a4ae7815 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IllegalPrivilegeException.java @@ -26,14 +26,13 @@ public class IllegalPrivilegeException extends MetadataException { private static final long serialVersionUID = 2693272249167539978L; public IllegalPrivilegeException(Integer priv) { - super(String.format("%s is not a legal privilege", PrivilegeType.values()[priv].toString())); - errorCode = TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode(); - this.isUserException = true; + super( + String.format("%s is not a legal privilege", PrivilegeType.values()[priv].toString()), + TSStatusCode.ILLEGAL_PRIVILEGE.getStatusCode(), + true); } public IllegalPrivilegeException(String reason) { - super(String.format("%s", reason)); - errorCode = TSStatusCode.ILLEGAL_PATH.getStatusCode(); - this.isUserException = true; + super(String.format("%s", reason), TSStatusCode.ILLEGAL_PATH.getStatusCode(), true); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java index 0a93d8054cd..3fca9bc6210 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java @@ -23,17 +23,18 @@ package org.apache.iotdb.commons.exception; public class IoTDBException extends Exception { private static final long serialVersionUID = 8480450962311247736L; - protected int errorCode; + private final int errorCode; /** * This kind of exception is caused by users' wrong sql, and there is no need for server to print * the full stack of the exception */ - protected boolean isUserException = false; + private final boolean isUserException; public IoTDBException(String message, int errorCode) { super(message); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(String message, int errorCode, boolean isUserException) { @@ -45,11 +46,13 @@ public class IoTDBException extends Exception { public IoTDBException(String message, Throwable cause, int errorCode) { super(message, cause); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(Throwable cause, int errorCode) { super(cause); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(Throwable cause, int errorCode, boolean isUserException) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index 97811bc76a9..ca7e7cd75cd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -64,6 +64,7 @@ public class StatusUtils { NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()); NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()); NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); + NEED_RETRY.add(TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()); } /** diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index cb39f4937bc..90ee7818435 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -165,6 +165,7 @@ struct TFragmentInstanceInfoResp { 2: optional i64 endTime 3: optional list<string> failedMessages 4: optional list<binary> failureInfoList + 5: optional common.TSStatus errorCode } struct TCancelQueryReq {
