This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch benchants_branch_optimize_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 789e78da8bed0dd5611cf5a894968a659196e164
Author: Beyyes <[email protected]>
AuthorDate: Tue Jun 6 13:57:27 2023 +0800

    Change for optimizing FE: add a dedicated single-series aggregation query path
---
 .../java/org/apache/iotdb/isession/ISession.java   |   4 +
 .../java/org/apache/iotdb/session/Session.java     |  32 ++++++
 .../thrift/src/main/thrift/client.thrift           |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   2 +-
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  | 119 +++++++++++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      |   7 +-
 .../ConcatExpressionWithSuffixPathsVisitor.java    |   1 +
 .../db/mpp/plan/parser/StatementGenerator.java     |   4 +
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    |   1 +
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    |  38 +++++--
 .../plan/planner/distribution/SourceRewriter.java  |  28 +++--
 .../db/mpp/plan/statement/crud/QueryStatement.java |  11 ++
 .../service/thrift/impl/ClientRPCServiceImpl.java  |  87 +++++++++++++++
 13 files changed, 309 insertions(+), 26 deletions(-)

diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 4095afe3680..e068e584011 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -189,6 +189,10 @@ public interface ISession extends AutoCloseable {
       long slidingStep)
       throws StatementExecutionException, IoTDBConnectionException;
 
+  /**
+   * Executes an aggregation query over a single series.
+   *
+   * @param path the series to aggregate
+   * @param aggregationType the aggregation to apply to that series
+   * @param startTime start of the queried time range (NOTE(review): bound inclusiveness is not
+   *     visible here — confirm against the server implementation)
+   * @param endTime end of the queried time range
+   * @param interval presumably the group-by-time window length — TODO confirm unit and meaning
+   * @return the data set holding the aggregation result
+   * @throws StatementExecutionException if the statement fails to execute
+   * @throws IoTDBConnectionException if the connection to the server fails
+   */
+  SessionDataSet executeSingleSeriesAggregationQuery(
+      String path, TAggregationType aggregationType, long startTime, long 
endTime, long interval)
+      throws StatementExecutionException, IoTDBConnectionException;
+
   void insertRecord(
       String deviceId,
       long time,
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 76f290e8448..37d78b96954 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -957,6 +957,38 @@ public class Session implements ISession {
     }
   }
 
+  /**
+   * Runs an aggregation query for exactly one series by wrapping the path and the aggregation
+   * type in singleton lists and delegating to the default session connection.
+   *
+   * <p>When the first attempt is answered with a redirect, the redirection is handled and, if
+   * query redirection is enabled, the query is sent once more. A redirect on that second attempt
+   * is logged and reported as a failure.
+   *
+   * @throws StatementExecutionException if query redirection is disabled after a redirect, or if
+   *     the retry is redirected again
+   * @throws IoTDBConnectionException if the connection fails
+   */
+  @Override
+  public SessionDataSet executeSingleSeriesAggregationQuery(
+      String path, TAggregationType aggregationType, long startTime, long endTime, long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          Collections.singletonList(path),
+          Collections.singletonList(aggregationType),
+          startTime,
+          endTime,
+          interval);
+    } catch (RedirectException firstRedirect) {
+      handleQueryRedirection(firstRedirect.getEndPoint());
+      if (!enableQueryRedirection) {
+        throw new StatementExecutionException(MSG_DONOT_ENABLE_REDIRECT);
+      }
+    }
+
+    // retry
+    try {
+      return defaultSessionConnection.executeAggregationQuery(
+          Collections.singletonList(path),
+          Collections.singletonList(aggregationType),
+          startTime,
+          endTime,
+          interval);
+    } catch (RedirectException secondRedirect) {
+      logger.error("redirect twice", secondRedirect);
+      throw new StatementExecutionException("redirect twice, please try again.");
+    }
+  }
+
   /**
    * insert data in one row, if you want to improve your performance, please 
use insertRecords
    * method or insertTablet method
diff --git a/iotdb-protocol/thrift/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift/src/main/thrift/client.thrift
index 8f6cf8e6297..0679b1ba453 100644
--- a/iotdb-protocol/thrift/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/client.thrift
@@ -350,6 +350,7 @@ struct TSAggregationQueryReq {
   9: optional i32 fetchSize
   10: optional i64 timeout
   11: optional bool legalPathNodes
+  12: optional bool singleSeriesAggregation
 }
 
 struct TSCreateMultiTimeseriesReq {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index cdd894b5d47..b68c302cd0c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -584,7 +584,7 @@ public class IoTDBConfig {
   private long cacheFileReaderClearPeriod = 100000;
 
   /** the max executing time of query in ms. Unit: millisecond */
