This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 24a00905ff2 Fix aggregation query of TableModel when there are
numerous regions in one device
24a00905ff2 is described below
commit 24a00905ff23ef7e951e53236eba2cf98a7cfbfe
Author: Weihao Li <[email protected]>
AuthorDate: Sat Nov 23 11:37:06 2024 +0800
Fix aggregation query of TableModel when there are numerous regions in one
device
---
.../plan/relational/analyzer/Analysis.java | 3 +-
.../distribute/TableDistributedPlanGenerator.java | 23 +++--
.../plan/relational/analyzer/AggregationTest.java | 114 ++++++++++++++++-----
.../plan/relational/analyzer/TestMatadata.java | 30 ++++++
4 files changed, 133 insertions(+), 37 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
index 51351f8bea5..ec0cc83f61a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java
@@ -101,6 +101,7 @@ import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;
import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
public class Analysis implements IAnalysis {
@@ -820,7 +821,7 @@ public class Analysis implements IAnalysis {
public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
String database, IDeviceID deviceId, Filter timeFilter) {
if (dataPartition == null) {
- return Collections.singletonList(new TRegionReplicaSet());
+ return Collections.singletonList(NOT_ASSIGNED);
} else {
return dataPartition.getDataRegionReplicaSetWithTimeFilter(database,
deviceId, timeFilter);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 05a36435310..c7a1c8475ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.SEPARATOR;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Step.SINGLE;
@@ -514,9 +515,8 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitAggregation(AggregationNode node, PlanContext
context) {
- OrderingScheme expectedOrderingSchema = null;
if (node.isStreamable()) {
- expectedOrderingSchema =
constructOrderingSchema(node.getPreGroupedSymbols());
+ OrderingScheme expectedOrderingSchema =
constructOrderingSchema(node.getPreGroupedSymbols());
context.setExpectedOrderingScheme(expectedOrderingSchema);
}
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
@@ -563,7 +563,6 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {
-
boolean needSplit = false;
List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
@@ -579,20 +578,28 @@ public class TableDistributedPlanGenerator
}
if (regionReplicaSetsList.isEmpty()) {
- regionReplicaSetsList =
- Collections.singletonList(Collections.singletonList(new
TRegionReplicaSet()));
+ regionReplicaSetsList =
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
}
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new
HashMap<>();
- // Step is SINGLE, has date_bin(time) and device data in more than one
region, we need to split
- // this node into two-stage Aggregation
- needSplit = needSplit && node.getProjection() != null && node.getStep() ==
SINGLE;
+ // Step is SINGLE and device data in more than one region, we need to
final aggregate the result
+ // from different region here, so split
+ // this node into two-stage
+ needSplit = needSplit && node.getStep() == SINGLE;
AggregationNode finalAggregation = null;
if (needSplit) {
Pair<AggregationNode, AggregationTableScanNode> splitResult =
split(node, symbolAllocator, queryId);
finalAggregation = splitResult.left;
AggregationTableScanNode partialAggregation = splitResult.right;
+
+ // cover case: complete push-down + group by + streamable
+ if (!context.hasSortProperty && finalAggregation.isStreamable()) {
+ OrderingScheme expectedOrderingSchema =
+ constructOrderingSchema(node.getPreGroupedSymbols());
+ context.setExpectedOrderingScheme(expectedOrderingSchema);
+ }
+
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap,
partialAggregation);
} else {
buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
index de711140a50..6d99582aac9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationTest.java
@@ -488,7 +488,8 @@ public class AggregationTest {
// All ID columns appear in GroupingKeys
// Output - Project - AggTableScan
LogicalQueryPlan logicalQueryPlan =
- planTester.createPlan("SELECT count(s2) FROM table1 group by tag1,
tag2, tag3");
+ planTester.createPlan(
+ "SELECT count(s2) FROM table1 where tag1!='shenzhen' group by
tag1, tag2, tag3");
assertPlan(
logicalQueryPlan,
output(
@@ -507,7 +508,6 @@ public class AggregationTest {
planTester.getFragmentPlan(0),
output(
collect(
- exchange(),
project(
aggregationTableScan(
singleGroupingSet("tag1", "tag2", "tag3"),
@@ -532,19 +532,6 @@ public class AggregationTest {
ImmutableList.of("tag1", "tag2", "tag3", "count"),
ImmutableSet.of("tag1", "tag2", "tag3", "s2"))));
- // Project - AggTableScan
- assertPlan(
- planTester.getFragmentPlan(2),
- project(
- aggregationTableScan(
- singleGroupingSet("tag1", "tag2", "tag3"),
- ImmutableList.of("tag1", "tag2", "tag3"), // Streamable
- Optional.empty(),
- SINGLE,
- "testdb.table1",
- ImmutableList.of("tag1", "tag2", "tag3", "count"),
- ImmutableSet.of("tag1", "tag2", "tag3", "s2"))));
-
// All ID columns appear in GroupingKeys, and Attribute columns , time or
date_bin(time) also
// appear
logicalQueryPlan =
@@ -562,7 +549,7 @@ public class AggregationTest {
"testdb.table1",
ImmutableList.of("tag1", "tag2", "tag3", "attr1",
"date_bin$gid", "count"),
ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time",
"s2")))));
- // Output - Project - Agg(FINAL) - Collect - AggTableScan
+ // Output - Project - Agg(FINAL) - MergeSort - AggTableScan
assertPlan(
planTester.getFragmentPlan(0),
output(
@@ -575,7 +562,7 @@ public class AggregationTest {
ImmutableList.of("tag1", "tag2", "tag3", "attr1"), //
Streamable
Optional.empty(),
FINAL,
- collect(
+ mergeSort(
exchange(),
aggregationTableScan(
singleGroupingSet("tag1", "tag2", "tag3", "attr1",
"date_bin$gid"),
@@ -599,17 +586,6 @@ public class AggregationTest {
ImmutableList.of("tag1", "tag2", "tag3", "attr1", "date_bin$gid",
"count_0"),
ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", "s2")));
- assertPlan(
- planTester.getFragmentPlan(2),
- aggregationTableScan(
- singleGroupingSet("tag1", "tag2", "tag3", "attr1", "date_bin$gid"),
- ImmutableList.of("tag1", "tag2", "tag3", "attr1"), // Streamable
- Optional.empty(),
- PARTIAL,
- "testdb.table1",
- ImmutableList.of("tag1", "tag2", "tag3", "attr1", "date_bin$gid",
"count_0"),
- ImmutableSet.of("tag1", "tag2", "tag3", "attr1", "time", "s2")));
-
// Global Aggregation or partialPushDown Aggregation with only one
deviceEntry
// Output - AggTableScan
@@ -696,4 +672,86 @@ public class AggregationTest {
"tag1", "tag2", "tag3", "first", "last", "first_by",
"last_by"),
ImmutableSet.of("tag1", "tag2", "tag3", "s1", "time")))));
}
+
+ @Test
+ public void deviceWithNumerousRegionTest() {
+ PlanTester planTester = new PlanTester();
+ LogicalQueryPlan logicalQueryPlan =
+ planTester.createPlan("SELECT count(s1) FROM table1 where tag2='B2'");
+ // complete push-down when do logical optimize
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(),
+ Optional.empty(),
+ SINGLE,
+ "testdb.table1",
+ ImmutableList.of("count"),
+ ImmutableSet.of("s1"))));
+
+ // transform to two-stage when generate distribution plan
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(
+ aggregation(
+ singleGroupingSet(),
+ ImmutableMap.of(
+ Optional.of("count"),
+ aggregationFunction("count", ImmutableList.of("count_0"))),
+ Optional.empty(),
+ FINAL,
+ collect(
+ aggregationTableScan(
+ singleGroupingSet(),
+ ImmutableList.of(),
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("count_0"),
+ ImmutableSet.of("s1")),
+ exchange()))));
+
+ logicalQueryPlan =
+ planTester.createPlan(
+ "SELECT count(s1) FROM table1 where tag2='B2' group by tag1, tag2,
tag3");
+ // complete push-down when do logical optimize
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ project(
+ aggregationTableScan(
+ singleGroupingSet("tag1", "tag2", "tag3"),
+ ImmutableList.of("tag1", "tag2", "tag3"),
+ Optional.empty(),
+ SINGLE,
+ "testdb.table1",
+ ImmutableList.of("tag1", "tag2", "tag3", "count"),
+ ImmutableSet.of("tag1", "tag2", "tag3", "s1")))));
+
+ // transform to two-stage when generate distribution plan
+ assertPlan(
+ planTester.getFragmentPlan(0),
+ output(
+ project(
+ aggregation(
+ singleGroupingSet("tag1", "tag2", "tag3"),
+ ImmutableMap.of(
+ Optional.of("count"),
+ aggregationFunction("count",
ImmutableList.of("count_0"))),
+ ImmutableList.of("tag1", "tag2", "tag3"),
+ Optional.empty(),
+ FINAL,
+ mergeSort(
+ aggregationTableScan(
+ singleGroupingSet("tag1", "tag2", "tag3"),
+ ImmutableList.of("tag1", "tag2", "tag3"),
+ Optional.empty(),
+ PARTIAL,
+ "testdb.table1",
+ ImmutableList.of("tag1", "tag2", "tag3",
"count_0"),
+ ImmutableSet.of("tag1", "tag2", "tag3", "s1")),
+ exchange())))));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index 5e6fc8bca6e..a1a7b9ef363 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -240,6 +240,17 @@ public class TestMatadata implements Metadata {
new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")),
DEVICE_6_ATTRIBUTES),
new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")),
DEVICE_5_ATTRIBUTES));
}
+ if (compareNotEqualsMatch(expressionList.get(0), "tag1", "shenzhen")) {
+ return Arrays.asList(
+ new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")),
DEVICE_4_ATTRIBUTES),
+ new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")),
DEVICE_1_ATTRIBUTES),
+ new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")),
DEVICE_3_ATTRIBUTES),
+ new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")),
DEVICE_2_ATTRIBUTES));
+ }
+ if (compareEqualsMatch(expressionList.get(0), "tag2", "B2")) {
+ return Collections.singletonList(
+ new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")),
DEVICE_5_ATTRIBUTES));
+ }
}
return Arrays.asList(
@@ -270,6 +281,25 @@ public class TestMatadata implements Metadata {
return false;
}
+ private boolean compareNotEqualsMatch(Expression expression, String
idOrAttr, String value) {
+ if (expression instanceof ComparisonExpression
+ && ((ComparisonExpression) expression).getOperator()
+ == ComparisonExpression.Operator.NOT_EQUAL) {
+ Expression leftExpression = ((ComparisonExpression)
expression).getLeft();
+ Expression rightExpression = ((ComparisonExpression)
expression).getRight();
+ if (leftExpression instanceof SymbolReference && rightExpression
instanceof StringLiteral) {
+ return ((SymbolReference)
leftExpression).getName().equalsIgnoreCase(idOrAttr)
+ && ((StringLiteral)
rightExpression).getValue().equalsIgnoreCase(value);
+ } else if (leftExpression instanceof StringLiteral
+ && rightExpression instanceof SymbolReference) {
+ return ((SymbolReference)
rightExpression).getName().equalsIgnoreCase(idOrAttr)
+ && ((StringLiteral)
leftExpression).getValue().equalsIgnoreCase(value);
+ }
+ }
+
+ return false;
+ }
+
@Override
public Optional<TableSchema> validateTableHeaderSchema(
String database, TableSchema tableSchema, MPPQueryContext context,
boolean allowCreateTable) {