This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/AllowRetry in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4bc006f1fba9ed8b7dfb89f2b1b3f5c16b93fa89 Author: JackieTien97 <[email protected]> AuthorDate: Tue Aug 13 14:55:18 2024 +0800 retry for too many queries error --- .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../execution/executor/RegionReadExecutor.java | 7 ++++ .../fragment/FragmentInstanceContext.java | 29 +++++++++++++++-- .../fragment/FragmentInstanceExecution.java | 22 ++++++++++--- .../execution/fragment/FragmentInstanceInfo.java | 20 ++++++++++++ .../fragment/FragmentInstanceManager.java | 38 ++++++++++++++++++---- .../scheduler/AbstractFragInsStateTracker.java | 3 +- .../plan/scheduler/ClusterScheduler.java | 4 ++- .../scheduler/FixedRateFragInsStateTracker.java | 6 ++++ .../scheduler/FragmentInstanceDispatcherImpl.java | 8 +++++ .../src/main/thrift/datanode.thrift | 1 + 11 files changed, 123 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index cf082082796..9a2201075d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -406,6 +406,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface failureInfoList.add(failureInfo.serialize()); } resp.setFailureInfoList(failureInfoList); + info.getErrorCode().ifPresent(resp::setErrorCode); return resp; } catch (IOException e) { return resp; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java index 3b6a0f0adb0..8e9a16e9b14 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionReadExecutor.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.executor; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.DataSet; @@ -92,6 +93,12 @@ public class RegionReadExecutor { FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse; resp.setAccepted(!info.getState().isFailed()); resp.setMessage(info.getMessage()); + info.getErrorCode() + .ifPresent( + s -> { + resp.setStatus(s); + resp.setNeedRetry(StatusUtils.needRetryHelper(s)); + }); } return resp; } catch (ConsensusGroupNotExistException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 4e8928deebb..9648b1701fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -312,6 +314,18 @@ public class FragmentInstanceContext extends 
QueryContext { .collect(Collectors.toList()); } + public Optional<TSStatus> getErrorCode() { + return stateMachine.getFailureCauses().stream() + .filter(IoTDBException.class::isInstance) + .findFirst() + .map( + t -> { + TSStatus status = new TSStatus(((IoTDBException) t).getErrorCode()); + status.setMessage(t.getMessage()); + return status; + }); + } + public void finished() { stateMachine.finished(); } @@ -350,8 +364,19 @@ public class FragmentInstanceContext extends QueryContext { } public FragmentInstanceInfo getInstanceInfo() { - return new FragmentInstanceInfo( - stateMachine.getState(), getEndTime(), getFailedCause(), getFailureInfoList()); + return getErrorCode() + .map( + s -> + new FragmentInstanceInfo( + stateMachine.getState(), + getEndTime(), + getFailedCause(), + getFailureInfoList(), + s)) + .orElseGet( + () -> + new FragmentInstanceInfo( + stateMachine.getState(), getEndTime(), getFailedCause(), getFailureInfoList())); } public FragmentInstanceStateMachine getStateMachine() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index f631ea8a246..c42834f5190 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -117,11 +117,23 @@ public class FragmentInstanceExecution { } public FragmentInstanceInfo getInstanceInfo() { - return new FragmentInstanceInfo( - stateMachine.getState(), - context.getEndTime(), - context.getFailedCause(), - context.getFailureInfoList()); + return context + .getErrorCode() + .map( + s -> + new FragmentInstanceInfo( + stateMachine.getState(), + context.getEndTime(), + context.getFailedCause(), + 
context.getFailureInfoList(), + s)) + .orElseGet( + () -> + new FragmentInstanceInfo( + stateMachine.getState(), + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList())); } public long getStartTime() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java index 9c67d67de64..4717a23f279 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.consensus.common.DataSet; import java.util.List; +import java.util.Optional; public class FragmentInstanceInfo implements DataSet { private final FragmentInstanceState state; @@ -30,6 +32,8 @@ public class FragmentInstanceInfo implements DataSet { private List<FragmentInstanceFailureInfo> failureInfoList; + private TSStatus errorCode; + public FragmentInstanceInfo(FragmentInstanceState state) { this.state = state; } @@ -49,6 +53,18 @@ public class FragmentInstanceInfo implements DataSet { this.failureInfoList = failureInfoList; } + public FragmentInstanceInfo( + FragmentInstanceState state, + long endTime, + String message, + List<FragmentInstanceFailureInfo> failureInfoList, + TSStatus errorStatus) { + this(state, endTime); + this.message = message; + this.failureInfoList = failureInfoList; + this.errorCode = errorStatus; + } + public FragmentInstanceState getState() { return state; } @@ -61,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet { return message; } + public Optional<TSStatus> getErrorCode() { + return Optional.ofNullable(errorCode); + } + public 
List<FragmentInstanceFailureInfo> getFailureInfoList() { return failureInfoList; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index 0886cb406bb..1a034eda478 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.fragment; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -181,8 +183,18 @@ public class FragmentInstanceManager { exchangeManager); } catch (Throwable t) { clearFIRelatedResources(instanceId); - logger.warn("error when create FragmentInstanceExecution.", t); - stateMachine.failed(t); + // Too many concurrent queries: fail with the dedicated TOO_MANY_CONCURRENT_QUERIES_ERROR status code (instead of a generic failure) so ClusterScheduler can retry the read query. + if (t instanceof IllegalStateException + && TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) { + logger.warn(TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG); + stateMachine.failed( + new IoTDBException( + TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG, + TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode())); + } else { + logger.warn("error when create 
FragmentInstanceExecution.", t); + stateMachine.failed(t); + } return null; } }); @@ -367,11 +379,23 @@ public class FragmentInstanceManager { private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) { FragmentInstanceContext context = instanceContext.get(instanceId); - return new FragmentInstanceInfo( - FragmentInstanceState.FAILED, - context.getEndTime(), - context.getFailedCause(), - context.getFailureInfoList()); + Optional<TSStatus> errorCode = context.getErrorCode(); + return errorCode + .map( + tsStatus -> + new FragmentInstanceInfo( + FragmentInstanceState.FAILED, + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList(), + tsStatus)) + .orElseGet( + () -> + new FragmentInstanceInfo( + FragmentInstanceState.FAILED, + context.getEndTime(), + context.getFailedCause(), + context.getFailureInfoList())); } private void removeOldInstances() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java index 305c41b9aef..dd6d5945cf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AbstractFragInsStateTracker.java @@ -95,7 +95,8 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT FragmentInstanceState.valueOf(resp.getState()), resp.getEndTime(), failedMessage, - failureInfoList); + failureInfoList, + resp.getErrorCode()); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java index 1a704fc4706..448f0829b5a 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java @@ -105,7 
+105,9 @@ public class ClusterScheduler implements IScheduler { private boolean needRetry(TSStatus failureStatus) { return failureStatus != null && queryType == QueryType.READ - && failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode(); + && (failureStatus.getCode() == TSStatusCode.DISPATCH_ERROR.getStatusCode() + || failureStatus.getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java index 36e4bdc99ac..31b5742a9d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.QueryStateMachine; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo; @@ -149,6 +150,11 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker { new RuntimeException( String.format( "FragmentInstance[%s] is failed. 
%s", instanceId, instanceInfo.getMessage()))); + } else if (instanceInfo.getErrorCode().isPresent()) { + stateMachine.transitionToFailed( + new IoTDBException( + instanceInfo.getErrorCode().get().getMessage(), + instanceInfo.getErrorCode().get().getCode())); } else { stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 41063b49571..8c4e79f5d44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -334,6 +334,9 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { } else if (sendFragmentInstanceResp.status.getCode() == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) { throw new ConsensusGroupNotExistException(sendFragmentInstanceResp.message); + } else if (sendFragmentInstanceResp.status.getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) { + throw new FragmentInstanceDispatchException(sendFragmentInstanceResp.status); } } throw new FragmentInstanceDispatchException( @@ -453,6 +456,11 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { if (!readResult.isAccepted()) { LOGGER.warn(readResult.getMessage()); if (readResult.isNeedRetry()) { + if (readResult.getStatus() != null + && readResult.getStatus().getCode() + == TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) { + throw new FragmentInstanceDispatchException(readResult.getStatus()); + } throw new FragmentInstanceDispatchException( RpcUtils.getStatus(TSStatusCode.DISPATCH_ERROR, readResult.getMessage())); } else { diff 
--git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index e63a6f77622..cd45a72e46b 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -165,6 +165,7 @@ struct TFragmentInstanceInfoResp { 2: optional i64 endTime 3: optional list<string> failedMessages 4: optional list<binary> failureInfoList + 5: optional common.TSStatus errorCode } struct TCancelQueryReq {
