This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1447d41ccf5 Fix push-down level of global aggregation in TableModel
1447d41ccf5 is described below

commit 1447d41ccf5ec8f54cb9417cdc69e1efa4e79f22
Author: Weihao Li <[email protected]>
AuthorDate: Wed Sep 25 14:33:35 2024 +0800

    Fix push-down level of global aggregation in TableModel
---
 .../PushAggregationIntoTableScan.java              |   6 +-
 .../plan/relational/analyzer/AggregationTest.java  | 138 ++++++++++++++-------
 2 files changed, 96 insertions(+), 48 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
index 6bc05802df9..5b383d50ffc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
@@ -156,7 +156,11 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
       // calculate DataSet part
       if (groupingKeys.isEmpty()) {
         // GlobalAggregation
-        return PushDownLevel.COMPLETE;
+        if (tableScanNode.getDeviceEntries().size() < 2) {
+          return PushDownLevel.COMPLETE;
+        }
+        // We need two-stage Aggregation to combine the Aggregation results of 
different DeviceEntries
+        return PushDownLevel.PARTIAL;
       }
 
       List<FunctionCall> dateBinFunctionsOfTime = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
index 2d05ac964d3..8ea55c73134 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
@@ -49,6 +49,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.FINAL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.INTERMEDIATE;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.PARTIAL;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ArithmeticBinaryExpression.Operator.ADD;
@@ -387,6 +388,96 @@ public class AggregationTest {
                         "testdb.table1",
                         ImmutableList.of("attr1", "tag1", "date_bin$gid", 
"count_0"),
                         ImmutableSet.of("attr1", "tag1", "time", "s2"))))));
+
+    // GlobalAggregation with more than one DeviceEntry
+    // Output - Aggregation - AggTableScan
+    assertPlan(
+        planTester.createPlan("SELECT count(s2) FROM table1"),
+        output(
+            aggregation(
+                singleGroupingSet(),
+                ImmutableMap.of(
+                    Optional.empty(), aggregationFunction("count", 
ImmutableList.of("count_0"))),
+                ImmutableList.of(),
+                Optional.empty(),
+                FINAL,
+                aggregationTableScan(
+                    singleGroupingSet(),
+                    ImmutableList.of(), // UnStreamable
+                    Optional.empty(),
+                    PARTIAL,
+                    "testdb.table1",
+                    ImmutableList.of("count_0"),
+                    ImmutableSet.of("s2")))));
+
+    // Output - Aggregation(FINAL) - Collect - Aggregation(INTERMEDIATE) - 
AggTableScan
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(
+            aggregation(
+                singleGroupingSet(),
+                ImmutableMap.of(
+                    Optional.empty(), aggregationFunction("count", 
ImmutableList.of("count_1"))),
+                ImmutableList.of(),
+                Optional.empty(),
+                FINAL,
+                collect(
+                    exchange(),
+                    aggregation(
+                        singleGroupingSet(),
+                        ImmutableMap.of(
+                            Optional.of("count_1"),
+                            aggregationFunction("count", 
ImmutableList.of("count_0"))),
+                        ImmutableList.of(),
+                        Optional.empty(),
+                        INTERMEDIATE,
+                        aggregationTableScan(
+                            singleGroupingSet(),
+                            ImmutableList.of(), // UnStreamable
+                            Optional.empty(),
+                            PARTIAL,
+                            "testdb.table1",
+                            ImmutableList.of("count_0"),
+                            ImmutableSet.of("s2"))),
+                    exchange()))));
+
+    // - Aggregation(INTERMEDIATE) - AggTableScan
+    assertPlan(
+        planTester.getFragmentPlan(1),
+        aggregation(
+            singleGroupingSet(),
+            ImmutableMap.of(
+                Optional.of("count_1"), aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            ImmutableList.of(),
+            Optional.empty(),
+            INTERMEDIATE,
+            aggregationTableScan(
+                singleGroupingSet(),
+                ImmutableList.of(), // UnStreamable
+                Optional.empty(),
+                PARTIAL,
+                "testdb.table1",
+                ImmutableList.of("count_0"),
+                ImmutableSet.of("s2"))));
+
+    // - Aggregation(INTERMEDIATE) - AggTableScan
+    assertPlan(
+        planTester.getFragmentPlan(2),
+        aggregation(
+            singleGroupingSet(),
+            ImmutableMap.of(
+                Optional.of("count_1"), aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            ImmutableList.of(),
+            Optional.empty(),
+            INTERMEDIATE,
+            aggregationTableScan(
+                singleGroupingSet(),
+                ImmutableList.of(), // UnStreamable
+                Optional.empty(),
+                PARTIAL,
+                "testdb.table1",
+                ImmutableList.of("count_0"),
+                ImmutableSet.of("s2"))));
   }
 
   @Test
@@ -533,53 +624,6 @@ public class AggregationTest {
                     "testdb.table1",
                     ImmutableList.of("tag1", "tag2", "tag3", "attr1", "time", 
"count"),
                     ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", 
"s2")))));
-
-    // GlobalAggregation
-    assertPlan(
-        planTester.createPlan("SELECT count(s2) FROM table1"),
-        output(
-            aggregationTableScan(
-                singleGroupingSet(),
-                ImmutableList.of(), // UnStreamable
-                Optional.empty(),
-                SINGLE,
-                "testdb.table1",
-                ImmutableList.of("count"),
-                ImmutableSet.of("s2"))));
-    assertPlan(
-        planTester.getFragmentPlan(0),
-        output(
-            collect(
-                exchange(),
-                aggregationTableScan(
-                    singleGroupingSet(),
-                    ImmutableList.of(), // UnStreamable
-                    Optional.empty(),
-                    SINGLE,
-                    "testdb.table1",
-                    ImmutableList.of("count"),
-                    ImmutableSet.of("s2")),
-                exchange())));
-    assertPlan(
-        planTester.getFragmentPlan(1),
-        aggregationTableScan(
-            singleGroupingSet(),
-            ImmutableList.of(), // UnStreamable
-            Optional.empty(),
-            SINGLE,
-            "testdb.table1",
-            ImmutableList.of("count"),
-            ImmutableSet.of("s2")));
-    assertPlan(
-        planTester.getFragmentPlan(2),
-        aggregationTableScan(
-            singleGroupingSet(),
-            ImmutableList.of(), // UnStreamable
-            Optional.empty(),
-            SINGLE,
-            "testdb.table1",
-            ImmutableList.of("count"),
-            ImmutableSet.of("s2")));
   }
 
   @Test

Reply via email to