This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/RefactorAnalyzer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b880760c6466d7b6b6b05c97f6fe9c5bb782ff54 Author: Minghui Liu <[email protected]> AuthorDate: Mon Sep 26 16:31:33 2022 +0800 refactor LogicalPlanner --- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 132 ++++------- .../db/mpp/plan/planner/LogicalPlanVisitor.java | 252 ++++++--------------- 2 files changed, 119 insertions(+), 265 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index ba0ff63e5d..997386a4d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -97,7 +97,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant.COLUMN_DEVICE; @@ -200,18 +199,16 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planAggregationSource( - Set<Expression> sourceExpressions, AggregationStep curStep, Ordering scanOrder, Filter timeFilter, GroupByTimeParameter groupByTimeParameter, Set<Expression> aggregationExpressions, - Set<Expression> aggregationTransformExpressions, Map<Expression, Set<Expression>> groupByLevelExpressions) { boolean needCheckAscending = groupByTimeParameter == null; Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); - for (Expression sourceExpression : sourceExpressions) { + for (Expression sourceExpression : aggregationExpressions) { createAggregationDescriptor( (FunctionExpression) sourceExpression, curStep, @@ -228,73 +225,7 @@ public class LogicalPlanBuilder { scanOrder, timeFilter, groupByTimeParameter); - updateTypeProvider(sourceExpressions); - updateTypeProvider(aggregationTransformExpressions); - - return convergeAggregationSource( - sourceNodeList, - curStep, - scanOrder, - groupByTimeParameter, - aggregationExpressions, - groupByLevelExpressions); - } - - public LogicalPlanBuilder planAggregationSourceWithIndexAdjust( - Set<Expression> sourceExpressions, - AggregationStep curStep, - Ordering scanOrder, - Filter timeFilter, - GroupByTimeParameter groupByTimeParameter, - List<Integer> measurementIndexes, - Set<Expression> aggregationExpressions, - Set<Expression> aggregationTransformExpressions, - Map<Expression, Set<Expression>> groupByLevelExpressions) { - checkArgument( - sourceExpressions.size() == measurementIndexes.size(), - "Each aggregate should correspond to a column of output."); - - boolean needCheckAscending = groupByTimeParameter == null; - Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); - Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); - Map<AggregationDescriptor, Integer> aggregationToMeasurementIndexMap = new HashMap<>(); - - int index = 0; - for (Expression sourceExpression : sourceExpressions) { - AggregationDescriptor aggregationDescriptor = - createAggregationDescriptor( - (FunctionExpression) sourceExpression, - curStep, - scanOrder, - needCheckAscending, - ascendingAggregations, - descendingAggregations); - aggregationToMeasurementIndexMap.put(aggregationDescriptor, measurementIndexes.get(index)); - index++; - } - - List<PlanNode> sourceNodeList = - constructSourceNodeFromAggregationDescriptors( - ascendingAggregations, - descendingAggregations, - scanOrder, - timeFilter, - groupByTimeParameter); - updateTypeProvider(sourceExpressions); - updateTypeProvider(aggregationTransformExpressions); - - if (!curStep.isOutputPartial()) { - // update measurementIndexes - measurementIndexes.clear(); - measurementIndexes.addAll( - sourceNodeList.stream() - .map( - planNode -> - ((SeriesAggregationSourceNode) planNode).getAggregationDescriptorList()) - .flatMap(List::stream) - .map(aggregationToMeasurementIndexMap::get) - .collect(Collectors.toList())); - } + updateTypeProvider(aggregationExpressions); return convergeAggregationSource( sourceNodeList, @@ -305,7 +236,7 @@ public class LogicalPlanBuilder { groupByLevelExpressions); } - private AggregationDescriptor createAggregationDescriptor( + private void createAggregationDescriptor( FunctionExpression sourceExpression, AggregationStep curStep, Ordering scanOrder, @@ -331,7 +262,6 @@ public class LogicalPlanBuilder { .computeIfAbsent(selectPath, key -> new ArrayList<>()) .add(aggregationDescriptor); } - return aggregationDescriptor; } private List<PlanNode> constructSourceNodeFromAggregationDescriptors( @@ -458,11 +388,13 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder planDeviceView( Map<String, PlanNode> deviceNameToSourceNodesMap, - Set<Expression> deviceViewOutput, + Set<Expression> deviceViewOutputExpressions, Map<String, List<Integer>> deviceToMeasurementIndexesMap, Ordering mergeOrder) { List<String> outputColumnNames = - deviceViewOutput.stream().map(Expression::getExpressionString).collect(Collectors.toList()); + deviceViewOutputExpressions.stream() + .map(Expression::getExpressionString) + .collect(Collectors.toList()); DeviceViewNode deviceViewNode = new DeviceViewNode( context.getQueryId().genPlanNodeId(), @@ -480,7 +412,7 @@ public class LogicalPlanBuilder { } context.getTypeProvider().setType(COLUMN_DEVICE, TSDataType.TEXT); - updateTypeProvider(deviceViewOutput); + updateTypeProvider(deviceViewOutputExpressions); this.root = deviceViewNode; return this; @@ -488,7 +420,6 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder planGroupByLevel( Map<Expression, Set<Expression>> groupByLevelExpressions, - AggregationStep curStep, GroupByTimeParameter groupByTimeParameter, Ordering scanOrder) { if (groupByLevelExpressions == null) { @@ -499,7 +430,7 @@ public class LogicalPlanBuilder { createGroupByTLevelNode( Collections.singletonList(this.getRoot()), groupByLevelExpressions, - curStep, + AggregationStep.FINAL, groupByTimeParameter, scanOrder); return this; @@ -641,12 +572,12 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planFilterAndTransform( - Expression queryFilter, + Expression filterExpression, Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) { - if (queryFilter == null || selectExpressions.isEmpty()) { + if (filterExpression == null || selectExpressions.isEmpty()) { return this; } @@ -655,7 +586,7 @@ public class LogicalPlanBuilder { context.getQueryId().genPlanNodeId(), this.getRoot(), selectExpressions.toArray(new Expression[0]), - queryFilter, + filterExpression, isGroupByTime, zoneId, scanOrder); @@ -664,12 +595,9 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planTransform( - Set<Expression> transformExpressions, - boolean isGroupByTime, - ZoneId zoneId, - Ordering scanOrder) { + Set<Expression> selectExpressions, boolean isGroupByTime, ZoneId zoneId, Ordering scanOrder) { boolean needTransform = false; - for (Expression expression : transformExpressions) { + for (Expression expression : selectExpressions) { if (ExpressionAnalyzer.checkIsNeedTransform(expression)) { needTransform = true; break; @@ -683,11 +611,11 @@ public class LogicalPlanBuilder { new TransformNode( context.getQueryId().genPlanNodeId(), this.getRoot(), - transformExpressions.toArray(new Expression[0]), + selectExpressions.toArray(new Expression[0]), isGroupByTime, zoneId, scanOrder); - updateTypeProvider(transformExpressions); + updateTypeProvider(selectExpressions); return this; } @@ -720,6 +648,34 @@ public class LogicalPlanBuilder { return this; } + public LogicalPlanBuilder planHaving( + Expression havingExpression, + Set<Expression> selectExpressions, + boolean isGroupByTime, + ZoneId zoneId, + Ordering scanOrder) { + if (havingExpression != null) { + return planFilterAndTransform( + havingExpression, selectExpressions, isGroupByTime, zoneId, scanOrder); + } else { + return planTransform(selectExpressions, isGroupByTime, zoneId, scanOrder); + } + } + + public LogicalPlanBuilder planWhereAndSourceTransform( + Expression whereExpression, + Set<Expression> sourceTransformExpressions, + boolean isGroupByTime, + ZoneId zoneId, + Ordering scanOrder) { + if (whereExpression != null) { + return planFilterAndTransform( + whereExpression, sourceTransformExpressions, isGroupByTime, zoneId, scanOrder); + } else { + return planTransform(sourceTransformExpressions, isGroupByTime, zoneId, scanOrder); + } + } + /** Meta Query* */ public LogicalPlanBuilder planTimeSeriesSchemaSource( PartialPath pathPattern, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index d3e99cd82f..4e2f0af2d3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -67,7 +67,6 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplate import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -112,18 +111,12 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte subPlanBuilder.withNewRoot( visitQueryBody( queryStatement, - analysis.getDeviceToIsRawDataSource().get(deviceName), analysis.getDeviceToSourceExpressions().get(deviceName), - analysis.getDeviceToAggregationExpressions().get(deviceName), analysis.getDeviceToSourceTransformExpressions().get(deviceName), - analysis.hasValueFilter() - ? analysis.getDeviceToSourceExpressions().get(deviceName) - : Collections.emptySet(), analysis.getDeviceToWhereExpression() != null ? analysis.getDeviceToWhereExpression().get(deviceName) : null, - null, - analysis.getDeviceToMeasurementIndexesMap().get(deviceName), + analysis.getDeviceToAggregationExpressions().get(deviceName), context)); deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot()); } @@ -131,42 +124,34 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte planBuilder = planBuilder.planDeviceView( deviceToSubPlanMap, - analysis.getTransformInput(), + analysis.getDeviceViewOutputExpressions(), analysis.getDeviceToMeasurementIndexesMap(), queryStatement.getResultTimeOrder()); - if (queryStatement.hasHaving()) { - planBuilder.planFilterAndTransform( - analysis.getHavingExpression(), - analysis.getTransformOutput(), - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - planBuilder.planTransform( - analysis.getTransformOutput(), - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } } else { planBuilder = planBuilder.withNewRoot( visitQueryBody( queryStatement, - analysis.isRawDataSource(), analysis.getSourceExpressions(), - analysis.getAggregationExpressions(), analysis.getSourceTransformExpressions(), - analysis.getSelectExpressions(), analysis.getWhereExpression(), - analysis.getHavingExpression(), - null, + analysis.getAggregationExpressions(), context)); } - // other common upstream node + // other upstream node planBuilder = planBuilder + .planGroupByLevel( + analysis.getGroupByLevelExpressions(), + analysis.getGroupByTimeParameter(), + queryStatement.getResultTimeOrder()) + .planHaving( + analysis.getHavingExpression(), + analysis.getSelectExpressions(), + queryStatement.isGroupByTime(), + queryStatement.getSelectComponent().getZoneId(), + queryStatement.getResultTimeOrder()) .planFill(analysis.getFillDescriptor(), queryStatement.getResultTimeOrder()) .planOffset(queryStatement.getRowOffset()) .planLimit(queryStatement.getRowLimit()); @@ -176,48 +161,51 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte public PlanNode visitQueryBody( QueryStatement queryStatement, - boolean isRawDataSource, Set<Expression> sourceExpressions, + Set<Expression> sourceTransformExpressions, + Expression whereExpression, Set<Expression> aggregationExpressions, - Set<Expression> aggregationTransformExpressions, - Set<Expression> transformExpressions, - Expression queryFilter, - Expression havingExpression, - List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE MPPQueryContext context) { LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); - // plan data source node - if (isRawDataSource) { + if (aggregationExpressions == null) { + // raw data query planBuilder = - planBuilder.planRawDataSource( - sourceExpressions, - queryStatement.getResultTimeOrder(), - analysis.getGlobalTimeFilter()); - - if (queryStatement.isAggregationQuery()) { - if (analysis.hasValueFilter()) { - planBuilder = - planBuilder.planFilterAndTransform( - queryFilter, - aggregationTransformExpressions, + planBuilder + .planRawDataSource( + sourceExpressions, + queryStatement.getResultTimeOrder(), + analysis.getGlobalTimeFilter()) + .planWhereAndSourceTransform( + whereExpression, + sourceTransformExpressions, queryStatement.isGroupByTime(), queryStatement.getSelectComponent().getZoneId(), queryStatement.getResultTimeOrder()); - } else { - planBuilder = - planBuilder.planTransform( - aggregationTransformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } + } else { + // aggregation query + boolean isRawDataSource = + (whereExpression != null) || needTransform(sourceTransformExpressions); + AggregationStep curStep; + if (isRawDataSource) { + planBuilder = + planBuilder + .planRawDataSource( + sourceExpressions, + queryStatement.getResultTimeOrder(), + analysis.getGlobalTimeFilter()) + .planWhereAndSourceTransform( + whereExpression, + sourceTransformExpressions, + queryStatement.isGroupByTime(), + queryStatement.getSelectComponent().getZoneId(), + queryStatement.getResultTimeOrder()); boolean outputPartial = queryStatement.isGroupByLevel() || (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()); - AggregationStep curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; + curStep = outputPartial ? AggregationStep.PARTIAL : AggregationStep.SINGLE; planBuilder = planBuilder.planAggregation( aggregationExpressions, @@ -225,147 +213,57 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte curStep, queryStatement.getResultTimeOrder()); - if (curStep.isOutputPartial()) { - if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { - curStep = - queryStatement.isGroupByLevel() - ? AggregationStep.INTERMEDIATE - : AggregationStep.FINAL; - planBuilder = - planBuilder.planSlidingWindowAggregation( - aggregationExpressions, - analysis.getGroupByTimeParameter(), - curStep, - queryStatement.getResultTimeOrder()); - } - - if (queryStatement.isGroupByLevel()) { - curStep = AggregationStep.FINAL; - planBuilder = - planBuilder.planGroupByLevel( - analysis.getGroupByLevelExpressions(), - curStep, - analysis.getGroupByTimeParameter(), - queryStatement.getResultTimeOrder()); - } + if (queryStatement.isGroupByTime() && analysis.getGroupByTimeParameter().hasOverlap()) { + curStep = + queryStatement.isGroupByLevel() + ? AggregationStep.INTERMEDIATE + : AggregationStep.FINAL; + planBuilder = + planBuilder.planSlidingWindowAggregation( + aggregationExpressions, + analysis.getGroupByTimeParameter(), + curStep, + queryStatement.getResultTimeOrder()); } if (queryStatement.isGroupByLevel()) { - planBuilder = // plan Having with GroupByLevel - planBuilder.planFilterAndTransform( - havingExpression, - analysis.getGroupByLevelExpressions().keySet(), - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - if (havingExpression != null) { - planBuilder = // plan Having without GroupByLevel - planBuilder.planFilterAndTransform( - havingExpression, - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - planBuilder = // no Having - planBuilder.planTransform( - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } - } - } else { - if (analysis.hasValueFilter()) { planBuilder = - planBuilder.planFilterAndTransform( - queryFilter, - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - planBuilder = - planBuilder.planTransform( - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), + planBuilder.planGroupByLevel( + analysis.getGroupByLevelExpressions(), + analysis.getGroupByTimeParameter(), queryStatement.getResultTimeOrder()); } - } - } else { - AggregationStep curStep = - (analysis.getGroupByLevelExpressions() != null - || (analysis.getGroupByTimeParameter() != null - && analysis.getGroupByTimeParameter().hasOverlap())) - ? AggregationStep.PARTIAL - : AggregationStep.SINGLE; - - boolean needTransform = false; - for (Expression expression : transformExpressions) { - if (ExpressionAnalyzer.checkIsNeedTransform(expression)) { - needTransform = true; - break; - } - } - - if (!needTransform && measurementIndexes != null) { - planBuilder = - planBuilder.planAggregationSourceWithIndexAdjust( - sourceExpressions, - curStep, - queryStatement.getResultTimeOrder(), - analysis.getGlobalTimeFilter(), - analysis.getGroupByTimeParameter(), - measurementIndexes, - aggregationExpressions, - aggregationTransformExpressions, - analysis.getGroupByLevelExpressions()); } else { + curStep = + (analysis.getGroupByLevelExpressions() != null + || (analysis.getGroupByTimeParameter() != null + && analysis.getGroupByTimeParameter().hasOverlap())) + ? AggregationStep.PARTIAL + : AggregationStep.SINGLE; + planBuilder = planBuilder.planAggregationSource( - sourceExpressions, curStep, queryStatement.getResultTimeOrder(), analysis.getGlobalTimeFilter(), analysis.getGroupByTimeParameter(), aggregationExpressions, - aggregationTransformExpressions, analysis.getGroupByLevelExpressions()); - - if (queryStatement.isGroupByLevel()) { - planBuilder = // plan Having with GroupByLevel - planBuilder.planFilterAndTransform( - havingExpression, - analysis.getGroupByLevelExpressions().keySet(), - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - if (havingExpression != null) { - planBuilder = // plan Having without GroupByLevel - planBuilder.planFilterAndTransform( - havingExpression, - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } else { - planBuilder = // no Having - planBuilder.planTransform( - transformExpressions, - queryStatement.isGroupByTime(), - queryStatement.getSelectComponent().getZoneId(), - queryStatement.getResultTimeOrder()); - } - } } } return planBuilder.getRoot(); } + private boolean needTransform(Set<Expression> expressions) { + for (Expression expression : expressions) { + if (ExpressionAnalyzer.checkIsNeedTransform(expression)) { + return true; + } + } + return false; + } + @Override public PlanNode visitCreateTimeseries( CreateTimeSeriesStatement createTimeSeriesStatement, MPPQueryContext context) {
