This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_template_alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2b799e7d3c9af45694b20bd534a5f10dc9402e63
Author: Beyyes <[email protected]>
AuthorDate: Thu May 9 22:23:23 2024 +0800

    add temp impl
---
 .../main/java/org/apache/iotdb/SessionExample.java |  29 +++-
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   6 +-
 .../plan/analyze/TemplatedAggregationAnalyze.java  | 122 +++++++++++++++
 .../queryengine/plan/analyze/TemplatedAnalyze.java | 100 +++++++------
 .../db/queryengine/plan/analyze/TemplatedInfo.java |   5 +
 .../plan/planner/LogicalPlanBuilder.java           |   4 +-
 .../plan/planner/TemplatedLogicalPlan.java         | 165 ++++++++++++++++++++-
 .../plan/planner/TemplatedLogicalPlanBuilder.java  |  55 +++++++
 8 files changed, 430 insertions(+), 56 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 0daec0885bf..e65127fd388 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -61,12 +61,39 @@ public class SessionExample {
   private static final String ROOT_SG1_D1 = "root.sg1.d1";
   private static final String ROOT_SG1 = "root.sg1";
   private static final String LOCAL_HOST = "127.0.0.1";
-  public static final String SELECT_D1 = "select * from root.sg1.d1";
+  public static final String SELECT_D1 =
+      "select value from 
root.cmadaas_nafp_surf.nafp.NAFP_GRAPES_MESO_FOR_3KM.data.RHU.`100`.`100000`.tile
 limit 10";
 
   private static Random random = new Random();
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException {
+    session =
+        new Session.Builder()
+            .host("172.20.31.60")
+            .port(6667)
+            .username("root")
+            .password("root")
+            .version(Version.V_1_0)
+            .build();
+    session.open(false);
+
+    // set session fetchSize
+    session.setFetchSize(10000);
+
+    long time1 = System.currentTimeMillis();
+    try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1)) {
+      // System.out.println(dataSet.getColumnNames());
+      //      dataSet.setFetchSize(1024); // default is 10000
+      //      while (dataSet.hasNext()) {
+      //        System.out.println(dataSet.next());
+      //      }
+    }
+    System.out.println("Time: " + (System.currentTimeMillis() - time1));
+  }
+
+  public static void main1(String[] args)
+      throws IoTDBConnectionException, StatementExecutionException {
     session =
         new Session.Builder()
             .host(LOCAL_HOST)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index b23a5d44f9c..f0086b5e79a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -1848,7 +1848,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
           && rightExpression instanceof ConstantOperand)) {
         throw new SemanticException(
             String.format(
-                "Please check the keep condition ([%s]),it need to be a 
constant or a compare expression constructed by 'keep' and a long number.",
+                "Please check the keep condition ([%s]), "
+                    + "it need to be a constant or a compare expression 
constructed by 'keep' and a long number.",
                 keepExpression.getExpressionString()));
       }
       return;
