This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/fix_agg_1_series
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c02621fa6d8540e58d70b262b46b103fd46b74ee
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Nov 21 18:50:53 2022 +0800

    add step check when doing distribution planning for 1 series
---
 .../plan/planner/distribution/SourceRewriter.java  |  2 +-
 .../distribution/AggregationDistributionTest.java  | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index fc532f5d36..2274c6ce1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -397,7 +397,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
               rootAggDescriptorList.add(
                   new AggregationDescriptor(
                       descriptor.getAggregationFuncName(),
-                      AggregationStep.FINAL,
+                      context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE,
                       descriptor.getInputExpressions()));
             });
 
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 50cc202a52..6db6b2fa36 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -64,6 +64,69 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AggregationDistributionTest {
+
+  @Test
+  public void testAggregation1Series2Regions() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1";
+    String d1s1Path = "root.sg.d1.s1";
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode =
+        (AggregationNode)
+            fragmentInstances.get(0).getFragment().getPlanNodeTree().getChildren().get(0);
+    assertEquals(
+        AggregationStep.FINAL, aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
+  @Test
+  public void testAggregation1Series2RegionsWithSlidingWindow() throws IllegalPathException {
+    QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+    String sql = "select count(s1) from root.sg.d1 group by ([0, 100), 5ms, 1ms)";
+    String d1s1Path = "root.sg.d1.s1";
+
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode rootNode = Util.genLogicalPlan(analysis, context);
+
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, rootNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+    Map<String, AggregationStep> expectedStep = new HashMap<>();
+    expectedStep.put(d1s1Path, AggregationStep.PARTIAL);
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    fragmentInstances.forEach(
+        f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree()));
+    AggregationNode aggregationNode =
+        (AggregationNode)
+            fragmentInstances
+                .get(0)
+                .getFragment()
+                .getPlanNodeTree()
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0);
+    assertEquals(
+        AggregationStep.INTERMEDIATE,
+        aggregationNode.getAggregationDescriptorList().get(0).getStep());
+  }
+
   @Test
   public void testTimeJoinAggregationSinglePerRegion() throws IllegalPathException {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");

Reply via email to