This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch benchants_branch_optimize_fe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 789e78da8bed0dd5611cf5a894968a659196e164 Author: Beyyes <[email protected]> AuthorDate: Tue Jun 6 13:57:27 2023 +0800 change for optimizing fe --- .../java/org/apache/iotdb/isession/ISession.java | 4 + .../java/org/apache/iotdb/session/Session.java | 32 ++++++ .../thrift/src/main/thrift/client.thrift | 1 + .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 119 +++++++++++++++++++++ .../db/mpp/plan/execution/QueryExecution.java | 7 +- .../ConcatExpressionWithSuffixPathsVisitor.java | 1 + .../db/mpp/plan/parser/StatementGenerator.java | 4 + .../db/mpp/plan/planner/LogicalPlanBuilder.java | 1 + .../db/mpp/plan/planner/LogicalPlanVisitor.java | 38 +++++-- .../plan/planner/distribution/SourceRewriter.java | 28 +++-- .../db/mpp/plan/statement/crud/QueryStatement.java | 11 ++ .../service/thrift/impl/ClientRPCServiceImpl.java | 87 +++++++++++++++ 13 files changed, 309 insertions(+), 26 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java index 4095afe3680..e068e584011 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java @@ -189,6 +189,10 @@ public interface ISession extends AutoCloseable { long slidingStep) throws StatementExecutionException, IoTDBConnectionException; + SessionDataSet executeSingleSeriesAggregationQuery( + String path, TAggregationType aggregationType, long startTime, long endTime, long interval) + throws StatementExecutionException, IoTDBConnectionException; + void insertRecord( String deviceId, long time, diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 76f290e8448..37d78b96954 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -957,6 +957,38 @@ public class Session implements ISession { } } + @Override + public SessionDataSet executeSingleSeriesAggregationQuery( + String path, TAggregationType aggregationType, long startTime, long endTime, long interval) + throws StatementExecutionException, IoTDBConnectionException { + try { + return defaultSessionConnection.executeAggregationQuery( + Collections.singletonList(path), + Collections.singletonList(aggregationType), + startTime, + endTime, + interval); + } catch (RedirectException e) { + handleQueryRedirection(e.getEndPoint()); + if (enableQueryRedirection) { + // retry + try { + return defaultSessionConnection.executeAggregationQuery( + Collections.singletonList(path), + Collections.singletonList(aggregationType), + startTime, + endTime, + interval); + } catch (RedirectException redirectException) { + logger.error("redirect twice", redirectException); + throw new StatementExecutionException("redirect twice, please try again."); + } + } else { + throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT); + } + } + } + /** * insert data in one row, if you want to improve your performance, please use insertRecords * method or insertTablet method diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift b/iotdb-protocol/thrift/src/main/thrift/client.thrift index 8f6cf8e6297..0679b1ba453 100644 --- a/iotdb-protocol/thrift/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift @@ -350,6 +350,7 @@ struct TSAggregationQueryReq { 9: optional i32 fetchSize 10: optional i64 timeout 11: optional bool legalPathNodes + 12: optional bool singleSeriesAggregation } struct TSCreateMultiTimeseriesReq { diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index cdd894b5d47..b68c302cd0c 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -584,7 +584,7 @@ public class IoTDBConfig { private long cacheFileReaderClearPeriod = 100000; /** the max executing time of query in ms. Unit: millisecond */ - private long queryTimeoutThreshold = 60000; + private long queryTimeoutThreshold = 60000000; /** the max time to live of a session in ms. Unit: millisecond */ private int sessionTimeoutThreshold = 0; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index a96bd165a59..325d782fb76 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java @@ -236,6 +236,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext context) { + if (queryStatement.isSingleSeriesAggregation() || queryStatement.isGroupBy()) { + queryStatement.setSingleSeriesAggregation(true); + return visitSimpleSingleSeriesAggregationQuery(queryStatement, context); + } + Analysis analysis = new Analysis(); try { // check for semantic errors @@ -375,6 +380,120 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return analysis; } + // single aggregation with single timeseries (could associate with start time, end time and + // interval) + public Analysis visitSimpleSingleSeriesAggregationQuery( + QueryStatement queryStatement, MPPQueryContext context) { + Analysis analysis = new Analysis(); + try { + if (queryStatement.isGroupByTag()) { + throw new SemanticException("GroupByTag is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.isLastQuery()) { + throw new SemanticException("LastQuery is not supported in SimpleSingleSeriesAggregation"); + } + if (analysis.useLogicalView()) { + throw new SemanticException("View is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.isAlignByDevice()) { + throw new SemanticException( + "AlignByDevice is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.hasHaving()) { + throw new SemanticException("Having is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.getFillComponent() != null) { + throw new SemanticException("Fill is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.useWildcard()) { + throw new SemanticException("Wildcard is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.hasOrderByExpression()) { + throw new SemanticException("OrderBy is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.isSelectInto()) { + throw new SemanticException("SelectInto is not supported in SimpleSingleSeriesAggregation"); + } + if (queryStatement.hasWhere()) { + throw new SemanticException("Where is not supported in SimpleSingleSeriesAggregation"); + } + + // check for semantic errors + // queryStatement.semanticCheck(); + + // concat path and construct path pattern tree + PathPatternTree patternTree = new PathPatternTree(false); + // TODO optimize rewrite method, just concat PrefixPath and Expression + // List<PartialPath> prefixPaths = queryStatement.getFromComponent().getPrefixPaths(); + // List<ResultColumn> resultColumns = + // concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths, false); + // queryStatement.getSelectComponent().setResultColumns(resultColumns); + queryStatement = + (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, patternTree); + analysis.setStatement(queryStatement); + + // request schema fetch API + long startTime = System.nanoTime(); + ISchemaTree schemaTree; + try { + schemaTree = schemaFetcher.fetchSchema(patternTree, context); + // If there is no leaf node in the schema tree, the query should be completed immediately + if (schemaTree.isEmpty()) { + return finishQuery(queryStatement, analysis); + } + } finally { + QueryPlanCostMetricSet.getInstance() + .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime); + } + + // extract global time filter from query filter and determine if there is a value filter + analyzeGlobalTimeFilter(analysis, queryStatement); + + List<Pair<Expression, String>> outputExpressions; + Map<Integer, List<Pair<Expression, String>>> outputExpressionMap = + analyzeSelect(analysis, queryStatement, schemaTree); + + outputExpressions = new ArrayList<>(); + outputExpressionMap.values().forEach(outputExpressions::addAll); + analysis.setOutputExpressions(outputExpressions); + if (outputExpressions.isEmpty()) { + return finishQuery(queryStatement, analysis); + } + + analyzeGroupBy(analysis, queryStatement, schemaTree); + analyzeGroupByLevel(analysis, queryStatement, outputExpressionMap, outputExpressions); + + Set<Expression> selectExpressions = new LinkedHashSet<>(); + if (queryStatement.isOutputEndTime()) { + selectExpressions.add(endTimeExpression); + } + for (Pair<Expression, String> outputExpressionAndAlias : outputExpressions) { + selectExpressions.add(outputExpressionAndAlias.left); + } + analysis.setSelectExpressions(selectExpressions); + + analyzeAggregation(analysis, queryStatement); + + analyzeSourceTransform(analysis, queryStatement); + + analyzeSource(analysis, queryStatement); + + analyzeGroupByTime(analysis, queryStatement); + + // generate result set header according to output expressions + analyzeOutput(analysis, queryStatement, outputExpressions); + + // fetch partition information + analyzeDataPartition(analysis, queryStatement, schemaTree); + + } catch (StatementAnalyzeException e) { + logger.warn("Meet error when analyzing the query statement: ", e); + throw new StatementAnalyzeException( + "Meet error when analyzing the query statement: " + e.getMessage()); + } + return analysis; + } + private Analysis finishQuery(QueryStatement queryStatement, Analysis analysis) { if (queryStatement.isSelectInto()) { analysis.setRespDatasetHeader( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java index 8c4200fe391..ea2a1b3d1f1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java @@ -64,6 +64,7 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.rpc.RpcUtils; @@ -360,11 +361,15 @@ public class QueryExecution implements IQueryExecution { DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan); this.distributedPlan = planner.planFragments(); + if (rawStatement instanceof QueryStatement && ((QueryStatement) rawStatement).isSingleSeriesAggregation()) { + // simplify planner.planFragments() process + } + if (rawStatement.isQuery()) { QUERY_PLAN_COST_METRIC_SET.recordPlanCost( DISTRIBUTION_PLANNER, System.nanoTime() - startTime); } - if (isQuery() && logger.isDebugEnabled()) { + if (logger.isDebugEnabled() && isQuery()) { logger.debug( "distribution plan done. Fragment instance count is {}, details is: \n {}", distributedPlan.getInstances().size(), diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java index ae66a3e3e46..c028d365443 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java @@ -39,6 +39,7 @@ import static org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructTi public class ConcatExpressionWithSuffixPathsVisitor extends CartesianProductVisitor<ConcatExpressionWithSuffixPathsVisitor.Context> { + @Override public List<Expression> visitFunctionExpression( FunctionExpression functionExpression, Context context) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java index 5fc4f3a2348..57b388a43bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java @@ -218,6 +218,7 @@ public class StatementGenerator { FromComponent fromComponent = new FromComponent(); fromComponent.addPrefixPath(new PartialPath("", false)); queryStatement.setFromComponent(fromComponent); + queryStatement.setSingleSeriesAggregation(true); SelectComponent selectComponent = new SelectComponent(zoneId); List<PartialPath> selectPaths = new ArrayList<>(); @@ -228,6 +229,9 @@ public class StatementGenerator { selectPaths.add(new PartialPath(pathStr)); } } + if (req.isSingleSeries()) { + queryStatement.setSingleSeriesAggregation(true); + } List<TAggregationType> aggregations = req.getAggregations(); for (int i = 0; i < aggregations.size(); i++) { selectComponent.addResultColumn( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index f2d19fe9025..80b5460e1f5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -315,6 +315,7 @@ public class LogicalPlanBuilder { List<String> tagKeys, Map<List<String>, LinkedHashMap<Expression, List<Expression>>> tagValuesToGroupedTimeseriesOperands) { + boolean needCheckAscending = groupByTimeParameter == null; Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index 14a49611613..76266046fed 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -83,8 +83,6 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.view.ShowLogicalViewState import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.commons.lang3.Validate; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -113,6 +111,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) { + if (queryStatement.isSingleSeriesAggregation()) { + return visitSingleSeriesAggregationQuery(queryStatement, context); + } + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); if (queryStatement.isLastQuery()) { @@ -218,6 +220,24 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte return planBuilder.getRoot(); } + private PlanNode visitSingleSeriesAggregationQuery( + QueryStatement queryStatement, MPPQueryContext context) { + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); + + planBuilder = + planBuilder.planAggregationSource( + AggregationStep.SINGLE, + queryStatement.getResultTimeOrder(), + analysis.getGlobalTimeFilter(), + analysis.getGroupByTimeParameter(), + analysis.getAggregationExpressions(), + analysis.getSourceTransformExpressions(), + analysis.getCrossGroupByExpressions(), + analysis.getTagKeys(), + analysis.getTagValuesToGroupedTimeseriesOperands()); + return planBuilder.getRoot(); + } + public PlanNode visitQueryBody( QueryStatement queryStatement, Set<Expression> sourceExpressions, @@ -346,12 +366,14 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte private boolean cannotUseStatistics(Set<Expression> expressions) { for (Expression expression : expressions) { - Validate.isTrue( - expression instanceof FunctionExpression, - String.format("Invalid Aggregation Expression: %s", expression.getExpressionString())); - if (!BuiltinAggregationFunction.canUseStatistics( - ((FunctionExpression) expression).getFunctionName())) { - return true; + if (expression instanceof FunctionExpression) { + if (!BuiltinAggregationFunction.canUseStatistics( + ((FunctionExpression) expression).getFunctionName())) { + return true; + } + } else { + throw new IllegalArgumentException( + String.format("Invalid Aggregation Expression: %s", expression.getExpressionString())); } } return false; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 9c66ff9c46d..67b0cd101af 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -483,14 +483,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( - descriptor -> { - leafAggDescriptorList.add( - new AggregationDescriptor( - descriptor.getAggregationFuncName(), - AggregationStep.PARTIAL, - descriptor.getInputExpressions(), - descriptor.getInputAttributes())); - }); + descriptor -> leafAggDescriptorList.add( + new AggregationDescriptor( + descriptor.getAggregationFuncName(), + AggregationStep.PARTIAL, + descriptor.getInputExpressions(), + descriptor.getInputAttributes()))); leafAggDescriptorList.forEach( d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation( @@ -498,14 +496,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( - descriptor -> { - rootAggDescriptorList.add( - new AggregationDescriptor( - descriptor.getAggregationFuncName(), - context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, - descriptor.getInputExpressions(), - descriptor.getInputAttributes())); - }); + descriptor -> rootAggDescriptorList.add( + new AggregationDescriptor( + descriptor.getAggregationFuncName(), + context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, + descriptor.getInputExpressions(), + descriptor.getInputAttributes()))); AggregationNode aggregationNode = new AggregationNode( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java index 3b7fe31c517..3e374021f25 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java @@ -114,6 +114,9 @@ public class QueryStatement extends Statement { private boolean useWildcard = true; + // single timeseries, single aggregation type + private boolean isSingleSeriesAggregation = false; + public QueryStatement() { this.statementType = StatementType.QUERY; } @@ -480,6 +483,14 @@ public class QueryStatement extends Statement { return useWildcard; } + public void setSingleSeriesAggregation(boolean singleSeriesAggregation) { + this.isSingleSeriesAggregation = singleSeriesAggregation; + } + + public boolean isSingleSeriesAggregation() { + return this.isSingleSeriesAggregation; + } + public void semanticCheck() { if (isAggregationQuery()) { if (disableAlign()) { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 28bec0979bd..a814e3ea56e 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -523,6 +523,93 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return resp; } + } catch (Exception e) { + finished = true; + t = e; + return RpcUtils.getTSExecuteStatementResp( + onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_AGG_QUERY)); + } catch (Error error) { + t = error; + throw error; + } finally { + + long currentOperationCost = System.nanoTime() - startTime; + COORDINATOR.recordExecutionTime(queryId, currentOperationCost); + + // record each operation time cost + addStatementExecutionLatency( + OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY, currentOperationCost); + + if (finished) { + // record total time cost for one query + long executionTime = COORDINATOR.getTotalExecutionTime(queryId); + addQueryLatency( + StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); + COORDINATOR.cleanupQueryExecution(queryId, t); + } + + SESSION_MANAGER.updateIdleTime(); + if (quota != null) { + quota.close(); + } + } + } + + private TSExecuteStatementResp executeSingleSeriesAggregationQueryInternal( + TSAggregationQueryReq req, SelectResult setResult) { + boolean finished = false; + long queryId = Long.MIN_VALUE; + IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); + OperationQuota quota = null; + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); + } + long startTime = System.nanoTime(); + Throwable t = null; + try { + Statement s = StatementGenerator.createStatement(req, clientSession.getZoneId()); + // permission check + TSStatus status = AuthorityChecker.checkAuthority(s, clientSession); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return RpcUtils.getTSExecuteStatementResp(status); + } + + quota = + DataNodeThrottleQuotaManager.getInstance() + .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s); + + queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId); + // create and cache dataset + ExecutionResult result = + COORDINATOR.execute( + s, + queryId, + SESSION_MANAGER.getSessionInfo(clientSession), + "", + partitionFetcher, + schemaFetcher, + req.getTimeout()); + + if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new RuntimeException("error code: " + result.status); + } + + IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); + + try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { + TSExecuteStatementResp resp; + if (queryExecution.isQuery()) { + resp = createResponse(queryExecution.getDatasetHeader(), queryId); + resp.setStatus(result.status); + finished = setResult.apply(resp, queryExecution, req.fetchSize); + resp.setMoreData(!finished); + quota.addReadResult(resp.getQueryResult()); + } else { + resp = RpcUtils.getTSExecuteStatementResp(result.status); + } + return resp; + } + } catch (Exception e) { finished = true; t = e;
