This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/agg_template_alignbydevice in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2b799e7d3c9af45694b20bd534a5f10dc9402e63 Author: Beyyes <[email protected]> AuthorDate: Thu May 9 22:23:23 2024 +0800 add temp impl --- .../main/java/org/apache/iotdb/SessionExample.java | 29 +++- .../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +- .../plan/analyze/TemplatedAggregationAnalyze.java | 122 +++++++++++++++ .../queryengine/plan/analyze/TemplatedAnalyze.java | 100 +++++++------ .../db/queryengine/plan/analyze/TemplatedInfo.java | 5 + .../plan/planner/LogicalPlanBuilder.java | 4 +- .../plan/planner/TemplatedLogicalPlan.java | 165 ++++++++++++++++++++- .../plan/planner/TemplatedLogicalPlanBuilder.java | 55 +++++++ 8 files changed, 430 insertions(+), 56 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 0daec0885bf..e65127fd388 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -61,12 +61,39 @@ public class SessionExample { private static final String ROOT_SG1_D1 = "root.sg1.d1"; private static final String ROOT_SG1 = "root.sg1"; private static final String LOCAL_HOST = "127.0.0.1"; - public static final String SELECT_D1 = "select * from root.sg1.d1"; + public static final String SELECT_D1 = + "select value from root.cmadaas_nafp_surf.nafp.NAFP_GRAPES_MESO_FOR_3KM.data.RHU.`100`.`100000`.tile limit 10"; private static Random random = new Random(); public static void main(String[] args) throws IoTDBConnectionException, StatementExecutionException { + session = + new Session.Builder() + .host("172.20.31.60") + .port(6667) + .username("root") + .password("root") + .version(Version.V_1_0) + .build(); + session.open(false); + + // set session fetchSize + session.setFetchSize(10000); + + long time1 = System.currentTimeMillis(); + try (SessionDataSet dataSet = session.executeQueryStatement(SELECT_D1)) { + // System.out.println(dataSet.getColumnNames()); + // dataSet.setFetchSize(1024); // default is 10000 + // while (dataSet.hasNext()) { + // System.out.println(dataSet.next()); + // } + } + System.out.println("Time: " + (System.currentTimeMillis() - time1)); + } + + public static void main1(String[] args) + throws IoTDBConnectionException, StatementExecutionException { session = new Session.Builder() .host(LOCAL_HOST) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index b23a5d44f9c..f0086b5e79a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -1848,7 +1848,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> && rightExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } return; @@ -1856,7 +1857,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> if (!(keepExpression instanceof ConstantOperand)) { throw new SemanticException( String.format( - "Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.", + "Please check the keep condition ([%s]), " + + "it need to be a constant or a compare expression constructed by 'keep' and a long number.", keepExpression.getExpressionString())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java new file mode 100644 index 00000000000..ae2afa9c8f5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAggregationAnalyze.java @@ -0,0 +1,122 @@ +package org.apache.iotdb.db.queryengine.plan.analyze; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; +import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn; +import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; +import org.apache.iotdb.db.schemaengine.template.Template; + +import org.apache.tsfile.utils.Pair; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewInput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice; +import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice; + +public class TemplatedAggregationAnalyze { + + // ----------- Methods below are used for aggregation, templated with align by device -------- + + static boolean analyzeAggregation( + Analysis analysis, + QueryStatement queryStatement, + IPartitionFetcher partitionFetcher, + ISchemaTree schemaTree, + MPPQueryContext context, + Template template) { + + List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree); + + if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) { + // remove the device which won't appear in resultSet after limit/offset + deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement); + } + + analyzeDeviceToWhere(analysis, queryStatement); + if (deviceList.isEmpty()) { + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + analysis.setDeviceList(deviceList); + + List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); + ColumnPaginationController paginationController = + new ColumnPaginationController( + queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset()); + for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {} + + analyzeSelect(queryStatement, analysis, outputExpressions, template); + if (analysis.getWhereExpression() != null + && analysis.getWhereExpression().equals(ConstantOperand.FALSE)) { + analyzeOutput(analysis, queryStatement, outputExpressions); + analysis.setFinishQueryAfterAnalyze(true); + return true; + } + + analyzeDeviceToAggregation(analysis); + analyzeDeviceToSourceTransform(analysis); + analyzeDeviceToSource(analysis); + + analyzeDeviceViewOutput(analysis, queryStatement); + analyzeDeviceViewInput(analysis); + + // generate result set header according to output expressions + analyzeOutput(analysis, queryStatement, outputExpressions); + + context.generateGlobalTimeFilter(analysis); + // fetch partition information + analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter()); + return true; + } + + private static void analyzeSelect( + QueryStatement queryStatement, + Analysis analysis, + List<Pair<Expression, String>> outputExpressions, + Template template) { + LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>(); + selectExpressions.add(DEVICE_EXPRESSION); + if (queryStatement.isOutputEndTime()) { + selectExpressions.add(END_TIME_EXPRESSION); + } + for (Pair<Expression, String> pair : outputExpressions) { + Expression selectExpression = pair.left; + selectExpressions.add(selectExpression); + } + analysis.setOutputExpressions(outputExpressions); + analysis.setSelectExpressions(selectExpressions); + analysis.setDeviceTemplate(template); + // TODO only add measurement and schema occured in selectExpressions + analysis.setMeasurementList(new ArrayList<>(template.getSchemaMap().keySet())); + analysis.setMeasurementSchemaList(new ArrayList<>(template.getSchemaMap().values())); + } + + private static void analyzeDeviceToSourceTransform(Analysis analysis) { + // TODO add having into SourceTransform + analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions()); + } + + private static void analyzeDeviceToSource(Analysis analysis) { + // TODO add having into Source + analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions()); + analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions()); + } + + private static void analyzeDeviceToAggregation(Analysis analysis) { + // TODO need add having clause? + analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index 856476eae24..1814a923d24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -71,6 +71,7 @@ import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTim import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpressionForTemplatedQuery; +import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAggregationAnalyze.analyzeAggregation; /** * This class provides accelerated implementation for multiple devices align by device query. This @@ -96,9 +97,8 @@ public class TemplatedAnalyze { IPartitionFetcher partitionFetcher, ISchemaTree schemaTree, MPPQueryContext context) { - if (queryStatement.isAggregationQuery() - || queryStatement.isGroupBy() - || queryStatement.isGroupByTime() + if (queryStatement.isGroupBy() + || (queryStatement.isGroupByTime() && !queryStatement.isAggregationQuery()) || queryStatement.isSelectInto() || queryStatement.hasFill() || schemaTree.hasNormalTimeSeries()) { @@ -106,58 +106,61 @@ public class TemplatedAnalyze { } List<Template> templates = schemaTree.getUsingTemplates(); - if (templates.size() != 1) { + if (templates.size() != 1 || templates.get(0) == null) { return false; } Template template = templates.get(0); + if (queryStatement.isAggregationQuery()) { + return analyzeAggregation( + analysis, queryStatement, partitionFetcher, schemaTree, context, template); + } + List<Pair<Expression, String>> outputExpressions = new ArrayList<>(); ColumnPaginationController paginationController = new ColumnPaginationController( queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset()); - if (template != null) { - for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { - Expression expression = resultColumn.getExpression(); - if ("*".equals(expression.getOutputSymbol())) { - for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - } else if (paginationController.hasCurLimit()) { - String measurementName = entry.getKey(); - IMeasurementSchema measurementSchema = entry.getValue(); - TimeSeriesOperand measurementPath = - new TimeSeriesOperand( - new MeasurementPath(new String[] {measurementName}, measurementSchema)); - outputExpressions.add(new Pair<>(measurementPath, null)); - paginationController.consumeLimit(); - } else { - break; - } - } - if (queryStatement.getSelectComponent().getResultColumns().size() == 1 - && queryStatement.getSeriesOffset() == 0 - && queryStatement.getSeriesLimit() == 0) { - analysis.setTemplateWildCardQuery(); + for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { + Expression expression = resultColumn.getExpression(); + if ("*".equals(expression.getOutputSymbol())) { + for (Map.Entry<String, IMeasurementSchema> entry : template.getSchemaMap().entrySet()) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + } else if (paginationController.hasCurLimit()) { + String measurementName = entry.getKey(); + IMeasurementSchema measurementSchema = entry.getValue(); + TimeSeriesOperand measurementPath = + new TimeSeriesOperand( + new MeasurementPath(new String[] {measurementName}, measurementSchema)); + outputExpressions.add(new Pair<>(measurementPath, null)); + paginationController.consumeLimit(); + } else { + break; } - } else if (expression instanceof TimeSeriesOperand) { - String measurementName = ((TimeSeriesOperand) expression).getPath().getMeasurement(); - if (template.getSchemaMap().containsKey(measurementName)) { - if (paginationController.hasCurOffset()) { - paginationController.consumeOffset(); - } else if (paginationController.hasCurLimit()) { - IMeasurementSchema measurementSchema = template.getSchemaMap().get(measurementName); - TimeSeriesOperand measurementPath = - new TimeSeriesOperand( - new MeasurementPath(new String[] {measurementName}, measurementSchema)); - outputExpressions.add(new Pair<>(measurementPath, resultColumn.getAlias())); - } else { - break; - } + } + if (queryStatement.getSelectComponent().getResultColumns().size() == 1 + && queryStatement.getSeriesOffset() == 0 + && queryStatement.getSeriesLimit() == 0) { + analysis.setTemplateWildCardQuery(); + } + } else if (expression instanceof TimeSeriesOperand) { + String measurementName = ((TimeSeriesOperand) expression).getPath().getMeasurement(); + if (template.getSchemaMap().containsKey(measurementName)) { + if (paginationController.hasCurOffset()) { + paginationController.consumeOffset(); + } else if (paginationController.hasCurLimit()) { + IMeasurementSchema measurementSchema = template.getSchemaMap().get(measurementName); + TimeSeriesOperand measurementPath = + new TimeSeriesOperand( + new MeasurementPath(new String[] {measurementName}, measurementSchema)); + outputExpressions.add(new Pair<>(measurementPath, resultColumn.getAlias())); + } else { + break; } - } else { - return false; } + } else { + return false; } } @@ -228,8 +231,7 @@ public class TemplatedAnalyze { analysis.setMeasurementSchemaList(measurementSchemaList); } - private static List<PartialPath> analyzeFrom( - QueryStatement queryStatement, ISchemaTree schemaTree) { + static List<PartialPath> analyzeFrom(QueryStatement queryStatement, ISchemaTree schemaTree) { // device path patterns in FROM clause List<PartialPath> devicePatternList = queryStatement.getFromComponent().getPrefixPaths(); @@ -246,7 +248,7 @@ public class TemplatedAnalyze { : deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList()); } - private static void analyzeDeviceToWhere(Analysis analysis, QueryStatement queryStatement) { + static void analyzeDeviceToWhere(Analysis analysis, QueryStatement queryStatement) { if (!queryStatement.hasWhere()) { return; } @@ -325,7 +327,7 @@ public class TemplatedAnalyze { analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions()); } - private static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement queryStatement) { + static void analyzeDeviceViewOutput(Analysis analysis, QueryStatement queryStatement) { Set<Expression> selectExpressions = analysis.getSelectExpressions(); // TODO if no order by, just set deviceViewOutputExpressions as selectExpressions Set<Expression> deviceViewOutputExpressions = new LinkedHashSet<>(selectExpressions); @@ -337,7 +339,7 @@ public class TemplatedAnalyze { analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions, queryStatement, analysis)); } - private static void analyzeDeviceViewInput(Analysis analysis) { + static void analyzeDeviceViewInput(Analysis analysis) { List<Integer> indexes = new ArrayList<>(); // index-0 is `Device` @@ -356,7 +358,7 @@ public class TemplatedAnalyze { analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions()); } - private static void analyzeDataPartition( + static void analyzeDataPartition( Analysis analysis, ISchemaTree schemaTree, IPartitionFetcher partitionFetcher, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java index bd624f25634..bf5d54e5cd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; @@ -77,6 +78,10 @@ public class TemplatedInfo { // variables related to predicate push down private Expression pushDownPredicate; + private Set<Expression> aggSelectExpressions; + + private Expression havingExpression; + public TemplatedInfo( List<String> measurementList, List<IMeasurementSchema> schemaList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index d457c432cf0..7ce23590e23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -743,7 +743,7 @@ public class LogicalPlanBuilder { return this; } - private PlanNode createSlidingWindowAggregationNode( + protected PlanNode createSlidingWindowAggregationNode( PlanNode child, Set<Expression> aggregationExpressions, GroupByTimeParameter groupByTimeParameter, @@ -848,7 +848,7 @@ public class LogicalPlanBuilder { .collect(Collectors.toList())); } - private List<AggregationDescriptor> constructAggregationDescriptorList( + protected List<AggregationDescriptor> constructAggregationDescriptorList( Set<Expression> aggregationExpressions, AggregationStep curStep) { return aggregationExpressions.stream() .map( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index 75222efb6fc..7bb9ebd579c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -86,10 +87,67 @@ public class TemplatedLogicalPlan { this.whereExpression = analysis.getWhereExpression(); // for align by device query with template, most used variables are same - initCommonVariables(); + if (queryStatement.isAggregationQuery()) { + initAggQueryCommonVariables(); + } else { + initNonAggQueryCommonVariables(); + } + } + + private void initAggQueryCommonVariables() { + if (whereExpression != null) { + newMeasurementList = new ArrayList<>(measurementList); + newSchemaList = new ArrayList<>(schemaList); + Set<String> selectMeasurements = new HashSet<>(measurementList); + List<Expression> whereSourceExpressions = searchSourceExpressions(whereExpression); + for (Expression expression : whereSourceExpressions) { + if (expression instanceof TimeSeriesOperand) { + String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement(); + if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) { + continue; + } + if (!selectMeasurements.contains(measurement)) { + newMeasurementList.add(measurement); + newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement)); + } + } + } + + // TODO fix aggregation filterLayoutMap + filterLayoutMap = makeLayout(newMeasurementList); + + analysis + .getExpressionTypes() + .forEach( + (key, value) -> + context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value)); + } + + context + .getTypeProvider() + .setTemplatedInfo( + new TemplatedInfo( + newMeasurementList, + newSchemaList, + newSchemaList.stream() + .map(IMeasurementSchema::getType) + .collect(Collectors.toList()), + queryStatement.getResultTimeOrder(), + analysis.isLastLevelUseWildcard(), + analysis.getDeviceViewOutputExpressions().stream() + .map(Expression::getExpressionString) + .collect(Collectors.toList()), + analysis.getDeviceViewInputIndexesMap().values().iterator().next(), + OFFSET_VALUE, + limitValue, + whereExpression, + queryStatement.isGroupByTime(), + analysis.getDeviceTemplate().getSchemaMap(), + filterLayoutMap, + null)); } - private void initCommonVariables() { + private void initNonAggQueryCommonVariables() { if (whereExpression != null) { if (!analysis.isTemplateWildCardQuery()) { newMeasurementList = new ArrayList<>(measurementList); @@ -145,6 +203,10 @@ public class TemplatedLogicalPlan { } public PlanNode visitQuery() { + if (queryStatement.isAggregationQuery()) { + return visitAggregation(); + } + LogicalPlanBuilder planBuilder = new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList); @@ -193,6 +255,56 @@ public class TemplatedLogicalPlan { return planBuilder.getRoot(); } + private PlanNode visitAggregation() { + LogicalPlanBuilder planBuilder = + new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList); + + Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>(); + for (PartialPath devicePath : analysis.getDeviceList()) { + String deviceName = devicePath.getFullPath(); + PlanNode rootNode = visitDeviceAggregationBody(devicePath); + + LogicalPlanBuilder subPlanBuilder = + new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList) + .withNewRoot(rootNode); + + deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot()); + } + + // convert to ALIGN BY DEVICE view + planBuilder = + planBuilder.planDeviceView( + deviceToSubPlanMap, + analysis.getDeviceViewOutputExpressions(), + analysis.getDeviceViewInputIndexesMap(), + analysis.getSelectExpressions(), + queryStatement, + analysis); + + planBuilder = + planBuilder.planHavingAndTransform( + analysis.getHavingExpression(), + analysis.getSelectExpressions(), + analysis.getOrderByExpressions(), + queryStatement.isGroupByTime(), + queryStatement.getResultTimeOrder()); + + if (!queryStatement.needPushDownSort()) { + planBuilder = planBuilder.planOrderBy(queryStatement, analysis); + } + + planBuilder = + planBuilder + .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder()) + .planOffset(queryStatement.getRowOffset()); + + if (!analysis.isUseTopKNode() || queryStatement.hasOffset()) { + planBuilder = planBuilder.planLimit(queryStatement.getRowLimit()); + } + + return planBuilder.getRoot(); + } + public PlanNode visitQueryBody(PartialPath devicePath) { TemplatedLogicalPlanBuilder planBuilder = @@ -213,4 +325,53 @@ public class TemplatedLogicalPlan { return planBuilder.getRoot(); } + + private PlanNode visitDeviceAggregationBody(PartialPath devicePath) { + TemplatedLogicalPlanBuilder planBuilder = + new TemplatedLogicalPlanBuilder(analysis, context, newMeasurementList, newSchemaList); + + planBuilder = + planBuilder + .planRawDataSource( + devicePath, + queryStatement.getResultTimeOrder(), + OFFSET_VALUE, + limitValue, + analysis.isLastLevelUseWildcard()) + .planFilter( + whereExpression, + queryStatement.isGroupByTime(), + queryStatement.getResultTimeOrder()); + + boolean outputPartial = + queryStatement.isGroupByLevel() + || queryStatement.isGroupByTag() + || (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()); + AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; + planBuilder = + planBuilder.planRawDataAggregation( + analysis.getSelectExpressions(), + null, + analysis.getGroupByTimeParameter(), + analysis.getGroupByParameter(), + queryStatement.isOutputEndTime(), + curStep, + queryStatement.getResultTimeOrder()); + + if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { + curStep = + (queryStatement.isGroupByLevel() || queryStatement.isGroupByTag()) + ? AggregationStep.INTERMEDIATE + : AggregationStep.FINAL; + planBuilder = + planBuilder.planSlidingWindowAggregation( + analysis.getSelectExpressions(), + analysis.getGroupByTimeParameter(), + curStep, + queryStatement.getResultTimeOrder()); + } + + // no group by level and group by tag + return planBuilder.getRoot(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java index fb6b2ea04c8..4942c76e308 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java @@ -27,14 +27,20 @@ import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByParameter; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * This class provides accelerated implementation for multiple devices align by device query. This @@ -126,6 +132,55 @@ public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder { return this; } + public TemplatedLogicalPlanBuilder planRawDataAggregation( + Set<Expression> aggregationExpressions, + Expression groupByExpression, + GroupByTimeParameter groupByTimeParameter, + GroupByParameter groupByParameter, + boolean outputEndTime, + AggregationStep curStep, + Ordering scanOrder) { + if (aggregationExpressions == null) { + return this; + } + + List<AggregationDescriptor> aggregationDescriptorList = + constructAggregationDescriptorList(aggregationExpressions, curStep); + updateTypeProvider(aggregationExpressions); + if (curStep.isOutputPartial()) { + aggregationDescriptorList.forEach( + aggregationDescriptor -> + updateTypeProviderByPartialAggregation( + aggregationDescriptor, context.getTypeProvider())); + } + this.root = + new RawDataAggregationNode( + context.getQueryId().genPlanNodeId(), + this.getRoot(), + aggregationDescriptorList, + groupByTimeParameter, + groupByParameter, + groupByExpression, + outputEndTime, + scanOrder); + return this; + } + + public TemplatedLogicalPlanBuilder planSlidingWindowAggregation( + Set<Expression> aggregationExpressions, + GroupByTimeParameter groupByTimeParameter, + AggregationStep curStep, + Ordering scanOrder) { + if (aggregationExpressions == null) { + return this; + } + + this.root = + createSlidingWindowAggregationNode( + this.getRoot(), aggregationExpressions, groupByTimeParameter, curStep, scanOrder); + return this; + } + @Override public TemplatedLogicalPlanBuilder withNewRoot(PlanNode newRoot) { this.root = newRoot;