-  private long queryTimeoutThreshold = 60000;
+  private long queryTimeoutThreshold = 60000000;
 
   /** the max time to live of a session in ms. Unit: millisecond */
   private int sessionTimeoutThreshold = 0;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index a96bd165a59..325d782fb76 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -236,6 +236,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   @Override
   public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
+    if (queryStatement.isSingleSeriesAggregation() || 
queryStatement.isGroupBy()) {
+      queryStatement.setSingleSeriesAggregation(true);
+      return visitSimpleSingleSeriesAggregationQuery(queryStatement, context);
+    }
+
     Analysis analysis = new Analysis();
     try {
       // check for semantic errors
@@ -375,6 +380,120 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
+  /**
+   * Analyzes a query that applies a single aggregation to a single timeseries (could associate
+   * with start time, end time and interval).
+   *
+   * <p>Rejects statements that use features this path does not handle (group by tag, last query,
+   * logical view, align by device, having, fill, wildcard, order by, select into, where). It then
+   * rewrites the paths, fetches the schema and runs the select, group-by, aggregation, source,
+   * group-by-time, output and data-partition analysis steps in that order.
+   *
+   * @param queryStatement the statement to analyze; internally replaced by its path-rewritten form
+   * @param context the query context handed to the schema fetcher
+   * @return the populated analysis, or the result of {@code finishQuery} when the fetched schema
+   *     tree is empty or no output expression remains
+   * @throws SemanticException if the statement uses an unsupported feature (NOTE(review): assumes
+   *     SemanticException is not a StatementAnalyzeException subtype, otherwise the catch at the
+   *     end of this method rewraps it — confirm)
+   * @throws StatementAnalyzeException if an analysis step fails with that exception type
+   */
+  public Analysis visitSimpleSingleSeriesAggregationQuery(
+      QueryStatement queryStatement, MPPQueryContext context) {
+    Analysis analysis = new Analysis();
+    try {
+      // Reject every query feature that this simplified path does not implement.
+      if (queryStatement.isGroupByTag()) {
+        throw new SemanticException("GroupByTag is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.isLastQuery()) {
+        throw new SemanticException("LastQuery is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      // NOTE(review): `analysis` was created a few lines above and nothing has been set on it
+      // yet, so this check presumably can never fire at this point — confirm.
+      if (analysis.useLogicalView()) {
+        throw new SemanticException("View is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.isAlignByDevice()) {
+        throw new SemanticException(
+            "AlignByDevice is not supported in SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.hasHaving()) {
+        throw new SemanticException("Having is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.getFillComponent() != null) {
+        throw new SemanticException("Fill is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      // NOTE(review): QueryStatement initializes useWildcard to true; verify it is cleared for
+      // wildcard-free statements before this point, otherwise every query is rejected here.
+      if (queryStatement.useWildcard()) {
+        throw new SemanticException("Wildcard is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.hasOrderByExpression()) {
+        throw new SemanticException("OrderBy is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.isSelectInto()) {
+        throw new SemanticException("SelectInto is not supported in 
SimpleSingleSeriesAggregation");
+      }
+      if (queryStatement.hasWhere()) {
+        throw new SemanticException("Where is not supported in 
SimpleSingleSeriesAggregation");
+      }
+
+      // check for semantic errors
+      // NOTE(review): the statement-level semantic check is disabled on this path.
+      // queryStatement.semanticCheck();
+
+      // concat path and construct path pattern tree
+      PathPatternTree patternTree = new PathPatternTree(false);
+      // TODO optimize rewrite method, just concat PrefixPath and Expression
+      // List<PartialPath> prefixPaths = queryStatement.getFromComponent().getPrefixPaths();
+      // List<ResultColumn> resultColumns =
+      //        concatSelectWithFrom(queryStatement.getSelectComponent(), prefixPaths, false);
+      // queryStatement.getSelectComponent().setResultColumns(resultColumns);
+      queryStatement =
+          (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, 
patternTree);
+      analysis.setStatement(queryStatement);
+
+      // request schema fetch API
+      long startTime = System.nanoTime();
+      ISchemaTree schemaTree;
+      try {
+        schemaTree = schemaFetcher.fetchSchema(patternTree, context);
+        // If there is no leaf node in the schema tree, the query should be completed immediately
+        if (schemaTree.isEmpty()) {
+          return finishQuery(queryStatement, analysis);
+        }
+      } finally {
+        // The schema-fetch cost is recorded on every exit path, including the early return above.
+        QueryPlanCostMetricSet.getInstance()
+            .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
+      }
+
+      // extract global time filter from query filter and determine if there is a value filter
+      analyzeGlobalTimeFilter(analysis, queryStatement);
+
+      List<Pair<Expression, String>> outputExpressions;
+      Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
+          analyzeSelect(analysis, queryStatement, schemaTree);
+
+      // Flatten the per-key output expression lists into one list, in the map's iteration order.
+      outputExpressions = new ArrayList<>();
+      outputExpressionMap.values().forEach(outputExpressions::addAll);
+      analysis.setOutputExpressions(outputExpressions);
+      if (outputExpressions.isEmpty()) {
+        return finishQuery(queryStatement, analysis);
+      }
+
+      analyzeGroupBy(analysis, queryStatement, schemaTree);
+      analyzeGroupByLevel(analysis, queryStatement, outputExpressionMap, 
outputExpressions);
+
+      // Collect the select expressions without duplicates while keeping insertion order; the
+      // end-time expression goes first when the statement outputs the end time.
+      Set<Expression> selectExpressions = new LinkedHashSet<>();
+      if (queryStatement.isOutputEndTime()) {
+        selectExpressions.add(endTimeExpression);
+      }
+      for (Pair<Expression, String> outputExpressionAndAlias : 
outputExpressions) {
+        selectExpressions.add(outputExpressionAndAlias.left);
+      }
+      analysis.setSelectExpressions(selectExpressions);
+
+      analyzeAggregation(analysis, queryStatement);
+
+      analyzeSourceTransform(analysis, queryStatement);
+
+      analyzeSource(analysis, queryStatement);
+
+      analyzeGroupByTime(analysis, queryStatement);
+
+      // generate result set header according to output expressions
+      analyzeOutput(analysis, queryStatement, outputExpressions);
+
+      // fetch partition information
+      analyzeDataPartition(analysis, queryStatement, schemaTree);
+
+    } catch (StatementAnalyzeException e) {
+      // NOTE(review): the rethrown exception carries only the message text; the original
+      // exception is logged here but not attached as the cause.
+      logger.warn("Meet error when analyzing the query statement: ", e);
+      throw new StatementAnalyzeException(
+          "Meet error when analyzing the query statement: " + e.getMessage());
+    }
+    return analysis;
+  }
+
   private Analysis finishQuery(QueryStatement queryStatement, Analysis 
analysis) {
     if (queryStatement.isSelectInto()) {
       analysis.setRespDatasetHeader(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8c4200fe391..ea2a1b3d1f1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -64,6 +64,7 @@ import 
org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -360,11 +361,15 @@ public class QueryExecution implements IQueryExecution {
     DistributionPlanner planner = new DistributionPlanner(this.analysis, 
this.logicalPlan);
     this.distributedPlan = planner.planFragments();
 
+    if (rawStatement instanceof QueryStatement && ((QueryStatement) 
rawStatement).isSingleSeriesAggregation()) {
+      // simplify planner.planFragments() process
+    }
+
     if (rawStatement.isQuery()) {
       QUERY_PLAN_COST_METRIC_SET.recordPlanCost(
           DISTRIBUTION_PLANNER, System.nanoTime() - startTime);
     }
-    if (isQuery() && logger.isDebugEnabled()) {
+    if (logger.isDebugEnabled() && isQuery()) {
       logger.debug(
           "distribution plan done. Fragment instance count is {}, details is: 
\n {}",
           distributedPlan.getInstances().size(),
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
index ae66a3e3e46..c028d365443 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/cartesian/ConcatExpressionWithSuffixPathsVisitor.java
@@ -39,6 +39,7 @@ import static 
org.apache.iotdb.db.mpp.plan.analyze.ExpressionUtils.reconstructTi
 
 public class ConcatExpressionWithSuffixPathsVisitor
     extends 
CartesianProductVisitor<ConcatExpressionWithSuffixPathsVisitor.Context> {
+
   @Override
   public List<Expression> visitFunctionExpression(
       FunctionExpression functionExpression, Context context) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 5fc4f3a2348..57b388a43bf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -218,6 +218,7 @@ public class StatementGenerator {
     FromComponent fromComponent = new FromComponent();
     fromComponent.addPrefixPath(new PartialPath("", false));
     queryStatement.setFromComponent(fromComponent);
+    queryStatement.setSingleSeriesAggregation(true);
 
     SelectComponent selectComponent = new SelectComponent(zoneId);
     List<PartialPath> selectPaths = new ArrayList<>();
@@ -228,6 +229,9 @@ public class StatementGenerator {
         selectPaths.add(new PartialPath(pathStr));
       }
     }
+    if (req.isSingleSeries()) {
+      queryStatement.setSingleSeriesAggregation(true);
+    }
     List<TAggregationType> aggregations = req.getAggregations();
     for (int i = 0; i < aggregations.size(); i++) {
       selectComponent.addResultColumn(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index f2d19fe9025..80b5460e1f5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -315,6 +315,7 @@ public class LogicalPlanBuilder {
       List<String> tagKeys,
       Map<List<String>, LinkedHashMap<Expression, List<Expression>>>
           tagValuesToGroupedTimeseriesOperands) {
+
     boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 14a49611613..76266046fed 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -83,8 +83,6 @@ import 
org.apache.iotdb.db.mpp.plan.statement.metadata.view.ShowLogicalViewState
 import org.apache.iotdb.db.mpp.plan.statement.sys.ShowQueriesStatement;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.apache.commons.lang3.Validate;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -113,6 +111,10 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
 
   @Override
   public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
+    if (queryStatement.isSingleSeriesAggregation()) {
+      return visitSingleSeriesAggregationQuery(queryStatement, context);
+    }
+
     LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
     if (queryStatement.isLastQuery()) {
@@ -218,6 +220,24 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     return planBuilder.getRoot();
   }
 
+  /**
+   * Builds the logical plan for a single-series aggregation query.
+   *
+   * <p>Plans one aggregation source with {@link AggregationStep#SINGLE} from the values stored in
+   * the analysis and returns the builder's root immediately afterwards.
+   *
+   * @param queryStatement the statement supplying the result time order
+   * @param context the query context given to the plan builder
+   * @return the root node produced by the plan builder
+   */
+  private PlanNode visitSingleSeriesAggregationQuery(
+      QueryStatement queryStatement, MPPQueryContext context) {
+    return new LogicalPlanBuilder(analysis, context)
+        .planAggregationSource(
+            AggregationStep.SINGLE,
+            queryStatement.getResultTimeOrder(),
+            analysis.getGlobalTimeFilter(),
+            analysis.getGroupByTimeParameter(),
+            analysis.getAggregationExpressions(),
+            analysis.getSourceTransformExpressions(),
+            analysis.getCrossGroupByExpressions(),
+            analysis.getTagKeys(),
+            analysis.getTagValuesToGroupedTimeseriesOperands())
+        .getRoot();
+  }
+
   public PlanNode visitQueryBody(
       QueryStatement queryStatement,
       Set<Expression> sourceExpressions,
@@ -346,12 +366,14 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
 
   private boolean cannotUseStatistics(Set<Expression> expressions) {
     for (Expression expression : expressions) {
-      Validate.isTrue(
-          expression instanceof FunctionExpression,
-          String.format("Invalid Aggregation Expression: %s", 
expression.getExpressionString()));
-      if (!BuiltinAggregationFunction.canUseStatistics(
-          ((FunctionExpression) expression).getFunctionName())) {
-        return true;
+      if (expression instanceof FunctionExpression) {
+        if (!BuiltinAggregationFunction.canUseStatistics(
+            ((FunctionExpression) expression).getFunctionName())) {
+          return true;
+        }
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Invalid Aggregation Expression: %s", 
expression.getExpressionString()));
       }
     }
     return false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9c66ff9c46d..67b0cd101af 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -483,14 +483,12 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              leafAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      AggregationStep.PARTIAL,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor -> leafAggDescriptorList.add(
+                new AggregationDescriptor(
+                    descriptor.getAggregationFuncName(),
+                    AggregationStep.PARTIAL,
+                    descriptor.getInputExpressions(),
+                    descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
         d ->
             LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
@@ -498,14 +496,12 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              rootAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor -> rootAggDescriptorList.add(
+                new AggregationDescriptor(
+                    descriptor.getAggregationFuncName(),
+                    context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
+                    descriptor.getInputExpressions(),
+                    descriptor.getInputAttributes())));
 
     AggregationNode aggregationNode =
         new AggregationNode(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 3b7fe31c517..3e374021f25 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -114,6 +114,9 @@ public class QueryStatement extends Statement {
 
   private boolean useWildcard = true;
 
+  // single timeseries, single aggregation type
+  private boolean isSingleSeriesAggregation = false;
+
   public QueryStatement() {
     this.statementType = StatementType.QUERY;
   }
@@ -480,6 +483,14 @@ public class QueryStatement extends Statement {
     return useWildcard;
   }
 
+  /**
+   * Marks whether this statement is a single-series, single-aggregation query.
+   *
+   * @param singleSeriesAggregation the new flag value
+   */
+  public void setSingleSeriesAggregation(boolean singleSeriesAggregation) {
+    isSingleSeriesAggregation = singleSeriesAggregation;
+  }
+
+  /**
+   * Tells whether this statement has been marked as a single-series, single-aggregation query.
+   *
+   * @return the current flag value
+   */
+  public boolean isSingleSeriesAggregation() {
+    return isSingleSeriesAggregation;
+  }
+
   public void semanticCheck() {
     if (isAggregationQuery()) {
       if (disableAlign()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
index 28bec0979bd..a814e3ea56e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java
@@ -523,6 +523,93 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         return resp;
       }
 
+    } catch (Exception e) {
+      finished = true;
+      t = e;
+      return RpcUtils.getTSExecuteStatementResp(
+          onQueryException(e, "\"" + req + "\". " + 
OperationType.EXECUTE_AGG_QUERY));
+    } catch (Error error) {
+      t = error;
+      throw error;
+    } finally {
+
+      long currentOperationCost = System.nanoTime() - startTime;
+      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
+
+      // record each operation time cost
+      addStatementExecutionLatency(
+          OperationType.EXECUTE_AGG_QUERY, StatementType.QUERY, 
currentOperationCost);
+
+      if (finished) {
+        // record total time cost for one query
+        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
+        addQueryLatency(
+            StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
+        COORDINATOR.cleanupQueryExecution(queryId, t);
+      }
+
+      SESSION_MANAGER.updateIdleTime();
+      if (quota != null) {
+        quota.close();
+      }
+    }
+  }
+
+  private TSExecuteStatementResp executeSingleSeriesAggregationQueryInternal(
+      TSAggregationQueryReq req, SelectResult setResult) {
+    boolean finished = false;
+    long queryId = Long.MIN_VALUE;
+    IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+    OperationQuota quota = null;
+    if (!SESSION_MANAGER.checkLogin(clientSession)) {
+      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
+    }
+    long startTime = System.nanoTime();
+    Throwable t = null;
+    try {
+      Statement s = StatementGenerator.createStatement(req, 
clientSession.getZoneId());
+      // permission check
+      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return RpcUtils.getTSExecuteStatementResp(status);
+      }
+
+      quota =
+          DataNodeThrottleQuotaManager.getInstance()
+              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+
+      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
+      // create and cache dataset
+      ExecutionResult result =
+          COORDINATOR.execute(
+              s,
+              queryId,
+              SESSION_MANAGER.getSessionInfo(clientSession),
+              "",
+              partitionFetcher,
+              schemaFetcher,
+              req.getTimeout());
+
+      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new RuntimeException("error code: " + result.status);
+      }
+
+      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
+
+      try (SetThreadName threadName = new 
SetThreadName(result.queryId.getId())) {
+        TSExecuteStatementResp resp;
+        if (queryExecution.isQuery()) {
+          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
+          resp.setStatus(result.status);
+          finished = setResult.apply(resp, queryExecution, req.fetchSize);
+          resp.setMoreData(!finished);
+          quota.addReadResult(resp.getQueryResult());
+        } else {
+          resp = RpcUtils.getTSExecuteStatementResp(result.status);
+        }
+        return resp;
+      }
+
     } catch (Exception e) {
       finished = true;
       t = e;

Reply via email to