This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/TypeProviderOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c9981379eed90a866b5c094e96e59592e26692e7 Author: Minghui Liu <[email protected]> AuthorDate: Wed Sep 7 16:17:44 2022 +0800 split types to sub plans --- .../iotdb/db/mpp/common/MPPQueryContext.java | 7 +++ .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 15 ++++-- .../iotdb/db/mpp/plan/analyze/TypeProvider.java | 21 +++----- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 59 +++++++++++++++++----- .../db/mpp/plan/planner/LogicalPlanVisitor.java | 40 +++++++-------- .../db/mpp/plan/planner/SubPlanTypeExtractor.java | 54 ++++++++++++++++++++ .../SimpleFragmentParallelPlanner.java | 2 +- .../plan/planner/distribution/SourceRewriter.java | 18 ++++--- .../db/mpp/plan/planner/plan/PlanFragment.java | 9 +++- .../plan/planner/plan/node/SimplePlanVisitor.java | 30 +++++++++++ .../mpp/plan/plan/distribution/LastQueryTest.java | 26 ++++++---- 11 files changed, 208 insertions(+), 73 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java index 91d4d85f89..8f4e53470d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.common; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.db.mpp.plan.analyze.QueryType; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import java.util.LinkedList; import java.util.List; @@ -45,6 +46,8 @@ public class MPPQueryContext { // onto this node. private final List<TEndPoint> endPointBlackList; + private final TypeProvider typeProvider = new TypeProvider(); + public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = new LinkedList<>(); @@ -129,4 +132,8 @@ public class MPPQueryContext { public List<TEndPoint> getEndPointBlackList() { return endPointBlackList; } + + public TypeProvider getTypeProvider() { + return typeProvider; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java index c106572837..d45d98e22c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java @@ -65,7 +65,6 @@ public class Analysis { // map from output column name (for every node) to its datatype private final Map<NodeRef<Expression>, TSDataType> expressionTypes = new LinkedHashMap<>(); - private TypeProvider typeProvider; private boolean finishQueryAfterAnalyze; @@ -129,6 +128,8 @@ public class Analysis { // Query Common Analysis (above DeviceView) ///////////////////////////////////////////////////////////////////////////////////////////////// + private List<Pair<Expression, String>> outputExpressions; + // indicate is there a value filter private boolean hasValueFilter = false; @@ -223,10 +224,6 @@ public class Analysis { this.respDatasetHeader = respDatasetHeader; } - public TypeProvider getTypeProvider() { - return typeProvider; - } - public TSDataType getType(Expression expression) { TSDataType type = expressionTypes.get(NodeRef.of(expression)); checkArgument(type != null, "Expression not analyzed: %s", expression); @@ -460,4 +457,12 @@ public class Analysis { public void addTypes(Map<NodeRef<Expression>, TSDataType> types) { this.expressionTypes.putAll(types); } + + public List<Pair<Expression, String>> getOutputExpressions() { + return outputExpressions; + } + + public void setOutputExpressions(List<Pair<Expression, String>> outputExpressions) { + this.outputExpressions = outputExpressions; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java index 27ec021ff4..1c3bdd3b56 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java @@ -22,8 +22,6 @@ package org.apache.iotdb.db.mpp.plan.analyze; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import com.google.common.collect.ImmutableMap; - import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -37,10 +35,6 @@ public class TypeProvider { private final Map<String, TSDataType> typeMap; - public static TypeProvider empty() { - return new TypeProvider(ImmutableMap.of()); - } - public TypeProvider() { this.typeMap = new HashMap<>(); } @@ -49,16 +43,17 @@ public class TypeProvider { this.typeMap = typeMap; } - public TSDataType getType(String path) { - checkState(typeMap.containsKey(path), String.format("no data type found for path: %s", path)); - return typeMap.get(path); + public TSDataType getType(String symbol) { + checkState( + typeMap.containsKey(symbol), String.format("no data type found for symbol: %s", symbol)); + return typeMap.get(symbol); } - public void setType(String path, TSDataType dataType) { + public void setType(String symbol, TSDataType dataType) { checkState( - !typeMap.containsKey(path) || typeMap.get(path) == dataType, - String.format("inconsistent data type for path: %s", path)); - this.typeMap.put(path, dataType); + !typeMap.containsKey(symbol) || typeMap.get(symbol) == dataType, + String.format("inconsistent data type for symbol: %s", symbol)); + this.typeMap.put(symbol, dataType); } public boolean containsTypeInfoOf(String path) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index afa7e34806..fd221a0b43 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -27,7 +27,9 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.metadata.utils.MetaUtils; import org.apache.iotdb.db.mpp.common.MPPQueryContext; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; +import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.mpp.plan.expression.Expression; @@ -78,13 +80,17 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem; import org.apache.iotdb.db.mpp.plan.statement.component.SortKey; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.utils.SchemaUtils; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.utils.Pair; +import com.google.common.base.Function; import org.apache.commons.lang.Validate; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -101,7 +107,10 @@ public class LogicalPlanBuilder { private final MPPQueryContext context; - public LogicalPlanBuilder(MPPQueryContext context) { + private final Function<Expression, TSDataType> getPreAnalyzedType; + + public LogicalPlanBuilder(Analysis analysis, MPPQueryContext context) { + this.getPreAnalyzedType = analysis::getType; this.context = context; } @@ -114,6 +123,14 @@ public class LogicalPlanBuilder { return this; } + private void updateTypeProvider(Collection<Expression> expressions) { + expressions.forEach( + expression -> + context + .getTypeProvider() + .setType(expression.toString(), getPreAnalyzedType.apply(expression))); + } + public LogicalPlanBuilder planRawDataSource( Set<Expression> sourceExpressions, Ordering scanOrder, Filter timeFilter) { List<PlanNode> sourceNodeList = new ArrayList<>(); @@ -140,6 +157,8 @@ public class LogicalPlanBuilder { } } + updateTypeProvider(sourceExpressions); + this.root = convergeWithTimeJoin(sourceNodeList, scanOrder); return this; } @@ -160,6 +179,7 @@ public class LogicalPlanBuilder { sourceNodeList.add(new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath)); } } + updateTypeProvider(sourceExpressions); this.root = new LastQueryNode( @@ -167,6 +187,12 @@ public class LogicalPlanBuilder { sourceNodeList, globalTimeFilter, mergeOrderParameter); + ColumnHeaderConstant.lastQueryColumnHeaders.forEach( + columnHeader -> + context + .getTypeProvider() + .setType(columnHeader.getColumnName(), columnHeader.getColumnType())); + return this; } @@ -177,9 +203,7 @@ public class LogicalPlanBuilder { Filter timeFilter, GroupByTimeParameter groupByTimeParameter, Set<Expression> aggregationExpressions, - Map<Expression, Set<Expression>> groupByLevelExpressions, - TypeProvider typeProvider) { - + Map<Expression, Set<Expression>> groupByLevelExpressions) { boolean needCheckAscending = groupByTimeParameter == null; Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new HashMap<>(); Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new HashMap<>(); @@ -189,7 +213,6 @@ public class LogicalPlanBuilder { curStep, scanOrder, needCheckAscending, - typeProvider, ascendingAggregations, descendingAggregations); } @@ -201,6 +224,7 @@ public class LogicalPlanBuilder { scanOrder, timeFilter, groupByTimeParameter); + updateTypeProvider(sourceExpressions); return convergeAggregationSource( sourceNodeList, @@ -219,8 +243,7 @@ public class LogicalPlanBuilder { GroupByTimeParameter groupByTimeParameter, Set<Expression> aggregationExpressions, List<Integer> measurementIndexes, - Map<Expression, Set<Expression>> groupByLevelExpressions, - TypeProvider typeProvider) { + Map<Expression, Set<Expression>> groupByLevelExpressions) { checkArgument( sourceExpressions.size() == measurementIndexes.size(), "Each aggregate should correspond to a column of output."); @@ -238,7 +261,6 @@ public class LogicalPlanBuilder { curStep, scanOrder, needCheckAscending, - typeProvider, ascendingAggregations, descendingAggregations); aggregationToMeasurementIndexMap.put(aggregationDescriptor, measurementIndexes.get(index)); @@ -252,6 +274,7 @@ public class LogicalPlanBuilder { scanOrder, timeFilter, groupByTimeParameter); + updateTypeProvider(sourceExpressions); if (!curStep.isOutputPartial()) { // update measurementIndexes @@ -280,14 +303,13 @@ public class LogicalPlanBuilder { AggregationStep curStep, Ordering scanOrder, boolean needCheckAscending, - TypeProvider typeProvider, Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations, Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) { AggregationDescriptor aggregationDescriptor = new AggregationDescriptor( sourceExpression.getFunctionName(), curStep, sourceExpression.getExpressions()); if (curStep.isOutputPartial()) { - updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider); + updateTypeProviderByPartialAggregation(aggregationDescriptor, context.getTypeProvider()); } PartialPath selectPath = ((TimeSeriesOperand) sourceExpression.getExpressions().get(0)).getPath(); @@ -416,9 +438,13 @@ public class LogicalPlanBuilder { public LogicalPlanBuilder planDeviceView( Map<String, PlanNode> deviceNameToSourceNodesMap, - List<String> outputColumnNames, + List<Pair<Expression, String>> outputExpressions, Map<String, List<Integer>> deviceToMeasurementIndexesMap, Ordering mergeOrder) { + List<String> outputColumnNames = + outputExpressions.stream() + .map(pair -> pair.getLeft().toString()) + .collect(Collectors.toList()); DeviceViewNode deviceViewNode = new DeviceViewNode( context.getQueryId().genPlanNodeId(), @@ -434,6 +460,9 @@ public class LogicalPlanBuilder { deviceViewNode.addChildDeviceNode(deviceName, subPlan); } + context.getTypeProvider().setType(ColumnHeaderConstant.COLUMN_DEVICE, TSDataType.TEXT); + updateTypeProvider(outputExpressions.stream().map(Pair::getLeft).collect(Collectors.toList())); + this.root = deviceViewNode; return this; } @@ -461,7 +490,6 @@ public class LogicalPlanBuilder { Set<Expression> aggregationExpressions, GroupByTimeParameter groupByTimeParameter, AggregationStep curStep, - TypeProvider typeProvider, Ordering scanOrder) { if (aggregationExpressions == null) { return this; @@ -469,10 +497,12 @@ public class LogicalPlanBuilder { List<AggregationDescriptor> aggregationDescriptorList = constructAggregationDescriptorList(aggregationExpressions, curStep); + updateTypeProvider(aggregationExpressions); if (curStep.isOutputPartial()) { aggregationDescriptorList.forEach( aggregationDescriptor -> - updateTypeProviderByPartialAggregation(aggregationDescriptor, typeProvider)); + updateTypeProviderByPartialAggregation( + aggregationDescriptor, context.getTypeProvider())); } this.root = new AggregationNode( @@ -533,6 +563,7 @@ public class LogicalPlanBuilder { .collect(Collectors.toList()), groupedExpression.getExpressions().get(0))); } + updateTypeProvider(groupByLevelExpressions.keySet()); return new GroupByLevelNode( context.getQueryId().genPlanNodeId(), children, @@ -605,6 +636,7 @@ public class LogicalPlanBuilder { isGroupByTime, zoneId, scanOrder); + updateTypeProvider(selectExpressions); return this; } @@ -632,6 +664,7 @@ public class LogicalPlanBuilder { isGroupByTime, zoneId, scanOrder); + updateTypeProvider(transformExpressions); return this; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java index 87d59a3e2e..d1d5d46e20 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java @@ -69,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.stream.Collectors; /** * This visitor is used to generate a logical plan for the statement and returns the {@link @@ -91,7 +90,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); if (queryStatement.isLastQuery()) { return planBuilder @@ -105,7 +104,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte if (queryStatement.isAlignByDevice()) { Map<String, PlanNode> deviceToSubPlanMap = new TreeMap<>(); for (String deviceName : analysis.getDeviceToSourceExpressions().keySet()) { - LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, context); subPlanBuilder = subPlanBuilder.withNewRoot( visitQueryBody( @@ -129,9 +128,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte planBuilder = planBuilder.planDeviceView( deviceToSubPlanMap, - analysis.getRespDatasetHeader().getColumnNameWithoutAlias().stream() - .distinct() - .collect(Collectors.toList()), + analysis.getOutputExpressions(), analysis.getDeviceToMeasurementIndexesMap(), queryStatement.getResultTimeOrder()); } else { @@ -171,7 +168,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte Expression havingExpression, List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); // plan data source node if (isRawDataSource) { @@ -209,7 +206,6 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte aggregationExpressions, analysis.getGroupByTimeParameter(), curStep, - analysis.getTypeProvider(), queryStatement.getResultTimeOrder()); if (curStep.isOutputPartial()) { @@ -307,8 +303,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte analysis.getGroupByTimeParameter(), aggregationExpressions, measurementIndexes, - analysis.getGroupByLevelExpressions(), - analysis.getTypeProvider()); + analysis.getGroupByLevelExpressions()); if (queryStatement.isGroupByLevel()) { planBuilder = // plan Having with GroupByLevel planBuilder.planFilterAndTransform( @@ -335,8 +330,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte analysis.getGlobalTimeFilter(), analysis.getGroupByTimeParameter(), aggregationExpressions, - analysis.getGroupByLevelExpressions(), - analysis.getTypeProvider()); + analysis.getGroupByLevelExpressions()); if (queryStatement.isGroupByLevel()) { planBuilder = // plan Having with GroupByLevel @@ -483,7 +477,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitShowTimeSeries( ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); // If there is only one region, we can push down the offset and limit operation to // source operator. @@ -519,7 +513,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte && null != analysis.getDataPartitionInfo() && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) { PlanNode lastPlanNode = - new LogicalPlanBuilder(context) + new LogicalPlanBuilder(analysis, context) .planLast( analysis.getSourceExpressions(), analysis.getGlobalTimeFilter(), @@ -541,7 +535,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitShowDevices( ShowDevicesStatement showDevicesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); // If there is only one region, we can push down the offset and limit operation to // source operator. @@ -578,7 +572,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitCountDevices( CountDevicesStatement countDevicesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planDevicesCountSource( countDevicesStatement.getPathPattern(), countDevicesStatement.isPrefixPath()) @@ -589,7 +583,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitCountTimeSeries( CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planTimeSeriesCountSource( countTimeSeriesStatement.getPathPattern(), @@ -605,7 +599,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitCountLevelTimeSeries( CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planLevelTimeSeriesCountSource( countLevelTimeSeriesStatement.getPathPattern(), @@ -620,7 +614,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitCountNodes(CountNodesStatement countStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planNodePathsSchemaSource(countStatement.getPathPattern(), countStatement.getLevel()) .planSchemaQueryMerge(false) @@ -710,7 +704,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitSchemaFetch( SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); List<String> storageGroupList = new ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet()); return planBuilder @@ -725,7 +719,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitShowChildPaths( ShowChildPathsStatement showChildPathsStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planNodePathsSchemaSource(showChildPathsStatement.getPartialPath(), -1) .planSchemaQueryMerge(false) @@ -736,7 +730,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitShowChildNodes( ShowChildNodesStatement showChildNodesStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); return planBuilder .planNodePathsSchemaSource(showChildNodesStatement.getPartialPath(), -1) .planSchemaQueryMerge(false) @@ -768,7 +762,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte @Override public PlanNode visitShowPathsUsingTemplate( ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, MPPQueryContext context) { - LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context); + LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context); planBuilder = planBuilder .planPathsUsingTemplateSource(analysis.getTemplateSetInfo().left.getId()) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java new file mode 100644 index 0000000000..0c78183794 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.planner; + +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanVisitor; + +public class SubPlanTypeExtractor { + + public static TypeProvider extractor(PlanNode root, TypeProvider allTypes) { + TypeProvider typeProvider = new TypeProvider(); + root.accept(new Visitor(typeProvider, allTypes), null); + return typeProvider; + } + + private static class Visitor extends SimplePlanVisitor<Void> { + + private final TypeProvider typeProvider; + private final TypeProvider allTypes; + + public Visitor(TypeProvider typeProvider, TypeProvider allTypes) { + this.typeProvider = typeProvider; + this.allTypes = allTypes; + } + + @Override + public Void visitPlan(PlanNode node, Void context) { + node.getOutputColumnNames() + .forEach(name -> typeProvider.setType(name, allTypes.getType(name))); + for (PlanNode source : node.getChildren()) { + source.accept(this, context); + } + return null; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java index d5c73f6822..2559f46e4b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -112,7 +112,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { fragmentInstance.setDataRegionAndHost(regionReplicaSet); fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); - fragmentInstance.getFragment().setTypeProvider(analysis.getTypeProvider()); + fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider()); instanceMap.putIfAbsent(fragment.getId(), fragmentInstance); fragmentInstanceList.add(fragmentInstance); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index b063c4cb62..c608e09e1b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -315,7 +315,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte leafAggDescriptorList.forEach( d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation( - d, analysis.getTypeProvider())); + d, context.queryContext.getTypeProvider())); List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( @@ -532,7 +532,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte rootAggDescriptorList.forEach( d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation( - d, analysis.getTypeProvider())); + d, context.queryContext.getTypeProvider())); checkArgument( sources.size() > 0, "Aggregation sources should not be empty when distribution planning"); SeriesAggregationSourceNode seed = sources.get(0); @@ -590,7 +590,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte : groupSourcesForGroupByLevel(root, sourceGroup, context); // Then, we calculate the attributes for GroupByLevelNode in each level - calculateGroupByLevelNodeAttributes(newRoot, 0); + calculateGroupByLevelNodeAttributes(newRoot, 0, context); return newRoot; } @@ -672,11 +672,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } // TODO: (xingtanzjr) consider to implement the descriptor construction in every class - private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) { + private void calculateGroupByLevelNodeAttributes( + PlanNode node, int level, DistributionPlanContext context) { if (node == null) { return; } - node.getChildren().forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 1)); + node.getChildren() + .forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 1, context)); // Construct all outputColumns from children. Using Set here to avoid duplication Set<String> childrenOutputColumns = new HashSet<>(); @@ -697,7 +699,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte if (keep) { descriptorList.add(originalDescriptor); LogicalPlanBuilder.updateTypeProviderByPartialAggregation( - originalDescriptor, analysis.getTypeProvider()); + originalDescriptor, context.queryContext.getTypeProvider()); } } handle.setAggregationDescriptorList(descriptorList); @@ -730,7 +732,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions)); descriptorList.add(descriptor); LogicalPlanBuilder.updateTypeProviderByPartialAggregation( - descriptor, analysis.getTypeProvider()); + descriptor, context.queryContext.getTypeProvider()); } handle.setGroupByLevelDescriptors(descriptorList); } @@ -779,7 +781,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte d -> { d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL); LogicalPlanBuilder.updateTypeProviderByPartialAggregation( - d, analysis.getTypeProvider()); + d, context.queryContext.getTypeProvider()); }); } return sources; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java index 000d94e9eb..c21cfa28fc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor; import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; @@ -36,8 +37,10 @@ import java.util.Objects; /** PlanFragment contains a sub-query of distributed query. */ public class PlanFragment { // TODO once you add field for this class you need to change the serialize and deserialize methods - private PlanFragmentId id; + private final PlanFragmentId id; private PlanNode planNodeTree; + + // map from output column name (for every node) to its datatype private TypeProvider typeProvider; // indicate whether this PlanFragment is the root of the whole Fragment-Plan-Tree or not @@ -69,6 +72,10 @@ public class PlanFragment { this.typeProvider = typeProvider; } + public void generateTypeProvider(TypeProvider allTypes) { + this.typeProvider = SubPlanTypeExtractor.extractor(planNodeTree, allTypes); + } + public boolean isRoot() { return isRoot; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java new file mode 100644 index 0000000000..7b26611ee4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.planner.plan.node; + +public class SimplePlanVisitor<C> extends PlanVisitor<Void, C> { + @Override + public Void visitPlan(PlanNode node, C context) { + for (PlanNode source : node.getChildren()) { + source.accept(this, context); + } + return null; + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java index b150d8d6f8..9361038cae 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java @@ -21,12 +21,10 @@ package org.apache.iotdb.db.mpp.plan.plan.distribution; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.common.QueryId; -import org.apache.iotdb.db.mpp.plan.expression.Expression; -import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand; -import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder; import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner; import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan; @@ -35,16 +33,17 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; public class LastQueryTest { @@ -197,12 +196,21 @@ public class LastQueryTest { private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext context) throws IllegalPathException { - LogicalPlanBuilder builder = new LogicalPlanBuilder(context); - Set<Expression> expressions = new HashSet<>(); + List<PlanNode> sourceNodeList = new ArrayList<>(); for (String path : paths) { - expressions.add(new TimeSeriesOperand(new MeasurementPath(path))); + MeasurementPath selectPath = new MeasurementPath(path); + if (selectPath.isUnderAlignedEntity()) { + sourceNodeList.add( + new AlignedLastQueryScanNode( + context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath))); + } else { + sourceNodeList.add(new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath)); + } } - PlanNode root = builder.planLast(expressions, null, new OrderByParameter()).getRoot(); + + PlanNode root = + new LastQueryNode( + context.getQueryId().genPlanNodeId(), sourceNodeList, null, new OrderByParameter()); return new LogicalQueryPlan(context, root); } }
