This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch tsbs/iot
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tsbs/iot by this push:
new 456f6110290 Add aggregation template align by device optimization
456f6110290 is described below
commit 456f6110290d364a0d39d1ed46c0edbd00f9f4a5
Author: Beyyes <[email protected]>
AuthorDate: Wed May 15 10:14:06 2024 +0800
Add aggregation template align by device optimization
---
.../db/queryengine/plan/analyze/Analysis.java | 16 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 8 +-
.../plan/analyze/ExpressionTypeAnalyzer.java | 25 ++-
.../plan/analyze/TemplatedAggregationAnalyze.java | 229 ++++++++++++++++++++
.../queryengine/plan/analyze/TemplatedAnalyze.java | 148 ++++++++-----
.../db/queryengine/plan/analyze/TemplatedInfo.java | 99 +++++++--
.../plan/optimization/AggregationPushDown.java | 132 +++++++++++-
.../plan/optimization/PredicatePushDown.java | 16 +-
.../plan/planner/LogicalPlanBuilder.java | 7 +-
.../plan/planner/LogicalPlanVisitor.java | 2 +-
.../plan/planner/OperatorTreeGenerator.java | 114 +++++++---
.../plan/planner/SubPlanTypeExtractor.java | 9 +
.../plan/planner/TemplatedLogicalPlan.java | 240 ++++++++++++++++++++-
.../plan/planner/TemplatedLogicalPlanBuilder.java | 54 ++++-
.../plan/planner/plan/PlanFragment.java | 6 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 2 +-
.../plan/planner/plan/node/PlanNodeType.java | 8 +-
.../planner/plan/node/process/DeviceViewNode.java | 47 ++++
.../plan/planner/plan/node/process/FilterNode.java | 59 ++++-
.../plan/node/process/RawDataAggregationNode.java | 20 ++
.../plan/node/process/SingleDeviceViewNode.java | 2 +-
.../source/AlignedSeriesAggregationScanNode.java | 36 ++++
.../plan/parameter/AggregationDescriptor.java | 2 +-
.../plan/parameter/GroupByTimeParameter.java | 4 +
.../plan/optimization/TestPlanBuilder.java | 3 +-
.../logical/DataQueryLogicalPlannerTest.java | 18 +-
.../planner/node/process/FilterNodeSerdeTest.java | 3 +-
27 files changed, 1148 insertions(+), 161 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index be5c98525bb..5897f78b0dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -293,7 +293,7 @@ public class Analysis implements IAnalysis {
private Template deviceTemplate;
// when deviceTemplate is not empty and all expressions in this query are
templated measurements,
// i.e. no aggregation and arithmetic expression
- private boolean onlyQueryTemplateMeasurements = true;
+ private boolean noWhereAndAggregation = true;
// if it is wildcard query in templated align by device query
private boolean templateWildCardQuery;
// all queried measurementList and schemaList in deviceTemplate.
@@ -437,8 +437,8 @@ public class Analysis implements IAnalysis {
return null;
}
- if (isAllDevicesInOneTemplate()
- && (isOnlyQueryTemplateMeasurements() || expression instanceof
TimeSeriesOperand)) {
+ if (allDevicesInOneTemplate()
+ && (noWhereAndAggregation() || expression instanceof
TimeSeriesOperand)) {
TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
return
deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType();
}
@@ -921,7 +921,7 @@ public class Analysis implements IAnalysis {
// All Queries Devices Set In One Template
/////////////////////////////////////////////////////////////////////////////////////////////////
- public boolean isAllDevicesInOneTemplate() {
+ public boolean allDevicesInOneTemplate() {
return this.deviceTemplate != null;
}
@@ -933,12 +933,12 @@ public class Analysis implements IAnalysis {
this.deviceTemplate = template;
}
- public boolean isOnlyQueryTemplateMeasurements() {
- return onlyQueryTemplateMeasurements;
+ public boolean noWhereAndAggregation() {
+ return noWhereAndAggregation;
}
- public void setOnlyQueryTemplateMeasurements(boolean
onlyQueryTemplateMeasurements) {
- this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements;
+ public void setNoWhereAndAggregation(boolean value) {
+ this.noWhereAndAggregation = value;
}
public List<String> getMeasurementList() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index a7448a93529..39a377303c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -1851,7 +1851,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
&& rightExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
- "Please check the keep condition ([%s]),it need to be a
constant or a compare expression constructed by 'keep' and a long number.",
+ "Please check the keep condition ([%s]), "
+ + "it need to be a constant or a compare expression
constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
return;
@@ -1859,12 +1860,13 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
if (!(keepExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
- "Please check the keep condition ([%s]),it need to be a constant
or a compare expression constructed by 'keep' and a long number.",
+ "Please check the keep condition ([%s]), "
+ + "it need to be a constant or a compare expression
constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
}
- private void analyzeGroupByTime(Analysis analysis, QueryStatement
queryStatement) {
+ static void analyzeGroupByTime(Analysis analysis, QueryStatement
queryStatement) {
if (!queryStatement.isGroupByTime()) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
index 49904d6532c..6868b612274 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
@@ -65,7 +66,10 @@ public class ExpressionTypeAnalyzer {
public static TSDataType analyzeExpression(Analysis analysis, Expression
expression) {
if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
- analyzer.analyze(expression, null);
+
+ Map<String, IMeasurementSchema> context =
+ analysis.allDevicesInOneTemplate() ?
analysis.getDeviceTemplate().getSchemaMap() : null;
+ analyzer.analyze(expression, context);
addExpressionTypes(analysis, analyzer);
}
@@ -96,7 +100,16 @@ public class ExpressionTypeAnalyzer {
Expression expression,
TemplatedInfo templatedInfo) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
- analyzer.analyze(expression, templatedInfo.getSchemaMap());
+
+ Map<String, IMeasurementSchema> schemaMap = templatedInfo.getSchemaMap();
+ if (schemaMap == null) {
+ schemaMap = new LinkedHashMap<>();
+ for (int i = 0; i < templatedInfo.getMeasurementList().size(); i++) {
+ schemaMap.put(
+ templatedInfo.getMeasurementList().get(i),
templatedInfo.getSchemaList().get(i));
+ }
+ }
+ analyzer.analyze(expression, schemaMap);
types.putAll(analyzer.getExpressionTypes());
}
@@ -346,6 +359,14 @@ public class ExpressionTypeAnalyzer {
return setExpressionType(
timeSeriesOperand,
context.get(timeSeriesOperand.getOutputSymbol()).getType());
}
+
+ if (context != null
+ && !(timeSeriesOperand.getPath() instanceof MeasurementPath)
+ && context.containsKey(timeSeriesOperand.getPath().getFullPath())) {
+ return setExpressionType(
+ timeSeriesOperand,
context.get(timeSeriesOperand.getPath().getFullPath()).getType());
+ }
+
return setExpressionType(timeSeriesOperand,
timeSeriesOperand.getPath().getSeriesType());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
new file mode 100644
index 00000000000..cf19a791482
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.Template;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeGroupByTime;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewInput;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;
+
+/** Helper methods for analyzing aggregation queries in the templated ALIGN BY DEVICE case. */
+public class TemplatedAggregationAnalyze {
+
+ static boolean canBuildAggregationPlanUseTemplate(
+ Analysis analysis,
+ QueryStatement queryStatement,
+ IPartitionFetcher partitionFetcher,
+ ISchemaTree schemaTree,
+ MPPQueryContext context,
+ Template template) {
+
+ analysis.setNoWhereAndAggregation(false);
+
+ List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+
+ if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
+ // remove the device which won't appear in resultSet after limit/offset
+ deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList,
queryStatement);
+ }
+
+ List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+ boolean valid = analyzeSelect(queryStatement, analysis, outputExpressions,
template);
+ if (!valid) {
+ return false;
+ }
+
+ analyzeDeviceToWhere(analysis, queryStatement);
+ if (deviceList.isEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ return true;
+ }
+ analysis.setDeviceList(deviceList);
+
+ if (analysis.getWhereExpression() != null
+ && ConstantOperand.FALSE.equals(analysis.getWhereExpression())) {
+ analyzeOutput(analysis, queryStatement, outputExpressions);
+ analysis.setFinishQueryAfterAnalyze(true);
+ return true;
+ }
+
+ analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
+
+ analyzeDeviceToAggregation(analysis);
+ analyzeDeviceToSourceTransform(analysis);
+ analyzeDeviceToSource(analysis);
+
+ analyzeDeviceViewOutput(analysis, queryStatement);
+ analyzeDeviceViewInput(analysis, queryStatement);
+
+ // generate result set header according to output expressions
+ analyzeOutput(analysis, queryStatement, outputExpressions);
+
+ analyzeGroupByTime(analysis, queryStatement);
+ context.generateGlobalTimeFilter(analysis);
+
+ // fetch partition information
+ analyzeDataPartition(analysis, schemaTree, partitionFetcher,
context.getGlobalTimeFilter());
+ return true;
+ }
+
+ private static boolean analyzeSelect(
+ QueryStatement queryStatement,
+ Analysis analysis,
+ List<Pair<Expression, String>> outputExpressions,
+ Template template) {
+
+ LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>();
+ selectExpressions.add(DEVICE_EXPRESSION);
+ if (queryStatement.isOutputEndTime()) {
+ selectExpressions.add(END_TIME_EXPRESSION);
+ }
+
+ ColumnPaginationController paginationController =
+ new ColumnPaginationController(
+ queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
+
+ Set<Expression> aggregationExpressions = new LinkedHashSet<>();
+ for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ } else if (paginationController.hasCurLimit()) {
+ Expression selectExpression = resultColumn.getExpression();
+ outputExpressions.add(new Pair<>(selectExpression,
resultColumn.getAlias()));
+ selectExpressions.add(selectExpression);
+ aggregationExpressions.add(selectExpression);
+ } else {
+ break;
+ }
+ }
+
+ analysis.setDeviceTemplate(template);
+ List<String> measurementList = new ArrayList<>();
+ List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+ Set<String> measurementSet = new HashSet<>();
+ for (Expression selectExpression : selectExpressions) {
+ if ("device".equalsIgnoreCase(selectExpression.getOutputSymbol())) {
+ continue;
+ }
+
+ String measurement =
selectExpression.getExpressions().get(0).getOutputSymbol();
+ if (!template.getSchemaMap().containsKey(measurement)) {
+ analysis.setDeviceTemplate(null);
+ // TODO agg(*), agg(s1+1) and count_time(*) are not supported yet
+ return false;
+ }
+
+ // for agg1(s1) + agg2(s1), only record s1 for one time
+ if (!measurementSet.contains(measurement)) {
+ measurementSet.add(measurement);
+ measurementList.add(measurement);
+ measurementSchemaList.add(template.getSchemaMap().get(measurement));
+ }
+
+ analyzeExpressionType(analysis, selectExpression);
+ }
+
+ analysis.setMeasurementList(measurementList);
+ analysis.setMeasurementSchemaList(measurementSchemaList);
+ analysis.setAggregationExpressions(aggregationExpressions);
+ analysis.setOutputExpressions(outputExpressions);
+ analysis.setSelectExpressions(selectExpressions);
+ return true;
+ }
+
+ private static void analyzeHaving(
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ List<PartialPath> deviceSet) {
+ if (!queryStatement.hasHaving()) {
+ return;
+ }
+
+ // TODO HAVING expressions such as count(s1) + sum(s2) are not supported yet
+ Set<Expression> aggregationExpressions =
analysis.getAggregationExpressions();
+
+ Expression havingExpression =
queryStatement.getHavingCondition().getPredicate();
+
+ // Set<Expression> normalizedAggregationExpressions = new
LinkedHashSet<>();
+ for (Expression aggregationExpression :
searchAggregationExpressions(havingExpression)) {
+ Expression normalizedAggregationExpression =
normalizeExpression(aggregationExpression);
+
+ analyzeExpressionType(analysis, aggregationExpression);
+ analyzeExpressionType(analysis, normalizedAggregationExpression);
+
+ aggregationExpressions.add(aggregationExpression);
+ // normalizedAggregationExpressions.add(normalizedAggregationExpression);
+ }
+
+ TSDataType outputType = analyzeExpressionType(analysis, havingExpression);
+ if (outputType != TSDataType.BOOLEAN) {
+ throw new SemanticException(
+ String.format(
+ "The output type of the expression in HAVING clause should be
BOOLEAN, actual data type: %s.",
+ outputType));
+ }
+ analysis.setHavingExpression(havingExpression);
+ }
+
+ private static void analyzeDeviceToSourceTransform(Analysis analysis) {
+ // TODO add having into SourceTransform
+
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+ }
+
+ private static void analyzeDeviceToSource(Analysis analysis) {
+
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
+
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
+ }
+
+ private static void analyzeDeviceToAggregation(Analysis analysis) {
+ // TODO do the HAVING clause expressions need to be added here?
+
analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
index 856476eae24..bc14b2c55cd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java
@@ -70,7 +70,9 @@ import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyz
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.canBuildAggregationPlanUseTemplate;
/**
* This class provides accelerated implementation for multiple devices align
by device query. This
@@ -96,9 +98,7 @@ public class TemplatedAnalyze {
IPartitionFetcher partitionFetcher,
ISchemaTree schemaTree,
MPPQueryContext context) {
- if (queryStatement.isAggregationQuery()
- || queryStatement.isGroupBy()
- || queryStatement.isGroupByTime()
+ if (queryStatement.getGroupByComponent() != null
|| queryStatement.isSelectInto()
|| queryStatement.hasFill()
|| schemaTree.hasNormalTimeSeries()) {
@@ -106,58 +106,61 @@ public class TemplatedAnalyze {
}
List<Template> templates = schemaTree.getUsingTemplates();
- if (templates.size() != 1) {
+ if (templates.size() != 1 || templates.get(0) == null) {
return false;
}
Template template = templates.get(0);
+ if (queryStatement.isAggregationQuery()) {
+ return canBuildAggregationPlanUseTemplate(
+ analysis, queryStatement, partitionFetcher, schemaTree, context,
template);
+ }
+
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
ColumnPaginationController paginationController =
new ColumnPaginationController(
queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());
- if (template != null) {
- for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
- Expression expression = resultColumn.getExpression();
- if ("*".equals(expression.getOutputSymbol())) {
- for (Map.Entry<String, IMeasurementSchema> entry :
template.getSchemaMap().entrySet()) {
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- } else if (paginationController.hasCurLimit()) {
- String measurementName = entry.getKey();
- IMeasurementSchema measurementSchema = entry.getValue();
- TimeSeriesOperand measurementPath =
- new TimeSeriesOperand(
- new MeasurementPath(new String[] {measurementName},
measurementSchema));
- outputExpressions.add(new Pair<>(measurementPath, null));
- paginationController.consumeLimit();
- } else {
- break;
- }
- }
- if (queryStatement.getSelectComponent().getResultColumns().size() ==
1
- && queryStatement.getSeriesOffset() == 0
- && queryStatement.getSeriesLimit() == 0) {
- analysis.setTemplateWildCardQuery();
+ for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
+ Expression expression = resultColumn.getExpression();
+ if ("*".equals(expression.getOutputSymbol())) {
+ for (Map.Entry<String, IMeasurementSchema> entry :
template.getSchemaMap().entrySet()) {
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ } else if (paginationController.hasCurLimit()) {
+ String measurementName = entry.getKey();
+ IMeasurementSchema measurementSchema = entry.getValue();
+ TimeSeriesOperand measurementPath =
+ new TimeSeriesOperand(
+ new MeasurementPath(new String[] {measurementName},
measurementSchema));
+ outputExpressions.add(new Pair<>(measurementPath, null));
+ paginationController.consumeLimit();
+ } else {
+ break;
}
- } else if (expression instanceof TimeSeriesOperand) {
- String measurementName = ((TimeSeriesOperand)
expression).getPath().getMeasurement();
- if (template.getSchemaMap().containsKey(measurementName)) {
- if (paginationController.hasCurOffset()) {
- paginationController.consumeOffset();
- } else if (paginationController.hasCurLimit()) {
- IMeasurementSchema measurementSchema =
template.getSchemaMap().get(measurementName);
- TimeSeriesOperand measurementPath =
- new TimeSeriesOperand(
- new MeasurementPath(new String[] {measurementName},
measurementSchema));
- outputExpressions.add(new Pair<>(measurementPath,
resultColumn.getAlias()));
- } else {
- break;
- }
+ }
+ if (queryStatement.getSelectComponent().getResultColumns().size() == 1
+ && queryStatement.getSeriesOffset() == 0
+ && queryStatement.getSeriesLimit() == 0) {
+ analysis.setTemplateWildCardQuery();
+ }
+ } else if (expression instanceof TimeSeriesOperand) {
+ String measurementName = ((TimeSeriesOperand)
expression).getPath().getMeasurement();
+ if (template.getSchemaMap().containsKey(measurementName)) {
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ } else if (paginationController.hasCurLimit()) {
+ IMeasurementSchema measurementSchema =
template.getSchemaMap().get(measurementName);
+ TimeSeriesOperand measurementPath =
+ new TimeSeriesOperand(
+ new MeasurementPath(new String[] {measurementName},
measurementSchema));
+ outputExpressions.add(new Pair<>(measurementPath,
resultColumn.getAlias()));
+ } else {
+ break;
}
- } else {
- return false;
}
+ } else {
+ return false;
}
}
@@ -188,7 +191,7 @@ public class TemplatedAnalyze {
analyzeDeviceToSource(analysis);
analyzeDeviceViewOutput(analysis, queryStatement);
- analyzeDeviceViewInput(analysis);
+ analyzeDeviceViewInput(analysis, queryStatement);
analyzeFill(analysis, queryStatement);
@@ -228,8 +231,7 @@ public class TemplatedAnalyze {
analysis.setMeasurementSchemaList(measurementSchemaList);
}
- private static List<PartialPath> analyzeFrom(
- QueryStatement queryStatement, ISchemaTree schemaTree) {
+ static List<PartialPath> analyzeFrom(QueryStatement queryStatement,
ISchemaTree schemaTree) {
// device path patterns in FROM clause
List<PartialPath> devicePatternList =
queryStatement.getFromComponent().getPrefixPaths();
@@ -246,12 +248,12 @@ public class TemplatedAnalyze {
:
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
}
- private static void analyzeDeviceToWhere(Analysis analysis, QueryStatement
queryStatement) {
+ static void analyzeDeviceToWhere(Analysis analysis, QueryStatement
queryStatement) {
if (!queryStatement.hasWhere()) {
return;
}
- analysis.setOnlyQueryTemplateMeasurements(false);
+ analysis.setNoWhereAndAggregation(false);
Expression wherePredicate =
new TemplatedConcatRemoveUnExistentMeasurementVisitor()
.process(
@@ -325,25 +327,55 @@ public class TemplatedAnalyze {
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
}
- private static void analyzeDeviceViewOutput(Analysis analysis,
QueryStatement queryStatement) {
+ static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement
queryStatement) {
Set<Expression> selectExpressions = analysis.getSelectExpressions();
- // TODO if no order by, just set deviceViewOutputExpressions as
selectExpressions
- Set<Expression> deviceViewOutputExpressions = new
LinkedHashSet<>(selectExpressions);
- if (queryStatement.hasOrderByExpression()) {
- deviceViewOutputExpressions.addAll(analysis.getOrderByExpressions());
+ // if no order by, just set deviceViewOutputExpressions as
selectExpressions
+ Set<Expression> deviceViewOutputExpressions = new LinkedHashSet<>();
+
+ if (queryStatement.isAggregationQuery()) {
+ deviceViewOutputExpressions.add(DEVICE_EXPRESSION);
+ if (queryStatement.isOutputEndTime()) {
+ deviceViewOutputExpressions.add(END_TIME_EXPRESSION);
+ }
+ for (Expression selectExpression : selectExpressions) {
+
deviceViewOutputExpressions.addAll(searchAggregationExpressions(selectExpression));
+ }
+ if (queryStatement.hasHaving()) {
+ deviceViewOutputExpressions.addAll(
+ searchAggregationExpressions(analysis.getHavingExpression()));
+ }
+ if (queryStatement.hasOrderByExpression()) {
+ for (Expression orderByExpression : analysis.getOrderByExpressions()) {
+
deviceViewOutputExpressions.addAll(searchAggregationExpressions(orderByExpression));
+ }
+ }
+ } else {
+ deviceViewOutputExpressions.addAll(selectExpressions);
+ if (queryStatement.hasOrderByExpression()) {
+ deviceViewOutputExpressions.addAll(analysis.getOrderByExpressions());
+ }
}
+
analysis.setDeviceViewOutputExpressions(deviceViewOutputExpressions);
analysis.setDeviceViewSpecialProcess(
analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions,
queryStatement, analysis));
}
- private static void analyzeDeviceViewInput(Analysis analysis) {
+ static void analyzeDeviceViewInput(Analysis analysis, QueryStatement
queryStatement) {
List<Integer> indexes = new ArrayList<>();
- // index-0 is `Device`
- for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
- indexes.add(i);
+ if (queryStatement.isAggregationQuery()) {
+ // TODO verify the correctness of the order
+ for (int i = 1; i <= analysis.getAggregationExpressions().size(); i++) {
+ indexes.add(i);
+ }
+ } else {
+ for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
+ indexes.add(i);
+ }
}
+
+ // TODO only store once
Map<String, List<Integer>> deviceViewInputIndexesMap = new HashMap<>();
for (PartialPath devicePath : analysis.getDeviceList()) {
deviceViewInputIndexesMap.put(devicePath.getFullPath(), indexes);
@@ -356,7 +388,7 @@ public class TemplatedAnalyze {
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
}
- private static void analyzeDataPartition(
+ static void analyzeDataPartition(
Analysis analysis,
ISchemaTree schemaTree,
IPartitionFetcher partitionFetcher,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
index bd624f25634..2ae63c12a90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
@@ -58,7 +60,7 @@ public class TemplatedInfo {
private final boolean queryAllSensors;
// variables used in DeviceViewOperator
- private final List<String> selectMeasurements;
+ private final List<String> deviceViewOutputNames;
private final List<Integer> deviceToMeasurementIndexes;
// variables related to LIMIT/OFFSET push down
@@ -71,33 +73,44 @@ public class TemplatedInfo {
// utils variables, not serialize
private Map<String, IMeasurementSchema> schemaMap;
- private Map<String, List<InputLocation>> layoutMap;
+ private Map<String, List<InputLocation>> filterLayoutMap;
private int maxTsBlockLineNum = -1;
// variables related to predicate push down
+ // TODO when to init pushDownPredicate in agg situation?
private Expression pushDownPredicate;
+ // variables related to aggregation
+ public List<AggregationDescriptor> aggregationDescriptorList;
+ public GroupByTimeParameter groupByTimeParameter;
+ public boolean outputEndTime;
+
+ private Expression havingExpression;
+
public TemplatedInfo(
List<String> measurementList,
List<IMeasurementSchema> schemaList,
List<TSDataType> dataTypes,
Ordering scanOrder,
boolean queryAllSensors,
- List<String> selectMeasurements,
+ List<String> deviceViewOutputNames,
List<Integer> deviceToMeasurementIndexes,
long offsetValue,
long limitValue,
Expression predicate,
boolean keepNull,
Map<String, IMeasurementSchema> schemaMap,
- Map<String, List<InputLocation>> layoutMap,
- Expression pushDownPredicate) {
+ Map<String, List<InputLocation>> filterLayoutMap,
+ Expression pushDownPredicate,
+ List<AggregationDescriptor> aggregationDescriptorList,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime) {
this.measurementList = measurementList;
this.schemaList = schemaList;
this.dataTypes = dataTypes;
this.scanOrder = scanOrder;
this.queryAllSensors = queryAllSensors;
- this.selectMeasurements = selectMeasurements;
+ this.deviceViewOutputNames = deviceViewOutputNames;
this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
this.offsetValue = offsetValue;
this.limitValue = limitValue;
@@ -105,9 +118,13 @@ public class TemplatedInfo {
if (predicate != null) {
this.keepNull = keepNull;
this.schemaMap = schemaMap;
- this.layoutMap = layoutMap;
+ this.filterLayoutMap = filterLayoutMap;
}
this.pushDownPredicate = pushDownPredicate;
+
+ this.aggregationDescriptorList = aggregationDescriptorList;
+ this.groupByTimeParameter = groupByTimeParameter;
+ this.outputEndTime = outputEndTime;
}
public List<String> getMeasurementList() {
@@ -130,8 +147,8 @@ public class TemplatedInfo {
return this.queryAllSensors;
}
- public List<String> getSelectMeasurements() {
- return this.selectMeasurements;
+ public List<String> getDeviceViewOutputNames() {
+ return this.deviceViewOutputNames;
}
public long getOffsetValue() {
@@ -158,8 +175,8 @@ public class TemplatedInfo {
return this.schemaMap;
}
- public Map<String, List<InputLocation>> getLayoutMap() {
- return this.layoutMap;
+ public Map<String, List<InputLocation>> getFilterLayoutMap() {
+ return this.filterLayoutMap;
}
public Expression getPushDownPredicate() {
@@ -215,8 +232,8 @@ public class TemplatedInfo {
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
ReadWriteIOUtils.write(queryAllSensors, byteBuffer);
- ReadWriteIOUtils.write(selectMeasurements.size(), byteBuffer);
- for (String selectMeasurement : selectMeasurements) {
+ ReadWriteIOUtils.write(deviceViewOutputNames.size(), byteBuffer);
+ for (String selectMeasurement : deviceViewOutputNames) {
ReadWriteIOUtils.write(selectMeasurement, byteBuffer);
}
@@ -242,6 +259,20 @@ public class TemplatedInfo {
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}
+
+ if (aggregationDescriptorList != null) {
+ ReadWriteIOUtils.write(aggregationDescriptorList.size(), byteBuffer);
+ aggregationDescriptorList.forEach(d -> d.serialize(byteBuffer));
+ } else {
+ ReadWriteIOUtils.write(0, byteBuffer);
+ }
+
+ if (groupByTimeParameter != null) {
+ ReadWriteIOUtils.write((byte) 1, byteBuffer);
+ groupByTimeParameter.serialize(byteBuffer);
+ } else {
+ ReadWriteIOUtils.write((byte) 0, byteBuffer);
+ }
}
public void serialize(DataOutputStream stream) throws IOException {
@@ -258,8 +289,8 @@ public class TemplatedInfo {
ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
ReadWriteIOUtils.write(queryAllSensors, stream);
- ReadWriteIOUtils.write(selectMeasurements.size(), stream);
- for (String selectMeasurement : selectMeasurements) {
+ ReadWriteIOUtils.write(deviceViewOutputNames.size(), stream);
+ for (String selectMeasurement : deviceViewOutputNames) {
ReadWriteIOUtils.write(selectMeasurement, stream);
}
@@ -285,6 +316,22 @@ public class TemplatedInfo {
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}
+
+ if (aggregationDescriptorList != null) {
+ ReadWriteIOUtils.write(aggregationDescriptorList.size(), stream);
+ for (AggregationDescriptor descriptor : aggregationDescriptorList) {
+ descriptor.serialize(stream);
+ }
+ } else {
+ ReadWriteIOUtils.write(0, stream);
+ }
+
+ if (groupByTimeParameter != null) {
+ ReadWriteIOUtils.write((byte) 1, stream);
+ groupByTimeParameter.serialize(stream);
+ } else {
+ ReadWriteIOUtils.write((byte) 0, stream);
+ }
}
public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
@@ -350,6 +397,23 @@ public class TemplatedInfo {
pushDownPredicate = Expression.deserialize(byteBuffer);
}
+ List<AggregationDescriptor> aggregationDescriptorList = null;
+ listSize = ReadWriteIOUtils.readInt(byteBuffer);
+ if (listSize > 0) {
+ aggregationDescriptorList = new ArrayList<>(listSize);
+ while (listSize-- > 0) {
+
aggregationDescriptorList.add(AggregationDescriptor.deserialize(byteBuffer));
+ }
+ }
+
+ byte hasGroupByTime = ReadWriteIOUtils.readByte(byteBuffer);
+ GroupByTimeParameter groupByTimeParameter = null;
+ if (hasGroupByTime == 1) {
+ groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
+ }
+
+ // TODO add outputEndTime serialization and deserialization
+
return new TemplatedInfo(
measurementList,
measurementSchemaList,
@@ -364,6 +428,9 @@ public class TemplatedInfo {
keepNull,
currentSchemaMap,
layoutMap,
- pushDownPredicate);
+ pushDownPredicate,
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ false);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index cf4b3461955..2e65e750193 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -64,11 +64,13 @@ import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_TIME;
@@ -93,6 +95,12 @@ public class AggregationPushDown implements PlanOptimizer {
private boolean cannotUseStatistics(QueryStatement queryStatement, Analysis
analysis) {
boolean isAlignByDevice = queryStatement.isAlignByDevice();
if (isAlignByDevice) {
+ if (analysis.allDevicesInOneTemplate()) {
+ // TODO agg+template situation, how about the
SourceTransformExpressions
+ return cannotUseStatistics(
+ analysis.getAggregationExpressions(),
analysis.getAggregationExpressions());
+ }
+
// check any of the devices
String device = analysis.getDeviceList().get(0).toString();
return cannotUseStatistics(
@@ -173,6 +181,10 @@ public class AggregationPushDown implements PlanOptimizer {
List<PlanNode> rewrittenChildren = new ArrayList<>();
for (int i = 0; i < node.getDevices().size(); i++) {
context.setCurDevice(node.getDevices().get(i));
+ if (context.analysis.allDevicesInOneTemplate()) {
+ context.setCurDevicePath(context.analysis.getDeviceList().get(i));
+ }
+
rewrittenChildren.add(node.getChildren().get(i).accept(this, context));
}
node.setChildren(rewrittenChildren);
@@ -263,14 +275,26 @@ public class AggregationPushDown implements PlanOptimizer
{
sourceToCountTimeAggregationsMap);
}
- List<PlanNode> sourceNodeList =
- constructSourceNodeFromAggregationDescriptors(
- sourceToAscendingAggregationsMap,
- sourceToDescendingAggregationsMap,
- sourceToCountTimeAggregationsMap,
- node.getScanOrder(),
- node.getGroupByTimeParameter(),
- context);
+ List<PlanNode> sourceNodeList;
+ if (context.analysis.allDevicesInOneTemplate()) {
+ sourceNodeList =
+ constructSourceNodeFromTemplateAggregationDescriptors(
+ sourceToAscendingAggregationsMap,
+ sourceToDescendingAggregationsMap,
+ sourceToCountTimeAggregationsMap,
+ node.getScanOrder(),
+ node.getGroupByTimeParameter(),
+ context);
+ } else {
+ sourceNodeList =
+ constructSourceNodeFromAggregationDescriptors(
+ sourceToAscendingAggregationsMap,
+ sourceToDescendingAggregationsMap,
+ sourceToCountTimeAggregationsMap,
+ node.getScanOrder(),
+ node.getGroupByTimeParameter(),
+ context);
+ }
if (isSingleSource && ((SeriesScanSourceNode)
child).getPushDownPredicate() != null) {
Expression pushDownPredicate = ((SeriesScanSourceNode)
child).getPushDownPredicate();
@@ -364,7 +388,6 @@ public class AggregationPushDown implements PlanOptimizer {
GroupByTimeParameter groupByTimeParameter,
RewriterContext context) {
List<PlanNode> sourceNodeList = new ArrayList<>();
- boolean needCheckAscending = groupByTimeParameter == null;
Map<PartialPath, List<AggregationDescriptor>>
groupedAscendingAggregations = null;
if (!countTimeAggregations.isEmpty()) {
groupedAscendingAggregations = countTimeAggregations;
@@ -383,6 +406,7 @@ public class AggregationPushDown implements PlanOptimizer {
context));
}
+ boolean needCheckAscending = groupByTimeParameter == null;
if (needCheckAscending) {
Map<PartialPath, List<AggregationDescriptor>>
groupedDescendingAggregations =
MetaUtils.groupAlignedAggregations(descendingAggregations);
@@ -400,6 +424,85 @@ public class AggregationPushDown implements PlanOptimizer {
return sourceNodeList;
}
+  /**
+   * Builds the aggregation scan nodes for the current device ({@code context.curDevicePath}) when
+   * all queried devices share one template.
+   *
+   * <p>Aligned template: the descriptors of {@code ascendingAggregations} are flattened into one
+   * scan node over an {@link AlignedPath} carrying every measurement of the analysis; the
+   * descriptors of {@code descendingAggregations} get a second scan node, but only when there is
+   * no GROUP BY TIME parameter. Non-aligned template: one scan node per measurement and per
+   * descriptor list, ascending lists first and descending lists under the same condition.
+   *
+   * @param ascendingAggregations descriptor lists keyed by path, e.g. count(s1), avg(s1)
+   * @param descendingAggregations descriptor lists keyed by path; skipped when {@code
+   *     groupByTimeParameter} is not null
+   * @param countTimeAggregations not used yet, count_time is not supported in this path
+   * @param scanOrder scan order handed to every created scan node
+   * @param groupByTimeParameter GROUP BY TIME parameter handed to every created scan node, may be
+   *     null
+   * @param context rewriter context supplying the analysis and the current device path
+   * @return the created scan nodes, possibly empty
+   */
+  private List<PlanNode> constructSourceNodeFromTemplateAggregationDescriptors(
+      Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> descendingAggregations,
+      Map<PartialPath, List<AggregationDescriptor>> countTimeAggregations,
+      Ordering scanOrder,
+      GroupByTimeParameter groupByTimeParameter,
+      RewriterContext context) {
+
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+    PartialPath devicePath = context.curDevicePath;
+    List<String> measurementList = context.analysis.getMeasurementList();
+    List<IMeasurementSchema> measurementSchemaList = context.analysis.getMeasurementSchemaList();
+    // descending descriptors are only planned when there is no GROUP BY TIME parameter
+    boolean needCheckAscending = groupByTimeParameter == null;
+
+    if (context.analysis.getDeviceTemplate().isDirectAligned()) {
+      AlignedPath alignedPath = new AlignedPath(devicePath);
+      alignedPath.setMeasurementList(measurementList);
+      alignedPath.addSchemas(measurementSchemaList);
+
+      List<AggregationDescriptor> aggregationDescriptors =
+          ascendingAggregations.values().stream()
+              .flatMap(Collection::stream)
+              .collect(Collectors.toList());
+      if (!aggregationDescriptors.isEmpty()) {
+        sourceNodeList.add(
+            createAggregationScanNode(
+                alignedPath, aggregationDescriptors, scanOrder, groupByTimeParameter, context));
+      }
+
+      if (needCheckAscending && !descendingAggregations.isEmpty()) {
+        aggregationDescriptors =
+            descendingAggregations.values().stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+        sourceNodeList.add(
+            createAggregationScanNode(
+                alignedPath, aggregationDescriptors, scanOrder, groupByTimeParameter, context));
+      }
+    } else {
+      // TODO verify the rightness of non-aligned series
+      for (int i = 0; i < measurementList.size(); i++) {
+        MeasurementPath measurementPath =
+            new MeasurementPath(
+                devicePath.concatNode(measurementList.get(i)), measurementSchemaList.get(i));
+        // ascending descriptors are always planned, same as in the aligned branch
+        for (List<AggregationDescriptor> aggregationDescriptorList :
+            ascendingAggregations.values()) {
+          sourceNodeList.add(
+              createAggregationScanNode(
+                  measurementPath,
+                  aggregationDescriptorList,
+                  scanOrder,
+                  groupByTimeParameter,
+                  context));
+        }
+
+        if (needCheckAscending) {
+          for (List<AggregationDescriptor> aggregationDescriptorList :
+              descendingAggregations.values()) {
+            sourceNodeList.add(
+                createAggregationScanNode(
+                    measurementPath,
+                    aggregationDescriptorList,
+                    scanOrder,
+                    groupByTimeParameter,
+                    context));
+          }
+        }
+      }
+    }
+
+    // TODO count(s1+s2) is not supported
+    // TODO count_time is not supported
+
+    return sourceNodeList;
+  }
+
private SeriesAggregationSourceNode createAggregationScanNode(
PartialPath selectPath,
List<AggregationDescriptor> aggregationDescriptorList,
@@ -454,6 +557,7 @@ public class AggregationPushDown implements PlanOptimizer {
private final boolean isAlignByDevice;
private String curDevice;
+ private PartialPath curDevicePath;
public RewriterContext(Analysis analysis, MPPQueryContext context, boolean
isAlignByDevice) {
this.analysis = analysis;
@@ -473,9 +577,17 @@ public class AggregationPushDown implements PlanOptimizer {
this.curDevice = curDevice;
}
+ public void setCurDevicePath(PartialPath devicePath) {
+ this.curDevicePath = devicePath;
+ }
+
public Set<Expression> getAggregationExpressions() {
if (isAlignByDevice) {
- return analysis.getDeviceToAggregationExpressions().get(curDevice);
+ if (analysis.allDevicesInOneTemplate()) {
+ return analysis.getAggregationExpressions();
+ } else {
+ return analysis.getDeviceToAggregationExpressions().get(curDevice);
+ }
}
return analysis.getAggregationExpressions();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
index 2207b94f6b8..65a7023bb31 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDown.java
@@ -164,7 +164,10 @@ public class PredicatePushDown implements PlanOptimizer {
if (!cannotPushDownConjuncts.isEmpty()) {
resultNode =
planFilter(
- resultNode,
PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context);
+ resultNode,
+ PredicateUtils.combineConjuncts(cannotPushDownConjuncts),
+ context,
+ true);
} else {
resultNode = planTransform(resultNode, context);
resultNode = planProject(resultNode, context);
@@ -249,7 +252,8 @@ public class PredicatePushDown implements PlanOptimizer {
return resultNode;
}
- private PlanNode planFilter(PlanNode child, Expression predicate,
RewriterContext context) {
+ private PlanNode planFilter(
+ PlanNode child, Expression predicate, RewriterContext context, boolean
isFromWhere) {
FilterNode pushDownFilterNode = context.getPushDownFilterNode();
return new FilterNode(
context.genPlanNodeId(),
@@ -257,7 +261,8 @@ public class PredicatePushDown implements PlanOptimizer {
pushDownFilterNode.getOutputExpressions(),
predicate,
pushDownFilterNode.isKeepNull(),
- pushDownFilterNode.getScanOrder());
+ pushDownFilterNode.getScanOrder(),
+ isFromWhere);
}
@Override
@@ -330,7 +335,8 @@ public class PredicatePushDown implements PlanOptimizer {
resultNode = planProject(resultNode, context);
return resultNode;
} else {
- return planFilter(node,
PredicateUtils.combineConjuncts(cannotPushDownConjuncts), context);
+ return planFilter(
+ node, PredicateUtils.combineConjuncts(cannotPushDownConjuncts),
context, true);
}
}
@@ -402,7 +408,7 @@ public class PredicatePushDown implements PlanOptimizer {
private RewriterContext(Analysis analysis, MPPQueryContext context,
boolean isAlignByDevice) {
this.queryId = context.getQueryId();
this.isAlignByDevice = isAlignByDevice;
- this.isBuildPlanUseTemplate = analysis.isAllDevicesInOneTemplate();
+ this.isBuildPlanUseTemplate = analysis.allDevicesInOneTemplate();
this.templatedInfo = context.getTypeProvider().getTemplatedInfo();
this.filterNodeFromWhereChecker = analysis::fromWhere;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index b1786c85112..b696501a417 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -746,7 +746,7 @@ public class LogicalPlanBuilder {
return this;
}
- private PlanNode createSlidingWindowAggregationNode(
+ protected PlanNode createSlidingWindowAggregationNode(
PlanNode child,
Set<Expression> aggregationExpressions,
GroupByTimeParameter groupByTimeParameter,
@@ -851,7 +851,7 @@ public class LogicalPlanBuilder {
.collect(Collectors.toList()));
}
- private List<AggregationDescriptor> constructAggregationDescriptorList(
+ protected List<AggregationDescriptor> constructAggregationDescriptorList(
Set<Expression> aggregationExpressions, AggregationStep curStep) {
return aggregationExpressions.stream()
.map(
@@ -883,7 +883,8 @@ public class LogicalPlanBuilder {
selectExpressions.toArray(new Expression[0]),
filterExpression,
isGroupByTime,
- scanOrder);
+ scanOrder,
+ fromWhere);
if (fromWhere) {
analysis.setFromWhere(filterNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 08990c5df47..3bec19321d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -134,7 +134,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext
context) {
- if (analysis.isAllDevicesInOneTemplate()) {
+ if (analysis.allDevicesInOneTemplate()) {
return new TemplatedLogicalPlan(analysis, queryStatement,
context).visitQuery();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index c271e8ec2da..6b2ecccfd65 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -447,7 +447,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
seriesScanOperator,
templatedInfo.getProjectExpressions(),
templatedInfo.getDataTypes(),
- templatedInfo.getLayoutMap(),
+ templatedInfo.getFilterLayoutMap(),
templatedInfo.isKeepNull(),
node.getPlanNodeId(),
templatedInfo.getScanOrder(),
@@ -494,10 +494,18 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
List<String> inputColumnNames;
List<String> outputColumnNames = node.getOutputColumnNames();
if (outputColumnNames == null) {
- outputColumnNames =
context.getTypeProvider().getTemplatedInfo().getSelectMeasurements();
- // skip device column
- outputColumnNames = outputColumnNames.subList(1,
outputColumnNames.size());
- inputColumnNames =
context.getTypeProvider().getTemplatedInfo().getMeasurementList();
+ if
(context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList !=
null) {
+ // TODO fix it
+ // outputColumnNames is aggregation expression
+ outputColumnNames =
context.getTypeProvider().getTemplatedInfo().getDeviceViewOutputNames();
+ outputColumnNames = outputColumnNames.subList(1,
outputColumnNames.size());
+ inputColumnNames = outputColumnNames;
+ } else {
+ outputColumnNames =
context.getTypeProvider().getTemplatedInfo().getDeviceViewOutputNames();
+ // skip device column
+ outputColumnNames = outputColumnNames.subList(1,
outputColumnNames.size());
+ inputColumnNames =
context.getTypeProvider().getTemplatedInfo().getMeasurementList();
+ }
} else {
inputColumnNames = node.getChild().getOutputColumnNames();
}
@@ -591,28 +599,62 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, LocalExecutionPlanContext
context) {
- AlignedPath seriesPath = node.getAlignedPath();
- boolean ascending = node.getScanOrder() == Ordering.ASC;
+ if (context.isBuildPlanUseTemplate()) {
+ return constructAlignedSeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ node.getAlignedPath(),
+ context.getTemplatedInfo().aggregationDescriptorList,
+ context.getTemplatedInfo().getPushDownPredicate(),
+ context.getTemplatedInfo().getScanOrder(),
+ context.getTemplatedInfo().groupByTimeParameter,
+ context.getTemplatedInfo().outputEndTime,
+ context);
+ }
+
+ return constructAlignedSeriesAggregationScanOperator(
+ node.getPlanNodeId(),
+ node.getAlignedPath(),
+ node.getAggregationDescriptorList(),
+ node.getPushDownPredicate(),
+ node.getScanOrder(),
+ node.getGroupByTimeParameter(),
+ node.isOutputEndTime(),
+ context);
+ }
+
+ private Operator constructAlignedSeriesAggregationScanOperator(
+ PlanNodeId planNodeId,
+ AlignedPath alignedPath,
+ List<AggregationDescriptor> aggregationDescriptorList,
+ Expression pushDownPredicate,
+ Ordering scanOrder,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime,
+ LocalExecutionPlanContext context) {
+ boolean ascending = scanOrder == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
- for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
+ for (AggregationDescriptor descriptor : aggregationDescriptorList) {
checkArgument(
descriptor.getInputExpressions().size() == 1,
"descriptor's input expression size is not 1");
+
Expression expression = descriptor.getInputExpressions().get(0);
if (expression instanceof TimeSeriesOperand) {
+ // TODO for template_agg, no need use getPath.getMeasurement
String inputSeries =
((TimeSeriesOperand) (descriptor.getInputExpressions().get(0)))
.getPath()
.getMeasurement();
- int seriesIndex = seriesPath.getMeasurementList().indexOf(inputSeries);
+ int seriesIndex =
alignedPath.getMeasurementList().indexOf(inputSeries);
TSDataType seriesDataType =
-
seriesPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
+
alignedPath.getMeasurementSchema().getSubMeasurementsTSDataTypeList().get(seriesIndex);
aggregators.add(
new Aggregator(
AccumulatorFactory.createAccumulator(
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
Collections.singletonList(seriesDataType),
+ // TODO inputExpression must be devicePath+measurement
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
@@ -627,6 +669,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
descriptor.getAggregationFuncName(),
descriptor.getAggregationType(),
Collections.singletonList(TSDataType.INT64),
+ // TODO inputExpression must be devicePath+measurement
descriptor.getInputExpressions(),
descriptor.getInputAttributes(),
ascending,
@@ -640,23 +683,22 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
}
}
- GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter();
ITimeRangeIterator timeRangeIterator =
initTimeRangeIterator(groupByTimeParameter, ascending, true);
long maxReturnSize =
AggregationUtil.calculateMaxAggregationResultSize(
- node.getAggregationDescriptorList(), timeRangeIterator,
context.getTypeProvider());
+ aggregationDescriptorList, timeRangeIterator,
context.getTypeProvider());
SeriesScanOptions.Builder scanOptionsBuilder =
getSeriesScanOptionsBuilder(context);
- scanOptionsBuilder.withAllSensors(new
HashSet<>(seriesPath.getMeasurementList()));
+ scanOptionsBuilder.withAllSensors(new
HashSet<>(alignedPath.getMeasurementList()));
- Expression pushDownPredicate = node.getPushDownPredicate();
if (pushDownPredicate != null) {
checkArgument(PredicateUtils.predicateCanPushIntoScan(pushDownPredicate));
scanOptionsBuilder.withPushDownFilter(
convertPredicateToFilter(
pushDownPredicate,
- node.getAlignedPath().getMeasurementList(),
+ alignedPath.getMeasurementList(),
+ // TODO what's the meaning of isBuildPlanUseTemplate
context.getTypeProvider().getTemplatedInfo() != null,
context.getTypeProvider()));
}
@@ -666,14 +708,14 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.getDriverContext()
.addOperatorContext(
context.getNextOperatorId(),
- node.getPlanNodeId(),
+ planNodeId,
AlignedSeriesAggregationScanOperator.class.getSimpleName());
AlignedSeriesAggregationScanOperator seriesAggregationScanOperator =
new AlignedSeriesAggregationScanOperator(
- node.getPlanNodeId(),
- seriesPath,
- node.getScanOrder(),
- node.isOutputEndTime(),
+ planNodeId,
+ alignedPath,
+ scanOrder,
+ outputEndTime,
scanOptionsBuilder.build(),
operatorContext,
aggregators,
@@ -683,7 +725,7 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
((DataDriverContext) context.getDriverContext())
.addSourceOperator(seriesAggregationScanOperator);
- ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
+ ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
context.getDriverContext().setInputDriver(true);
return seriesAggregationScanOperator;
}
@@ -1395,25 +1437,32 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitFilter(FilterNode node, LocalExecutionPlanContext
context) {
- if (context.isBuildPlanUseTemplate()) {
+ if (context.isBuildPlanUseTemplate() && node.isFromWhere()) {
TemplatedInfo templatedInfo = context.getTemplatedInfo();
return constructFilterOperator(
node.getPredicate(),
generateOnlyChildOperator(node, context),
templatedInfo.getProjectExpressions(),
templatedInfo.getDataTypes(),
- templatedInfo.getLayoutMap(),
+ templatedInfo.getFilterLayoutMap(),
templatedInfo.isKeepNull(),
node.getPlanNodeId(),
templatedInfo.getScanOrder(),
context);
}
+ // 1. the plan does not use a template, or
+ // 2. the plan uses a template but this FilterNode is not generated from the WHERE clause;
+ // in both cases the input data types are derived from the output columns of the children
return constructFilterOperator(
node.getPredicate(),
generateOnlyChildOperator(node, context),
node.getOutputExpressions(),
- getInputColumnTypes(node, context.getTypeProvider()),
+ node.getChildren().stream()
+ .map(PlanNode::getOutputColumnNames)
+ .flatMap(List::stream)
+ .map(context.getTypeProvider()::getType)
+ .collect(Collectors.toList()),
makeLayout(node),
node.isKeepNull(),
node.getPlanNodeId(),
@@ -1778,13 +1827,28 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
@Override
public Operator visitRawDataAggregation(
RawDataAggregationNode node, LocalExecutionPlanContext context) {
+ // TODO optimize serialize and deserialize method in template situation
+ Map<String, List<InputLocation>> layout;
+ if (context.isBuildPlanUseTemplate()) {
+ // in the template situation, the output columns of ProjectNode are not stored; they are the
+ // same as those of its children
+ layout = context.getTemplatedInfo().getFilterLayoutMap();
+ } else {
+ layout = makeLayout(node);
+ }
+ return createRawDataAggregationOperator(node, context, layout);
+ }
+
+ private RawDataAggregationOperator createRawDataAggregationOperator(
+ RawDataAggregationNode node,
+ LocalExecutionPlanContext context,
+ Map<String, List<InputLocation>> layout) {
checkArgument(
!node.getAggregationDescriptorList().isEmpty(),
"Aggregation descriptorList cannot be empty");
Operator child = node.getChild().accept(this, context);
boolean ascending = node.getScanOrder() == Ordering.ASC;
List<Aggregator> aggregators = new ArrayList<>();
- Map<String, List<InputLocation>> layout = makeLayout(node);
List<AggregationDescriptor> aggregationDescriptors =
node.getAggregationDescriptorList();
for (AggregationDescriptor descriptor :
node.getAggregationDescriptorList()) {
List<InputLocation[]> inputLocationList =
calcInputLocationList(descriptor, layout);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
index b8efa1e3b3d..34d48ea8a72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java
@@ -85,6 +85,13 @@ public class SubPlanTypeExtractor {
@Override
public Void visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, Void context) {
+ // if TemplateInfo is not empty, all type infos used by
AlignedSeriesAggregationScanNode have
+ // been stored
+ // in TemplateInfo
+ if (typeProvider.getTemplatedInfo() != null) {
+ return null;
+ }
+
AlignedPath alignedPath = node.getAlignedPath();
for (int i = 0; i < alignedPath.getColumnNum(); i++) {
String sourcePath =
alignedPath.getPathWithMeasurement(i).getFullPath();
@@ -168,6 +175,7 @@ public class SubPlanTypeExtractor {
if (typeProvider.getTemplatedInfo() != null) {
return null;
}
+
return visitPlan(node, context);
}
@@ -178,6 +186,7 @@ public class SubPlanTypeExtractor {
if (typeProvider.getTemplatedInfo() != null) {
return null;
}
+
return visitPlan(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
index 75222efb6fc..a00aee711d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -25,13 +25,19 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.commons.lang3.Validate;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,9 +45,13 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
+import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchSourceExpressions;
import static
org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo.makeLayout;
+import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
import static
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanVisitor.pushDownLimitToScanNode;
+import static
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode.getDeduplicatedDescriptors;
/**
* This class provides accelerated implementation for multiple devices align
by device query. This
@@ -70,6 +80,10 @@ public class TemplatedLogicalPlan {
private Map<String, List<InputLocation>> filterLayoutMap;
+ List<AggregationDescriptor> aggregationDescriptorList;
+
+ List<AggregationDescriptor> deduplicatedDescriptors;
+
public TemplatedLogicalPlan(
Analysis analysis, QueryStatement queryStatement, MPPQueryContext
context) {
this.analysis = analysis;
@@ -86,10 +100,70 @@ public class TemplatedLogicalPlan {
this.whereExpression = analysis.getWhereExpression();
// for align by device query with template, most used variables are same
- initCommonVariables();
+ if (queryStatement.isAggregationQuery()) {
+ initAggQueryCommonVariables();
+ } else {
+ initNonAggQueryCommonVariables();
+ }
}
- private void initCommonVariables() {
+  /**
+   * Prepares the state shared by every device of a templated ALIGN BY DEVICE aggregation query
+   * and registers it on the type provider as a {@link TemplatedInfo}.
+   *
+   * <p>When a WHERE clause is present, template measurements referenced by the predicate but
+   * missing from the select list are appended to {@code newMeasurementList}/{@code
+   * newSchemaList}, the filter layout is rebuilt from the extended list, and every analyzed
+   * expression type is copied into the type provider.
+   */
+  private void initAggQueryCommonVariables() {
+    if (whereExpression != null) {
+      newMeasurementList = new ArrayList<>(measurementList);
+      newSchemaList = new ArrayList<>(schemaList);
+      // Every measurement that is already part of newMeasurementList.
+      Set<String> knownMeasurements = new HashSet<>(measurementList);
+      for (Expression expression : searchSourceExpressions(whereExpression)) {
+        if (!(expression instanceof TimeSeriesOperand)) {
+          continue;
+        }
+        String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement();
+        if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) {
+          continue;
+        }
+        // Set#add returns false for a measurement seen before, so a series that the predicate
+        // references several times (e.g. "s1 > 1 and s1 < 5") is appended only once.
+        if (knownMeasurements.add(measurement)) {
+          newMeasurementList.add(measurement);
+          newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement));
+        }
+      }
+
+      // TODO fix aggregation filterLayoutMap
+      filterLayoutMap = makeLayout(newMeasurementList);
+
+      analysis
+          .getExpressionTypes()
+          .forEach(
+              (key, value) ->
+                  context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value));
+    }
+
+    context
+        .getTypeProvider()
+        .setTemplatedInfo(
+            new TemplatedInfo(
+                newMeasurementList,
+                newSchemaList,
+                newSchemaList.stream()
+                    .map(IMeasurementSchema::getType)
+                    .collect(Collectors.toList()),
+                queryStatement.getResultTimeOrder(),
+                analysis.isLastLevelUseWildcard(),
+                analysis.getDeviceViewOutputExpressions().stream()
+                    .map(Expression::getExpressionString)
+                    .collect(Collectors.toList()),
+                // all devices share one template, so the first device's index list is used
+                analysis.getDeviceViewInputIndexesMap().values().iterator().next(),
+                OFFSET_VALUE,
+                limitValue,
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                analysis.getDeviceTemplate().getSchemaMap(),
+                filterLayoutMap,
+                null,
+                null,
+                analysis.getGroupByTimeParameter(),
+                queryStatement.isOutputEndTime()));
+  }
+
+ private void initNonAggQueryCommonVariables() {
if (whereExpression != null) {
if (!analysis.isTemplateWildCardQuery()) {
newMeasurementList = new ArrayList<>(measurementList);
@@ -141,10 +215,17 @@ public class TemplatedLogicalPlan {
queryStatement.isGroupByTime(),
analysis.getDeviceTemplate().getSchemaMap(),
filterLayoutMap,
- null));
+ null,
+ null,
+ analysis.getGroupByTimeParameter(),
+ queryStatement.isOutputEndTime()));
}
public PlanNode visitQuery() {
+ if (queryStatement.isAggregationQuery()) {
+ return visitAggregation();
+ }
+
LogicalPlanBuilder planBuilder =
new TemplatedLogicalPlanBuilder(analysis, context, measurementList,
schemaList);
@@ -213,4 +294,157 @@ public class TemplatedLogicalPlan {
return planBuilder.getRoot();
}
+
+ // ============== Methods below are used for templated aggregation
======================
+
+  /**
+   * Builds the logical plan of a templated ALIGN BY DEVICE aggregation query: one aggregation
+   * sub-plan per device, merged by a device view, followed by HAVING/transform, ORDER BY, FILL,
+   * OFFSET and LIMIT.
+   *
+   * @return root of the generated plan
+   */
+  private PlanNode visitAggregation() {
+    boolean groupAcrossSeries = queryStatement.isGroupByLevel() || queryStatement.isGroupByTag();
+    boolean overlappedWindow =
+        queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap();
+
+    // Step of the aggregation that sits directly on the raw data. The descriptors created below
+    // are the ones handed to the raw-data aggregation, so they must carry this step and not the
+    // step of the sliding-window aggregation stacked on top of it.
+    // NOTE(review): relies on INTERMEDIATE/FINAL being partial-input steps - confirm against
+    // AggregationStep.
+    AggregationStep rawStep =
+        (groupAcrossSeries || overlappedWindow)
+            ? AggregationStep.PARTIAL
+            : AggregationStep.SINGLE;
+
+    // Step forwarded to the per-device body; with overlapping windows it is the step of the
+    // sliding-window aggregation.
+    AggregationStep bodyStep = rawStep;
+    if (overlappedWindow) {
+      bodyStep = groupAcrossSeries ? AggregationStep.INTERMEDIATE : AggregationStep.FINAL;
+    }
+
+    aggregationDescriptorList =
+        constructAggregationDescriptorList(analysis.getAggregationExpressions(), rawStep);
+    updateTypeProvider(analysis.getAggregationExpressions());
+    if (rawStep.isOutputPartial()) {
+      aggregationDescriptorList.forEach(
+          aggregationDescriptor ->
+              updateTypeProviderByPartialAggregation(
+                  aggregationDescriptor, context.getTypeProvider()));
+    }
+
+    context.getTypeProvider().getTemplatedInfo().aggregationDescriptorList =
+        aggregationDescriptorList;
+
+    LogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList);
+    Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+    // deduplicate once and reuse the result for every device
+    deduplicatedDescriptors = getDeduplicatedDescriptors(aggregationDescriptorList);
+    for (PartialPath devicePath : analysis.getDeviceList()) {
+      String deviceName = devicePath.getFullPath();
+      PlanNode rootNode = visitDeviceAggregationBody(devicePath, bodyStep);
+
+      LogicalPlanBuilder subPlanBuilder =
+          new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList)
+              .withNewRoot(rootNode);
+
+      deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+    }
+
+    // convert to ALIGN BY DEVICE view
+    planBuilder =
+        planBuilder.planDeviceView(
+            deviceToSubPlanMap,
+            analysis.getDeviceViewOutputExpressions(),
+            analysis.getDeviceViewInputIndexesMap(),
+            analysis.getSelectExpressions(),
+            queryStatement,
+            analysis);
+
+    planBuilder =
+        planBuilder.planHavingAndTransform(
+            analysis.getHavingExpression(),
+            analysis.getSelectExpressions(),
+            analysis.getOrderByExpressions(),
+            queryStatement.isGroupByTime(),
+            queryStatement.getResultTimeOrder());
+
+    if (!queryStatement.needPushDownSort()) {
+      planBuilder = planBuilder.planOrderBy(queryStatement, analysis);
+    }
+
+    planBuilder =
+        planBuilder
+            .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder())
+            .planOffset(queryStatement.getRowOffset());
+
+    if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) {
+      planBuilder = planBuilder.planLimit(queryStatement.getRowLimit());
+    }
+
+    return planBuilder.getRoot();
+  }
+
+  /**
+   * Builds the aggregation sub-plan of a single device: raw data source, WHERE filter, raw-data
+   * aggregation and, when the GROUP BY TIME windows overlap, a sliding-window aggregation.
+   *
+   * @param devicePath device whose sub-plan is built
+   * @param curStep step forwarded to the raw-data and sliding-window aggregation builders
+   * @return root of the device's sub-plan
+   */
+  private PlanNode visitDeviceAggregationBody(PartialPath devicePath, AggregationStep curStep) {
+    Set<Expression> aggregationExpressions = analysis.getAggregationExpressions();
+
+    TemplatedLogicalPlanBuilder planBuilder =
+        new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, newSchemaList);
+
+    planBuilder =
+        planBuilder
+            .planRawDataSource(
+                devicePath,
+                queryStatement.getResultTimeOrder(),
+                OFFSET_VALUE,
+                limitValue,
+                analysis.isLastLevelUseWildcard())
+            .planFilter(
+                whereExpression,
+                queryStatement.isGroupByTime(),
+                queryStatement.getResultTimeOrder());
+
+    planBuilder =
+        planBuilder.planRawDataAggregation(
+            aggregationExpressions,
+            null,
+            analysis.getGroupByTimeParameter(),
+            analysis.getGroupByParameter(),
+            queryStatement.isOutputEndTime(),
+            curStep,
+            queryStatement.getResultTimeOrder(),
+            deduplicatedDescriptors);
+
+    if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) {
+      // The sliding window re-aggregates the aggregation expressions, not the whole select list,
+      // so it receives the same expression set as the raw-data aggregation above.
+      planBuilder =
+          planBuilder.planSlidingWindowAggregation(
+              aggregationExpressions,
+              analysis.getGroupByTimeParameter(),
+              curStep,
+              queryStatement.getResultTimeOrder());
+    }
+
+    // no group by level and group by tag
+    return planBuilder.getRoot();
+  }
+
+  /**
+   * Turns each aggregation expression into an {@link AggregationDescriptor} that carries the
+   * given step. Every expression is validated to be a {@link FunctionExpression}.
+   *
+   * @param aggregationExpressions expressions to convert, in iteration order
+   * @param curStep aggregation step assigned to every descriptor
+   * @return one descriptor per expression
+   */
+  private List<AggregationDescriptor> constructAggregationDescriptorList(
+      Set<Expression> aggregationExpressions, AggregationStep curStep) {
+    List<AggregationDescriptor> descriptors = new ArrayList<>();
+    for (Expression expression : aggregationExpressions) {
+      Validate.isTrue(expression instanceof FunctionExpression);
+      FunctionExpression function = (FunctionExpression) expression;
+      descriptors.add(
+          new AggregationDescriptor(
+              function.getFunctionName(),
+              curStep,
+              expression.getExpressions(),
+              function.getFunctionAttributes()));
+    }
+    return descriptors;
+  }
+
+  /**
+   * Registers the analyzed type of each expression in the query's type provider, keyed by the
+   * expression string. The DEVICE and ENDTIME columns are skipped.
+   *
+   * @param expressions expressions to register; {@code null} is a no-op
+   */
+  void updateTypeProvider(Collection<Expression> expressions) {
+    if (expressions == null) {
+      return;
+    }
+    for (Expression expression : expressions) {
+      String expressionString = expression.getExpressionString();
+      if (expressionString.equals(DEVICE) || expressionString.equals(ENDTIME)) {
+        continue;
+      }
+      context.getTypeProvider().setType(expressionString, getPreAnalyzedType(expression));
+    }
+  }
+
+ /** Looks up the data type of {@code expression} through {@code analysis.getType}. */
+ private TSDataType getPreAnalyzedType(Expression expression) {
+ return analysis.getType(expression);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
index b1107ae8fab..3ee79317753 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java
@@ -27,14 +27,20 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* This class provides accelerated implementation for multiple devices align
by device query. This
@@ -42,6 +48,7 @@ import java.util.List;
* unnecessary judgements.
*/
public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder {
+
private final MPPQueryContext context;
private final Analysis analysis;
@@ -121,7 +128,8 @@ public class TemplatedLogicalPlanBuilder extends
LogicalPlanBuilder {
null,
filterExpression,
isGroupByTime,
- scanOrder);
+ scanOrder,
+ true);
analysis.setFromWhere(filterNode);
this.root = filterNode;
@@ -129,6 +137,50 @@ public class TemplatedLogicalPlanBuilder extends
LogicalPlanBuilder {
return this;
}
+ // ===================== Methods below are used for aggregation
=============================
+
+  /**
+   * Stacks a {@link RawDataAggregationNode} on top of the current root, using aggregation
+   * descriptors that the caller has already deduplicated.
+   *
+   * <p>{@code aggregationExpressions} is only checked for {@code null} and {@code curStep} is not
+   * read here; the node is built from {@code deduplicatedAggregationDescriptorList}.
+   *
+   * @return this builder; the root is left untouched when {@code aggregationExpressions} is null
+   */
+  public TemplatedLogicalPlanBuilder planRawDataAggregation(
+      Set<Expression> aggregationExpressions,
+      Expression groupByExpression,
+      GroupByTimeParameter groupByTimeParameter,
+      GroupByParameter groupByParameter,
+      boolean outputEndTime,
+      AggregationStep curStep,
+      Ordering scanOrder,
+      List<AggregationDescriptor> deduplicatedAggregationDescriptorList) {
+    if (aggregationExpressions != null) {
+      this.root =
+          new RawDataAggregationNode(
+              context.getQueryId().genPlanNodeId(),
+              this.getRoot(),
+              deduplicatedAggregationDescriptorList,
+              groupByTimeParameter,
+              groupByParameter,
+              groupByExpression,
+              outputEndTime,
+              scanOrder,
+              true);
+    }
+    return this;
+  }
+
+  /**
+   * Stacks a sliding-window aggregation node, created by {@code
+   * createSlidingWindowAggregationNode}, on top of the current root.
+   *
+   * @return this builder; the root is left untouched when {@code aggregationExpressions} is null
+   */
+  public TemplatedLogicalPlanBuilder planSlidingWindowAggregation(
+      Set<Expression> aggregationExpressions,
+      GroupByTimeParameter groupByTimeParameter,
+      AggregationStep curStep,
+      Ordering scanOrder) {
+    if (aggregationExpressions != null) {
+      this.root =
+          createSlidingWindowAggregationNode(
+              this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder);
+    }
+    return this;
+  }
+
@Override
public TemplatedLogicalPlanBuilder withNewRoot(PlanNode newRoot) {
this.root = newRoot;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 4276cba4290..3b32f2932cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.SubPlanTypeExtractor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.VirtualSourceNode;
@@ -152,7 +153,7 @@ public class PlanFragment {
} else {
ReadWriteIOUtils.write((byte) 1, stream);
- // templated device, the serialized attribute basically same,
+ // templated align by device query, the serialized attributes are same,
// so there is no need to serialize all the SeriesScanNode repeated
if (typeProvider.getTemplatedInfo() != null) {
typeProvider.serialize(stream);
@@ -183,7 +184,8 @@ public class PlanFragment {
PlanNode root;
if (typeProvider != null && typeProvider.getTemplatedInfo() != null) {
root = PlanNodeType.deserializeWithTemplate(byteBuffer, typeProvider);
- if (root instanceof AlignedSeriesScanNode) {
+ if (root instanceof AlignedSeriesScanNode
+ || root instanceof AlignedSeriesAggregationScanNode) {
return root;
}
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 09fe6c92960..730edb747b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -452,7 +452,7 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
List<String> outputColumns = node.getOutputColumnNames();
if (outputColumns == null) {
checkArgument(context.getTemplatedInfo() != null);
- outputColumns = context.getTemplatedInfo().getSelectMeasurements();
+ outputColumns = context.getTemplatedInfo().getDeviceViewOutputNames();
// skip device column
outputColumns = outputColumns.subList(1, outputColumns.size());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 0fd420ce1bb..2b5e665a755 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -470,14 +470,18 @@ public enum PlanNodeType {
public static PlanNode deserializeWithTemplate(ByteBuffer buffer,
TypeProvider typeProvider) {
short nodeType = buffer.getShort();
switch (nodeType) {
+ case 1:
+ return DeviceViewNode.deserializeUseTemplate(buffer, typeProvider);
case 3:
return FilterNode.deserializeUseTemplate(buffer, typeProvider);
+ case 32:
+ return ProjectNode.deserializeUseTemplate(buffer, typeProvider);
case 33:
return AlignedSeriesScanNode.deserializeUseTemplate(buffer,
typeProvider);
+ case 34:
+ return AlignedSeriesAggregationScanNode.deserializeUseTemplate(buffer,
typeProvider);
case 65:
return SingleDeviceViewNode.deserializeUseTemplate(buffer,
typeProvider);
- case 32:
- return ProjectNode.deserializeUseTemplate(buffer, typeProvider);
default:
return deserialize(buffer, nodeType);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index b1f18c555a6..dcb29879365 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -175,18 +176,21 @@ public class DeviceViewNode extends MultiChildProcessNode
{
public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
OrderByParameter mergeOrderParameter =
OrderByParameter.deserialize(byteBuffer);
+
int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> outputColumnNames = new ArrayList<>();
while (columnSize > 0) {
outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
columnSize--;
}
+
int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> devices = new ArrayList<>();
while (devicesSize > 0) {
devices.add(ReadWriteIOUtils.readString(byteBuffer));
devicesSize--;
}
+
int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new
HashMap<>(mapSize);
while (mapSize > 0) {
@@ -205,6 +209,49 @@ public class DeviceViewNode extends MultiChildProcessNode {
planNodeId, mergeOrderParameter, outputColumnNames, devices,
deviceToMeasurementIndexesMap);
}
+  /**
+   * Template-aware serialization. Writes the node type, id, merge order parameter and device
+   * names, then each child through its own {@code serializeUseTemplate}. Output column names and
+   * the per-device measurement indexes are not written.
+   */
+  @Override
+  public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider)
+      throws IOException {
+    PlanNodeType.DEVICE_VIEW.serialize(stream);
+    id.serialize(stream);
+    mergeOrderParameter.serializeAttributes(stream);
+
+    ReadWriteIOUtils.write(devices.size(), stream);
+    for (String device : devices) {
+      ReadWriteIOUtils.write(device, stream);
+    }
+
+    List<PlanNode> childNodes = getChildren();
+    ReadWriteIOUtils.write(childNodes.size(), stream);
+    for (PlanNode childNode : childNodes) {
+      childNode.serializeUseTemplate(stream, typeProvider);
+    }
+  }
+
+  /**
+   * Rebuilds a {@link DeviceViewNode} from its template-aware form. Id, merge order parameter
+   * and device names are read from the buffer; output column names and the measurement indexes
+   * (the same list for every device) are taken from the templated info of {@code typeProvider}.
+   */
+  public static DeviceViewNode deserializeUseTemplate(
+      ByteBuffer byteBuffer, TypeProvider typeProvider) {
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    OrderByParameter mergeOrderParameter = OrderByParameter.deserialize(byteBuffer);
+
+    int deviceCount = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> devices = new ArrayList<>(deviceCount);
+    for (int i = 0; i < deviceCount; i++) {
+      devices.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(devices.size());
+    for (String device : devices) {
+      deviceToMeasurementIndexesMap.put(
+          device, typeProvider.getTemplatedInfo().getDeviceToMeasurementIndexes());
+    }
+
+    return new DeviceViewNode(
+        planNodeId,
+        mergeOrderParameter,
+        typeProvider.getTemplatedInfo().getDeviceViewOutputNames(),
+        devices,
+        deviceToMeasurementIndexesMap);
+  }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
index f57466c98a0..f121eec96f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/FilterNode.java
@@ -37,15 +37,21 @@ public class FilterNode extends TransformNode {
private final Expression predicate;
+ // whether this node is generated by where clause
+ // fromWhere equals false, means it is generated by having clause
+ private final boolean fromWhere;
+
public FilterNode(
PlanNodeId id,
PlanNode childPlanNode,
Expression[] outputExpressions,
Expression predicate,
boolean keepNull,
- Ordering scanOrder) {
+ Ordering scanOrder,
+ boolean fromWhere) {
super(id, childPlanNode, outputExpressions, keepNull, scanOrder);
this.predicate = predicate;
+ this.fromWhere = fromWhere;
}
/** This construction method is only used in inner of class `FilterNode`. */
@@ -54,9 +60,11 @@ public class FilterNode extends TransformNode {
Expression[] outputExpressions,
Expression predicate,
boolean keepNull,
- Ordering scanOrder) {
+ Ordering scanOrder,
+ boolean fromWhere) {
super(id, outputExpressions, keepNull, scanOrder);
this.predicate = predicate;
+ this.fromWhere = fromWhere;
}
@Override
@@ -71,7 +79,8 @@ public class FilterNode extends TransformNode {
@Override
public PlanNode clone() {
- return new FilterNode(getPlanNodeId(), outputExpressions, predicate,
keepNull, scanOrder);
+ return new FilterNode(
+ getPlanNodeId(), outputExpressions, predicate, keepNull, scanOrder,
fromWhere);
}
@Override
@@ -108,7 +117,7 @@ public class FilterNode extends TransformNode {
boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
Ordering scanOrder =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new FilterNode(planNodeId, outputExpressions, predicate, keepNull,
scanOrder);
+ return new FilterNode(planNodeId, outputExpressions, predicate, keepNull,
scanOrder, false);
}
@Override
@@ -116,6 +125,17 @@ public class FilterNode extends TransformNode {
throws IOException {
PlanNodeType.FILTER.serialize(stream);
id.serialize(stream);
+ ReadWriteIOUtils.write(fromWhere, stream);
+ // for FilterNode generated by having clause, it must be serialized totally
+ if (!fromWhere) {
+ ReadWriteIOUtils.write(outputExpressions.length, stream);
+ for (Expression expression : outputExpressions) {
+ Expression.serialize(expression, stream);
+ }
+ Expression.serialize(predicate, stream);
+ ReadWriteIOUtils.write(keepNull, stream);
+ ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+ }
ReadWriteIOUtils.write(getChildren().size(), stream);
for (PlanNode planNode : getChildren()) {
planNode.serializeUseTemplate(stream, typeProvider);
@@ -125,13 +145,26 @@ public class FilterNode extends TransformNode {
public static FilterNode deserializeUseTemplate(
ByteBuffer byteBuffer, TypeProvider typeProvider) {
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-
- return new FilterNode(
- planNodeId,
- null,
- typeProvider.getTemplatedInfo().getPredicate(),
- typeProvider.getTemplatedInfo().isKeepNull(),
- typeProvider.getTemplatedInfo().getScanOrder());
+ boolean fromWhere = ReadWriteIOUtils.readBool(byteBuffer);
+ if (!fromWhere) {
+ int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer);
+ Expression[] outputExpressions = new Expression[outputExpressionsLength];
+ for (int i = 0; i < outputExpressionsLength; ++i) {
+ outputExpressions[i] = Expression.deserialize(byteBuffer);
+ }
+ Expression predicate = Expression.deserialize(byteBuffer);
+ boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
+ Ordering scanOrder =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ return new FilterNode(planNodeId, outputExpressions, predicate,
keepNull, scanOrder, false);
+ } else {
+ return new FilterNode(
+ planNodeId,
+ null,
+ typeProvider.getTemplatedInfo().getPredicate(),
+ typeProvider.getTemplatedInfo().isKeepNull(),
+ typeProvider.getTemplatedInfo().getScanOrder(),
+ true);
+ }
}
public Expression getPredicate() {
@@ -142,6 +175,10 @@ public class FilterNode extends TransformNode {
this.outputExpressions = outputExpressions;
}
+ public boolean isFromWhere() {
+ return fromWhere;
+ }
+
@Override
public String toString() {
return "FilterNode-" + this.getPlanNodeId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
index 345d2977088..dafcbdb9409 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/RawDataAggregationNode.java
@@ -124,6 +124,26 @@ public class RawDataAggregationNode extends
SingleChildProcessNode {
this.outputEndTime = outputEndTime;
}
+  /**
+   * Creates a node from descriptors that the caller has already deduplicated, which avoids
+   * invoking {@code getDeduplicatedDescriptors} again for every node.
+   *
+   * @param deduplicatedAggregationDescriptorList descriptors stored as-is
+   * @param useDeduplicatedDescriptors not read by this constructor; it only distinguishes this
+   *     signature from the other constructors
+   */
+  public RawDataAggregationNode(
+      PlanNodeId id,
+      PlanNode child,
+      List<AggregationDescriptor> deduplicatedAggregationDescriptorList,
+      @Nullable GroupByTimeParameter groupByTimeParameter,
+      @Nullable GroupByParameter groupByParameter,
+      Expression groupByExpression,
+      boolean outputEndTime,
+      Ordering scanOrder,
+      boolean useDeduplicatedDescriptors) {
+    super(id, child);
+    this.aggregationDescriptorList = deduplicatedAggregationDescriptorList;
+    this.groupByTimeParameter = groupByTimeParameter;
+    this.groupByParameter = groupByParameter;
+    this.groupByExpression = groupByExpression;
+    this.outputEndTime = outputEndTime;
+    this.scanOrder = scanOrder;
+  }
+
public List<AggregationDescriptor> getAggregationDescriptorList() {
return aggregationDescriptorList;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
index c1b3898ed28..263223c5d5c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/SingleDeviceViewNode.java
@@ -190,7 +190,7 @@ public class SingleDeviceViewNode extends
SingleChildProcessNode {
return new SingleDeviceViewNode(
planNodeId,
cacheOutputColumnNames,
- typeProvider.getTemplatedInfo().getSelectMeasurements(),
+ typeProvider.getTemplatedInfo().getDeviceViewOutputNames(),
device,
typeProvider.getTemplatedInfo().getDeviceToMeasurementIndexes());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
index 2b69cdb0bc4..7bdbea3a577 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesAggregationScanNode.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -259,6 +260,41 @@ public class AlignedSeriesAggregationScanNode extends
SeriesAggregationSourceNod
null);
}
+  /**
+   * Template-aware serialization. Only the node type, the node id and the path nodes of {@code
+   * alignedPath} are written.
+   */
+  @Override
+  public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider)
+      throws IOException {
+    PlanNodeType.ALIGNED_SERIES_AGGREGATE_SCAN.serialize(stream);
+    id.serialize(stream);
+
+    String[] pathNodes = alignedPath.getNodes();
+    ReadWriteIOUtils.write(pathNodes.length, stream);
+    for (String pathNode : pathNodes) {
+      ReadWriteIOUtils.write(pathNode, stream);
+    }
+  }
+
+ /**
+ * Rebuilds a node from its template-aware form. Only the node id and the path nodes are read
+ * from the buffer; every other constructor argument comes from the templated info of
+ * {@code typeProvider}.
+ */
+ public static AlignedSeriesAggregationScanNode deserializeUseTemplate(
+ ByteBuffer byteBuffer, TypeProvider typeProvider) {
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+ // read the path nodes: a count followed by that many strings
+ int nodeSize = ReadWriteIOUtils.readInt(byteBuffer);
+ String[] nodes = new String[nodeSize];
+ for (int i = 0; i < nodeSize; i++) {
+ nodes[i] = ReadWriteIOUtils.readString(byteBuffer);
+ }
+ // measurements and schemas are not in the buffer; they are taken from the templated info
+ AlignedPath alignedPath = new AlignedPath(new PartialPath(nodes));
+
alignedPath.setMeasurementList(typeProvider.getTemplatedInfo().getMeasurementList());
+ alignedPath.addSchemas(typeProvider.getTemplatedInfo().getSchemaList());
+
+ return new AlignedSeriesAggregationScanNode(
+ planNodeId,
+ alignedPath,
+ typeProvider.getTemplatedInfo().aggregationDescriptorList,
+ typeProvider.getTemplatedInfo().getScanOrder(),
+ typeProvider.getTemplatedInfo().outputEndTime,
+ typeProvider.getTemplatedInfo().getPushDownPredicate(),
+ typeProvider.getTemplatedInfo().groupByTimeParameter,
+ null);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index d266de50aaa..c7758007d36 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -265,7 +265,7 @@ public class AggregationDescriptor {
protected String getInputString(List<Expression> expressions) {
StringBuilder builder = new StringBuilder();
- if (!(expressions.size() == 0)) {
+ if (!(expressions.isEmpty())) {
builder.append(expressions.get(0).getExpressionString());
for (int i = 1; i < expressions.size(); ++i) {
builder.append(", ").append(expressions.get(i).getExpressionString());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
index 9de54f85c0a..d4b979f91a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/GroupByTimeParameter.java
@@ -162,4 +162,8 @@ public class GroupByTimeParameter {
public int hashCode() {
return Objects.hash(startTime, endTime, interval, slidingStep,
leftCRightO);
}
+
+  /**
+   * Creates a new parameter object with the same start time, end time, interval, sliding step
+   * and left-closed/right-open flag, using the constructor rather than {@code super.clone()}.
+   *
+   * @return a new {@link GroupByTimeParameter} built from this instance's fields
+   */
+  @Override
+  public GroupByTimeParameter clone() {
+    return new GroupByTimeParameter(startTime, endTime, interval, slidingStep, leftCRightO);
+  }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
index 0ca53b7b84c..372b803d954 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
@@ -365,7 +365,8 @@ public class TestPlanBuilder {
expressions.toArray(new Expression[0]),
predicate,
isGroupByTime,
- Ordering.ASC);
+ Ordering.ASC,
+ true);
return this;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
index adb25ea1a00..2a14c0cf3b2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java
@@ -194,7 +194,8 @@ public class DataQueryLogicalPlannerTest {
new Expression[] {new
TimeSeriesOperand(schemaMap.get("root.sg.d2.s1"))},
predicate,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
OffsetNode offsetNode = new OffsetNode(queryId.genPlanNodeId(),
filterNode, 100);
LimitNode limitNode = new LimitNode(queryId.genPlanNodeId(), offsetNode,
100);
@@ -249,7 +250,8 @@ public class DataQueryLogicalPlannerTest {
},
predicate1,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
List<PlanNode> sourceNodeList2 = new ArrayList<>();
sourceNodeList2.add(
@@ -287,7 +289,8 @@ public class DataQueryLogicalPlannerTest {
},
predicate2,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2, 3));
@@ -742,7 +745,8 @@ public class DataQueryLogicalPlannerTest {
},
predicate,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
RawDataAggregationNode aggregationNode =
new RawDataAggregationNode(
@@ -867,7 +871,8 @@ public class DataQueryLogicalPlannerTest {
},
predicate1,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
RawDataAggregationNode aggregationNode1 =
new RawDataAggregationNode(
@@ -922,7 +927,8 @@ public class DataQueryLogicalPlannerTest {
},
predicate2,
false,
- Ordering.DESC);
+ Ordering.DESC,
+ true);
RawDataAggregationNode aggregationNode2 =
new RawDataAggregationNode(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
index ad5e28f27b3..998df88719b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/process/FilterNodeSerdeTest.java
@@ -52,7 +52,8 @@ public class FilterNodeSerdeTest {
new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")),
new ConstantOperand(TSDataType.INT64, "100")),
false,
- Ordering.ASC);
+ Ordering.ASC,
+ true);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
filterNode.serialize(byteBuffer);