@@ -1856,7 +1857,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     if (!(keepExpression instanceof ConstantOperand)) {
       throw new SemanticException(
           String.format(
-              "Please check the keep condition ([%s]),it need to be a constant 
or a compare expression constructed by 'keep' and a long number.",
+              "Please check the keep condition ([%s]), "
+                  + "it need to be a constant or a compare expression 
constructed by 'keep' and a long number.",
               keepExpression.getExpressionString()));
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
new file mode 100644
index 00000000000..ae2afa9c8f5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
@@ -0,0 +1,122 @@
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewInput;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
+
+public class TemplatedAggregationAnalyze {
+
+  // ----------- Methods below are used for aggregation, templated with align 
by device --------
+
+  static boolean analyzeAggregation(
+      Analysis analysis,
+      QueryStatement queryStatement,
+      IPartitionFetcher partitionFetcher,
+      ISchemaTree schemaTree,
+      MPPQueryContext context,
+      Template template) {
+
+    List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+
+    if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
+      // remove the device which won't appear in resultSet after limit/offset
+      deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, 
queryStatement);
+    }
+
+    analyzeDeviceToWhere(analysis, queryStatement);
+    if (deviceList.isEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      return true;
+    }
+    analysis.setDeviceList(deviceList);
+
+    List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+    ColumnPaginationController paginationController =
+        new ColumnPaginationController(
+            queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
+    for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {}
+
+    analyzeSelect(queryStatement, analysis, outputExpressions, template);
+    if (analysis.getWhereExpression() != null
+        && analysis.getWhereExpression().equals(ConstantOperand.FALSE)) {
+      analyzeOutput(analysis, queryStatement, outputExpressions);
+      analysis.setFinishQueryAfterAnalyze(true);
+      return true;
+    }
+
+    analyzeDeviceToAggregation(analysis);
+    analyzeDeviceToSourceTransform(analysis);
+    analyzeDeviceToSource(analysis);
+
+    analyzeDeviceViewOutput(analysis, queryStatement);
+    analyzeDeviceViewInput(analysis);
+
+    // generate result set header according to output expressions
+    analyzeOutput(analysis, queryStatement, outputExpressions);
+
+    context.generateGlobalTimeFilter(analysis);
+    // fetch partition information
+    analyzeDataPartition(analysis, schemaTree, partitionFetcher, 
context.getGlobalTimeFilter());
+    return true;
+  }
+
+  private static void analyzeSelect(
+      QueryStatement queryStatement,
+      Analysis analysis,
+      List<Pair<Expression, String>> outputExpressions,
+      Template template) {
+    LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>();
+    selectExpressions.add(DEVICE_EXPRESSION);
+    if (queryStatement.isOutputEndTime()) {
+      selectExpressions.add(END_TIME_EXPRESSION);
+    }
+    for (Pair<Expression, String> pair : outputExpressions) {
+      Expression selectExpression = pair.left;
+      selectExpressions.add(selectExpression);
+    }
+    analysis.setOutputExpressions(outputExpressions);
+    analysis.setSelectExpressions(selectExpressions);
+    analysis.setDeviceTemplate(template);
+    // TODO only add measurements and schemas that occur in selectExpressions
+    analysis.setMeasurementList(new 
ArrayList<>(template.getSchemaMap().keySet()));
+    analysis.setMeasurementSchemaList(new 
ArrayList<>(template.getSchemaMap().values()));
+  }
+
+  private static void analyzeDeviceToSourceTransform(Analysis analysis) {
+    // TODO add having into SourceTransform
+    
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+  }
+
+  private static void analyzeDeviceToSource(Analysis analysis) {
+    // TODO add having into Source
+    
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
+    
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
+  }
+
+  private static void analyzeDeviceToAggregation(Analysis analysis) {
+    // TODO do we need to add the having clause?
+    
analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
index 856476eae24..1814a923d24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -71,6 +71,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTim
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.analyzeAggregation;
 
 /**
  * This class provides accelerated implementation for multiple devices align 
by device query. This
@@ -96,9 +97,8 @@ public class TemplatedAnalyze {
       IPartitionFetcher partitionFetcher,
       ISchemaTree schemaTree,
       MPPQueryContext context) {
-    if (queryStatement.isAggregationQuery()
-        || queryStatement.isGroupBy()
-        || queryStatement.isGroupByTime()
+    if (queryStatement.isGroupBy()
+        || (queryStatement.isGroupByTime() && 
!queryStatement.isAggregationQuery())
         || queryStatement.isSelectInto()
         || queryStatement.hasFill()
         || schemaTree.hasNormalTimeSeries()) {
@@ -106,58 +106,61 @@ public class TemplatedAnalyze {
     }
 
     List<Template> templates = schemaTree.getUsingTemplates();
-    if (templates.size() != 1) {
+    if (templates.size() != 1 || templates.get(0) == null) {
       return false;
     }
 
     Template template = templates.get(0);
 
+    if (queryStatement.isAggregationQuery()) {
+      return analyzeAggregation(
+          analysis, queryStatement, partitionFetcher, schemaTree, context, 
template);
+    }
+
     List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
     ColumnPaginationController paginationController =
         new ColumnPaginationController(
             queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
-    if (template != null) {
-      for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
-        Expression expression = resultColumn.getExpression();
-        if ("*".equals(expression.getOutputSymbol())) {
-          for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
-            if (paginationController.hasCurOffset()) {
-              paginationController.consumeOffset();
-            } else if (paginationController.hasCurLimit()) {
-              String measurementName = entry.getKey();
-              IMeasurementSchema measurementSchema = entry.getValue();
-              TimeSeriesOperand measurementPath =
-                  new TimeSeriesOperand(
-                      new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
-              outputExpressions.add(new Pair<>(measurementPath, null));
-              paginationController.consumeLimit();
-            } else {
-              break;
-            }
-          }
-          if (queryStatement.getSelectComponent().getResultColumns().size() == 
1
-              && queryStatement.getSeriesOffset() == 0
-              && queryStatement.getSeriesLimit() == 0) {
-            analysis.setTemplateWildCardQuery();
+    for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
+      Expression expression = resultColumn.getExpression();
+      if ("*".equals(expression.getOutputSymbol())) {
+        for (Map.Entry<String, IMeasurementSchema> entry : 
template.getSchemaMap().entrySet()) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+          } else if (paginationController.hasCurLimit()) {
+            String measurementName = entry.getKey();
+            IMeasurementSchema measurementSchema = entry.getValue();
+            TimeSeriesOperand measurementPath =
+                new TimeSeriesOperand(
+                    new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
+            outputExpressions.add(new Pair<>(measurementPath, null));
+            paginationController.consumeLimit();
+          } else {
+            break;
           }
-        } else if (expression instanceof TimeSeriesOperand) {
-          String measurementName = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
-          if (template.getSchemaMap().containsKey(measurementName)) {
-            if (paginationController.hasCurOffset()) {
-              paginationController.consumeOffset();
-            } else if (paginationController.hasCurLimit()) {
-              IMeasurementSchema measurementSchema = 
template.getSchemaMap().get(measurementName);
-              TimeSeriesOperand measurementPath =
-                  new TimeSeriesOperand(
-                      new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
-              outputExpressions.add(new Pair<>(measurementPath, 
resultColumn.getAlias()));
-            } else {
-              break;
-            }
+        }
+        if (queryStatement.getSelectComponent().getResultColumns().size() == 1
+            && queryStatement.getSeriesOffset() == 0
+            && queryStatement.getSeriesLimit() == 0) {
+          analysis.setTemplateWildCardQuery();
+        }
+      } else if (expression instanceof TimeSeriesOperand) {
+        String measurementName = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
+        if (template.getSchemaMap().containsKey(measurementName)) {
+          if (paginationController.hasCurOffset()) {
+            paginationController.consumeOffset();
+          } else if (paginationController.hasCurLimit()) {
+            IMeasurementSchema measurementSchema = 
template.getSchemaMap().get(measurementName);
+            TimeSeriesOperand measurementPath =
+                new TimeSeriesOperand(
+                    new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
+            outputExpressions.add(new Pair<>(measurementPath, 
resultColumn.getAlias()));
+          } else {
+            break;
           }
-        } else {
-          return false;
         }
+      } else {
+        return false;
       }
     }
 
@@ -228,8 +231,7 @@ public class TemplatedAnalyze {
     analysis.setMeasurementSchemaList(measurementSchemaList);
   }
 
-  private static List<PartialPath> analyzeFrom(
-      QueryStatement queryStatement, ISchemaTree schemaTree) {
+  static List<PartialPath> analyzeFrom(QueryStatement queryStatement, 
ISchemaTree schemaTree) {
     // device path patterns in FROM clause
     List<PartialPath> devicePatternList = 
queryStatement.getFromComponent().getPrefixPaths();
 
@@ -246,7 +248,7 @@ public class TemplatedAnalyze {
         : 
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
   }
 
-  private static void analyzeDeviceToWhere(Analysis analysis, QueryStatement 
queryStatement) {
+  static void analyzeDeviceToWhere(Analysis analysis, QueryStatement 
queryStatement) {
     if (!queryStatement.hasWhere()) {
       return;
     }
@@ -325,7 +327,7 @@ public class TemplatedAnalyze {
     
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
   }
 
-  private static void analyzeDeviceViewOutput(Analysis analysis, 
QueryStatement queryStatement) {
+  static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement 
queryStatement) {
     Set<Expression> selectExpressions = analysis.getSelectExpressions();
     // TODO if no order by, just set deviceViewOutputExpressions as 
selectExpressions
     Set<Expression> deviceViewOutputExpressions = new 
LinkedHashSet<>(selectExpressions);
@@ -337,7 +339,7 @@ public class TemplatedAnalyze {
         analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions, 
queryStatement, analysis));
   }
 
-  private static void analyzeDeviceViewInput(Analysis analysis) {
+  static void analyzeDeviceViewInput(Analysis analysis) {
     List<Integer> indexes = new ArrayList<>();
 
     // index-0 is `Device`
@@ -356,7 +358,7 @@ public class TemplatedAnalyze {
     
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
   }
 
-  private static void analyzeDataPartition(
+  static void analyzeDataPartition(
       Analysis analysis,
       ISchemaTree schemaTree,
       IPartitionFetcher partitionFetcher,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
index bd624f25634..bf5d54e5cd6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
@@ -39,6 +39,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
 
@@ -77,6 +78,10 @@ public class TemplatedInfo {
   // variables related to predicate push down
   private Expression pushDownPredicate;
 
+  private Set<Expression> aggSelectExpressions;
+
+  private Expression havingExpression;
+
   public TemplatedInfo(
       List<String> measurementList,
       List<IMeasurementSchema> schemaList,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index d457c432cf0..7ce23590e23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -743,7 +743,7 @@ public class LogicalPlanBuilder {
     return this;
   }
 
-  private PlanNode createSlidingWindowAggregationNode(
+  protected PlanNode createSlidingWindowAggregationNode(
       PlanNode child,
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
@@ -848,7 +848,7 @@ public class LogicalPlanBuilder {
             .collect(Collectors.toList()));
   }
 
-  private List<AggregationDescriptor> constructAggregationDescriptorList(
+  protected List<AggregationDescriptor> constructAggregationDescriptorList(
       Set<Expression> aggregationExpressions, AggregationStep curStep) {
     return aggregationExpressions.stream()
         .map(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index 75222efb6fc..7bb9ebd579c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 
@@ -86,10 +87,67 @@ public class TemplatedLogicalPlan {
     this.whereExpression = analysis.getWhereExpression();
 
     // for align by device query with template, most used variables are same
-    initCommonVariables();
+    if (queryStatement.isAggregationQuery()) {
+      initAggQueryCommonVariables();
+    } else {
+      initNonAggQueryCommonVariables();
+    }
+  }
+
+  private void initAggQueryCommonVariables() {
+    if (whereExpression != null) {
+      newMeasurementList = new ArrayList<>(measurementList);
+      newSchemaList = new ArrayList<>(schemaList);
+      Set<String> selectMeasurements = new HashSet<>(measurementList);
+      List<Expression> whereSourceExpressions = 
searchSourceExpressions(whereExpression);
+      for (Expression expression : whereSourceExpressions) {
+        if (expression instanceof TimeSeriesOperand) {
+          String measurement = ((TimeSeriesOperand) 
expression).getPath().getMeasurement();
+          if 
(!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) {
+            continue;
+          }
+          if (!selectMeasurements.contains(measurement)) {
+            newMeasurementList.add(measurement);
+            
newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement));
+          }
+        }
+      }
+
+      // TODO fix aggregation filterLayoutMap
+      filterLayoutMap = makeLayout(newMeasurementList);
+
+      analysis
+          .getExpressionTypes()
+          .forEach(
+              (key, value) ->
+                  
context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
+    }
+
+    context
+        .getTypeProvider()
+        .setTemplatedInfo(
+            new TemplatedInfo(
+                newMeasurementList,
+                newSchemaList,
+                newSchemaList.stream()
+                    .map(IMeasurementSchema::getType)
+                    .collect(Collectors.toList()),
+                queryStatement.getResultTimeOrder(),
+                analysis.isLastLevelUseWildcard(),
+                analysis.getDeviceViewOutputExpressions().stream()
+                    .map(Expression::getExpressionString)
+                    .collect(Collectors.toList()),
+                
analysis.getDeviceViewInputIndexesMap().values().iterator().next(),
+                OFFSET_VALUE,
+                limitValue,
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                analysis.getDeviceTemplate().getSchemaMap(),
+                filterLayoutMap,
+                null));
   }
 
-  private void initCommonVariables() {
+  private void initNonAggQueryCommonVariables() {
     if (whereExpression != null) {
       if (!analysis.isTemplateWildCardQuery()) {
         newMeasurementList = new ArrayList<>(measurementList);
@@ -145,6 +203,10 @@ public class TemplatedLogicalPlan {
   }
 
   public PlanNode visitQuery() {
+    if (queryStatement.isAggregationQuery()) {
+      return visitAggregation();
+    }
+
     LogicalPlanBuilder planBuilder =
         new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList);
 
@@ -193,6 +255,56 @@ public class TemplatedLogicalPlan {
     return planBuilder.getRoot();
   }
 
+  private PlanNode visitAggregation() {
+    LogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList);
+
+    Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+    for (PartialPath devicePath : analysis.getDeviceList()) {
+      String deviceName = devicePath.getFullPath();
+      PlanNode rootNode = visitDeviceAggregationBody(devicePath);
+
+      LogicalPlanBuilder subPlanBuilder =
+          new TemplatedLogicalPlanBuilder(analysis, context, measurementList, 
schemaList)
+              .withNewRoot(rootNode);
+
+      deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+    }
+
+    // convert to ALIGN BY DEVICE view
+    planBuilder =
+        planBuilder.planDeviceView(
+            deviceToSubPlanMap,
+            analysis.getDeviceViewOutputExpressions(),
+            analysis.getDeviceViewInputIndexesMap(),
+            analysis.getSelectExpressions(),
+            queryStatement,
+            analysis);
+
+    planBuilder =
+        planBuilder.planHavingAndTransform(
+            analysis.getHavingExpression(),
+            analysis.getSelectExpressions(),
+            analysis.getOrderByExpressions(),
+            queryStatement.isGroupByTime(),
+            queryStatement.getResultTimeOrder());
+
+    if (!queryStatement.needPushDownSort()) {
+      planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
+    }
+
+    planBuilder =
+        planBuilder
+            .planFill(analysis.getFillDescriptor(), 
queryStatement.getResultTimeOrder())
+            .planOffset(queryStatement.getRowOffset());
+
+    if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) {
+      planBuilder = planBuilder.planLimit(queryStatement.getRowLimit());
+    }
+
+    return planBuilder.getRoot();
+  }
+
   public PlanNode visitQueryBody(PartialPath devicePath) {
 
     TemplatedLogicalPlanBuilder planBuilder =
@@ -213,4 +325,53 @@ public class TemplatedLogicalPlan {
 
     return planBuilder.getRoot();
   }
+
+  private PlanNode visitDeviceAggregationBody(PartialPath devicePath) {
+    TemplatedLogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, 
newSchemaList);
+
+    planBuilder =
+        planBuilder
+            .planRawDataSource(
+                devicePath,
+                queryStatement.getResultTimeOrder(),
+                OFFSET_VALUE,
+                limitValue,
+                analysis.isLastLevelUseWildcard())
+            .planFilter(
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                queryStatement.getResultTimeOrder());
+
+    boolean outputPartial =
+        queryStatement.isGroupByLevel()
+            || queryStatement.isGroupByTag()
+            || (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap());
+    AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : 
AggregationStep.SINGLE;
+    planBuilder =
+        planBuilder.planRawDataAggregation(
+            analysis.getSelectExpressions(),
+            null,
+            analysis.getGroupByTimeParameter(),
+            analysis.getGroupByParameter(),
+            queryStatement.isOutputEndTime(),
+            curStep,
+            queryStatement.getResultTimeOrder());
+
+    if (queryStatement.isGroupByTime() && 
analysis.getGroupByTimeParameter().hasOverlap()) {
+      curStep =
+          (queryStatement.isGroupByLevel() || queryStatement.isGroupByTag())
+              ? AggregationStep.INTERMEDIATE
+              : AggregationStep.FINAL;
+      planBuilder =
+          planBuilder.planSlidingWindowAggregation(
+              analysis.getSelectExpressions(),
+              analysis.getGroupByTimeParameter(),
+              curStep,
+              queryStatement.getResultTimeOrder());
+    }
+
+    // no group by level and group by tag
+    return planBuilder.getRoot();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
index fb6b2ea04c8..4942c76e308 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
@@ -27,14 +27,20 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * This class provides accelerated implementation for multiple devices align 
by device query. This
@@ -126,6 +132,55 @@ public class TemplatedLogicalPlanBuilder extends 
LogicalPlanBuilder {
     return this;
   }
 
+  public TemplatedLogicalPlanBuilder planRawDataAggregation(
+      Set<Expression> aggregationExpressions,
+      Expression groupByExpression,
+      GroupByTimeParameter groupByTimeParameter,
+      GroupByParameter groupByParameter,
+      boolean outputEndTime,
+      AggregationStep curStep,
+      Ordering scanOrder) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    List<AggregationDescriptor> aggregationDescriptorList =
+        constructAggregationDescriptorList(aggregationExpressions, curStep);
+    updateTypeProvider(aggregationExpressions);
+    if (curStep.isOutputPartial()) {
+      aggregationDescriptorList.forEach(
+          aggregationDescriptor ->
+              updateTypeProviderByPartialAggregation(
+                  aggregationDescriptor, context.getTypeProvider()));
+    }
+    this.root =
+        new RawDataAggregationNode(
+            context.getQueryId().genPlanNodeId(),
+            this.getRoot(),
+            aggregationDescriptorList,
+            groupByTimeParameter,
+            groupByParameter,
+            groupByExpression,
+            outputEndTime,
+            scanOrder);
+    return this;
+  }
+
+  public TemplatedLogicalPlanBuilder planSlidingWindowAggregation(
+      Set<Expression> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep,
+      Ordering scanOrder) {
+    if (aggregationExpressions == null) {
+      return this;
+    }
+
+    this.root =
+        createSlidingWindowAggregationNode(
+            this.getRoot(), aggregationExpressions, groupByTimeParameter, 
curStep, scanOrder);
+    return this;
+  }
+
   @Override
   public TemplatedLogicalPlanBuilder withNewRoot(PlanNode newRoot) {
     this.root = newRoot;

Reply via email to