This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_agg_1_series in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c02621fa6d8540e58d70b262b46b103fd46b74ee Author: Jinrui.Zhang <[email protected]> AuthorDate: Mon Nov 21 18:50:53 2022 +0800 add step check when do distribution plan for 1 series --- .../plan/planner/distribution/SourceRewriter.java | 2 +- .../distribution/AggregationDistributionTest.java | 63 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index fc532f5d36..2274c6ce1e 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java @@ -397,7 +397,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte rootAggDescriptorList.add( new AggregationDescriptor( descriptor.getAggregationFuncName(), - AggregationStep.FINAL, + context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, descriptor.getInputExpressions())); }); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java index 50cc202a52..6db6b2fa36 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java @@ -64,6 +64,69 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class AggregationDistributionTest { + + @Test + public void testAggregation1Series2Regions() throws IllegalPathException { + QueryId queryId = new QueryId("test_1_series_2_regions"); + MPPQueryContext context = + new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint()); + String sql = "select count(s1) from root.sg.d1"; + String d1s1Path = "root.sg.d1.s1"; + + Analysis analysis = Util.analyze(sql, context); + PlanNode rootNode = Util.genLogicalPlan(analysis, context); + + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode)); + DistributedQueryPlan plan = planner.planFragments(); + assertEquals(2, plan.getInstances().size()); + Map<String, AggregationStep> expectedStep = new HashMap<>(); + expectedStep.put(d1s1Path, AggregationStep.PARTIAL); + List<FragmentInstance> fragmentInstances = plan.getInstances(); + fragmentInstances.forEach( + f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree())); + AggregationNode aggregationNode = + (AggregationNode) + fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0); + assertEquals( + AggregationStep.FINAL, aggregationNode.getAggregationDescriptorList().get(0).getStep()); + } + + @Test + public void testAggregation1Series2RegionsWithSlidingWindow() throws IllegalPathException { + QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window"); + MPPQueryContext context = + new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint()); + String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)"; + String d1s1Path = "root.sg.d1.s1"; + + Analysis analysis = Util.analyze(sql, context); + PlanNode rootNode = Util.genLogicalPlan(analysis, context); + + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode)); + DistributedQueryPlan plan = planner.planFragments(); + assertEquals(2, plan.getInstances().size()); + Map<String, AggregationStep> expectedStep = new HashMap<>(); + expectedStep.put(d1s1Path, AggregationStep.PARTIAL); + List<FragmentInstance> fragmentInstances = plan.getInstances(); + fragmentInstances.forEach( + f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree())); + AggregationNode aggregationNode = + (AggregationNode) + fragmentInstances + .get(0) + .getFragment() + .getPlanNodeTree() + .getChildren() + .get(0) + .getChildren() + .get(0); + assertEquals( + AggregationStep.INTERMEDIATE, + aggregationNode.getAggregationDescriptorList().get(0).getStep()); + } + @Test public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException { QueryId queryId = new QueryId("test_query_time_join_aggregation");
