This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1447d41ccf5 Fix push-down level of global aggregation in TableModel
1447d41ccf5 is described below
commit 1447d41ccf5ec8f54cb9417cdc69e1efa4e79f22
Author: Weihao Li <[email protected]>
AuthorDate: Wed Sep 25 14:33:35 2024 +0800
Fix push-down level of global aggregation in TableModel
---
.../PushAggregationIntoTableScan.java | 6 +-
.../plan/relational/analyzer/AggregationTest.java | 138 ++++++++++++++-------
2 files changed, 96 insertions(+), 48 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
index 6bc05802df9..5b383d50ffc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
@@ -156,7 +156,11 @@ public class PushAggregationIntoTableScan implements
PlanOptimizer {
// calculate DataSet part
if (groupingKeys.isEmpty()) {
// GlobalAggregation
- return PushDownLevel.COMPLETE;
+ if (tableScanNode.getDeviceEntries().size() < 2) {
+ return PushDownLevel.COMPLETE;
+ }
+ // We need two-stage Aggregation to combine the Aggregation results of
different DeviceEntries
+ return PushDownLevel.PARTIAL;
}
List<FunctionCall> dateBinFunctionsOfTime = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
index 2d05ac964d3..8ea55c73134 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
@@ -49,6 +49,7 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.singleGroupingSet;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.FINAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.INTERMEDIATE;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.PARTIAL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ArithmeticBinaryExpression.Operator.ADD;
@@ -387,6 +388,96 @@ public class AggregationTest {
"testdb.table1",
ImmutableList.of("attr1", "tag1", "date_bin$gid",
"count_0"),
ImmutableSet.of("attr1", "tag1", "time", "s2"))))));
+
+ // GlobalAggregation with more than one DeviceEntry
+ // Output - Aggregation - AggTableScan
+ assertPlan(
+ planTester.createPlan("SELECT count(s2) FROM table1"),
+ output(
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.empty(), aggregationFunction("count",
ImmutableList.of("count_0"))),
+ ImmutableList.of(),
+ Optional.empty(),
+ FINAL,
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(), // UnStreamable
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("count_0"),
+ ImmutableSet.of("s2")))));
+
+ // Output - Aggregation(FINAL) - Collect - Aggregation(INTERMEDIATE) -
AggTableScan
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.empty(), aggregationFunction("count",
ImmutableList.of("count_1"))),
+ ImmutableList.of(),
+ Optional.empty(),
+ FINAL,
+ collect(
+ exchange(),
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("count_1"),
+ aggregationFunction("count",
ImmutableList.of("count_0"))),
+ ImmutableList.of(),
+ Optional.empty(),
+ INTERMEDIATE,
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(), // UnStreamable
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("count_0"),
+ ImmutableSet.of("s2"))),
+ exchange()))));
+
+ // - Aggregation(INTERMEDIATE) - AggTableScan
+ assertPlan(
+ planTester.getFragmentPlan(1),
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("count_1"), aggregationFunction("count",
ImmutableList.of("count_0"))),
+ ImmutableList.of(),
+ Optional.empty(),
+ INTERMEDIATE,
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(), // UnStreamable
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("count_0"),
+ ImmutableSet.of("s2"))));
+
+ // - Aggregation(INTERMEDIATE) - AggTableScan
+ assertPlan(
+ planTester.getFragmentPlan(2),
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("count_1"), aggregationFunction("count",
ImmutableList.of("count_0"))),
+ ImmutableList.of(),
+ Optional.empty(),
+ INTERMEDIATE,
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(), // UnStreamable
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("count_0"),
+ ImmutableSet.of("s2"))));
}
@Test
@@ -533,53 +624,6 @@ public class AggregationTest {
"testdb.table1",
ImmutableList.of("tag1", "tag2", "tag3", "attr1", "time",
"count"),
ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time",
"s2")))));
-
- // GlobalAggregation
- assertPlan(
- planTester.createPlan("SELECT count(s2) FROM table1"),
- output(
- aggregationTableScan(
- singleGroupingSet(),
- ImmutableList.of(), // UnStreamable
- Optional.empty(),
- SINGLE,
- "testdb.table1",
- ImmutableList.of("count"),
- ImmutableSet.of("s2"))));
- assertPlan(
- planTester.getFragmentPlan(0),
- output(
- collect(
- exchange(),
- aggregationTableScan(
- singleGroupingSet(),
- ImmutableList.of(), // UnStreamable
- Optional.empty(),
- SINGLE,
- "testdb.table1",
- ImmutableList.of("count"),
- ImmutableSet.of("s2")),
- exchange())));
- assertPlan(
- planTester.getFragmentPlan(1),
- aggregationTableScan(
- singleGroupingSet(),
- ImmutableList.of(), // UnStreamable
- Optional.empty(),
- SINGLE,
- "testdb.table1",
- ImmutableList.of("count"),
- ImmutableSet.of("s2")));
- assertPlan(
- planTester.getFragmentPlan(2),
- aggregationTableScan(
- singleGroupingSet(),
- ImmutableList.of(), // UnStreamable
- Optional.empty(),
- SINGLE,
- "testdb.table1",
- ImmutableList.of("count"),
- ImmutableSet.of("s2")));
}
@Test