This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/AllowRetry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ab20502d87e2b387c1ba7484979dad712130353e Author: JackieTien97 <[email protected]> AuthorDate: Thu Aug 8 13:51:50 2024 +0800 partial --- .../src/main/java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../execution/fragment/FragmentInstanceManager.java | 13 +++++++++++-- .../execution/schedule/queue/IndexedBlockingQueue.java | 4 +++- .../org/apache/iotdb/commons/exception/IoTDBException.java | 7 +++++-- .../java/org/apache/iotdb/commons/utils/StatusUtils.java | 1 + 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 80e31e2469d..7ce87a64886 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -116,6 +116,7 @@ public enum TSStatusCode { NO_SUCH_QUERY(714), QUERY_WAS_KILLED(715), EXPLAIN_ANALYZE_FETCH_ERROR(716), + TOO_MANY_CONCURRENT_QUERIES_ERROR(717), // Authentication INIT_AUTH_ERROR(800), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 900865dc887..e5a3d66a511 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -58,7 +59,9 @@ import java.util.concurrent.atomic.AtomicLong; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution; +import static org.apache.iotdb.db.queryengine.execution.schedule.queue.IndexedBlockingQueue.TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER; +import static org.apache.iotdb.rpc.TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR; @SuppressWarnings("squid:S6548") public class FragmentInstanceManager { @@ -259,8 +262,14 @@ public class FragmentInstanceManager { exchangeManager); } catch (Throwable t) { clearFIRelatedResources(instanceId); - logger.warn("Execute error caused by ", t); - stateMachine.failed(t); + // deal with + if (t instanceof IllegalStateException && TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) { + logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + stateMachine.failed(new IoTDBException(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG, TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode())); + } else { + logger.warn("Execute error caused by ", t); + stateMachine.failed(t); + } return null; } }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java index 1a63f01ffff..019b6db17ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/IndexedBlockingQueue.java @@ -38,6 +38,8 @@ import com.google.common.base.Preconditions; */ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { + public static final String TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG = "The system can't allow more queries."; + protected final int capacity; protected final E queryHolder; protected int size; @@ -87,7 +89,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { if (element == null) { throw new NullPointerException("pushed element is null"); } - Preconditions.checkState(size < capacity, "The system can't allow more queries."); + Preconditions.checkState(size < capacity, TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); pushToQueue(element); size++; this.notifyAll(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java index 0a93d8054cd..a5f8104e072 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/IoTDBException.java @@ -23,17 +23,18 @@ package org.apache.iotdb.commons.exception; public class IoTDBException extends Exception { private static final long serialVersionUID = 8480450962311247736L; - protected int errorCode; + protected final int errorCode; /** * This kind of exception is caused by users' wrong sql, and there is no need for server to print * the full stack of the exception */ - protected boolean isUserException = false; + protected final boolean isUserException; public IoTDBException(String message, int errorCode) { super(message); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(String message, int errorCode, boolean isUserException) { @@ -45,11 +46,13 @@ public class IoTDBException extends Exception { public IoTDBException(String message, Throwable cause, int errorCode) { super(message, cause); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(Throwable cause, int errorCode) { super(cause); this.errorCode = errorCode; + this.isUserException = false; } public IoTDBException(Throwable cause, int errorCode, boolean isUserException) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index 97811bc76a9..ca7e7cd75cd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -64,6 +64,7 @@ public class StatusUtils { NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode()); NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()); NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode()); + NEED_RETRY.add(TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()); } /**
