This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch fix_some_issues_0531 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c64c75561ae13d73d10e688917ce90e402fdec0c Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Jun 1 17:17:27 2022 +0800 fix some issues in MPP query prepared for representation --- .../apache/iotdb/db/metadata/path/AlignedPath.java | 5 ++++ .../db/mpp/plan/planner/LocalExecutionPlanner.java | 2 +- .../db/mpp/plan/planner/LogicalPlanBuilder.java | 2 +- .../plan/planner/distribution/SourceRewriter.java | 26 ++++++++++++++---- .../plan/planner/plan/node/PlanGraphPrinter.java | 32 ++++++++++++++++++++++ 5 files changed, 60 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java index 4ef32ba2a6..6ca1bd5a2c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java @@ -311,4 +311,9 @@ public class AlignedPath extends PartialPath { } return getDevicePath().concatNode(measurementList.get(0)); } + + @Override + public String toString() { + return String.format("%s%s", getDevice(), measurementList); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 96e9dfdc1c..1b30176d58 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -838,7 +838,7 @@ public class LocalExecutionPlanner { context .getTypeProvider() // get the type of first inputExpression - .getType(descriptor.getInputExpressions().get(0).toString()), + .getType(inputColumnNames.get(0)), ascending), descriptor.getStep(), inputLocationList)); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index ecb7b4e73c..a451623307 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java @@ -380,7 +380,7 @@ public class LogicalPlanBuilder { return this; } - private void updateTypeProviderByPartialAggregation( + public static void updateTypeProviderByPartialAggregation( AggregationDescriptor aggregationDescriptor, TypeProvider typeProvider) { List<AggregationType> splitAggregations = SchemaUtils.splitPartialAggregation(aggregationDescriptor.getAggregationType()); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index 6c20206302..41cda1a735 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.mpp.common.MPPQueryContext; import org.apache.iotdb.db.mpp.plan.analyze.Analysis; import org.apache.iotdb.db.mpp.plan.expression.Expression; +import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanNodeRewriter; import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.CountSchemaMergeNode; @@ -309,7 +310,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte AggregationStep.PARTIAL, descriptor.getInputExpressions())); }); - + leafAggDescriptorList.forEach( + d -> + LogicalPlanBuilder.updateTypeProviderByPartialAggregation( + d, analysis.getTypeProvider())); List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( @@ -482,10 +486,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte rootAggDescriptorList.add( new AggregationDescriptor( descriptor.getAggregationType(), - context.isRoot ? AggregationStep.FINAL : AggregationStep.PARTIAL, + context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, descriptor.getInputExpressions())); }); } + rootAggDescriptorList.forEach( + d -> + LogicalPlanBuilder.updateTypeProviderByPartialAggregation( + d, analysis.getTypeProvider())); checkArgument( sources.size() > 0, "Aggregation sources should not be empty when distribution planning"); SeriesAggregationSourceNode seed = sources.get(0); @@ -629,6 +637,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } if (keep) { descriptorList.add(originalDescriptor); + LogicalPlanBuilder.updateTypeProviderByPartialAggregation( + originalDescriptor, analysis.getTypeProvider()); } } handle.setAggregationDescriptorList(descriptorList); @@ -657,10 +667,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte continue; } GroupByLevelDescriptor descriptor = originalDescriptor.deepClone(); - descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.PARTIAL); + descriptor.setStep(level == 0 ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE); descriptor.setInputExpressions(descriptorExpression); - descriptorList.add(descriptor); + LogicalPlanBuilder.updateTypeProviderByPartialAggregation( + descriptor, analysis.getTypeProvider()); } handle.setGroupByLevelDescriptors(descriptorList); } @@ -705,7 +716,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte boolean isFinal = false; source .getAggregationDescriptorList() - .forEach(d -> d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL)); + .forEach( + d -> { + d.setStep(isFinal ? AggregationStep.FINAL : AggregationStep.PARTIAL); + LogicalPlanBuilder.updateTypeProviderByPartialAggregation( + d, analysis.getTypeProvider()); + }); } return sources; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java index f6321b7ea5..657cc121c7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.commons.lang3.Validate; @@ -92,6 +93,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("SeriesAggregationScan-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Series: %s", node.getSeriesPath())); + for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) { + AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i); + boxValue.add( + String.format( + "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); + } boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id)); return render(node, boxValue, context); } @@ -102,6 +109,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("AlignedSeriesAggregationScan-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("Series: %s", node.getAlignedPath())); + for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) { + AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i); + boxValue.add( + String.format( + "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); + } boxValue.add(String.format("PartitionId: %s", node.getRegionReplicaSet().getRegionId().id)); return render(node, boxValue, context); } @@ -149,6 +162,13 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter public List<String> visitGroupByLevel(GroupByLevelNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("GroupByLevel-%s", node.getPlanNodeId().getId())); + for (int i = 0; i < node.getGroupByLevelDescriptors().size(); i++) { + AggregationDescriptor descriptor = node.getGroupByLevelDescriptors().get(i); + boxValue.add( + String.format( + "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); + boxValue.add(String.format("Output-%d: %s", i, descriptor.getOutputColumnNames())); + } return render(node, boxValue, context); } @@ -157,6 +177,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter SlidingWindowAggregationNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("SlidingWindowAggregation-%s", node.getPlanNodeId().getId())); + for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) { + AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i); + boxValue.add( + String.format( + "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); + } return render(node, boxValue, context); } @@ -172,6 +198,12 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter public List<String> visitRowBasedSeriesAggregate(AggregationNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("Aggregation-%s", node.getPlanNodeId().getId())); + for (int i = 0; i < node.getAggregationDescriptorList().size(); i++) { + AggregationDescriptor descriptor = node.getAggregationDescriptorList().get(i); + boxValue.add( + String.format( + "Aggregator-%d: %s, %s", i, descriptor.getAggregationType(), descriptor.getStep())); + } return render(node, boxValue, context); }
