This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 99d8110 [Bug-fix] Fix wrong data distribution judgment (#6029)
99d8110 is described below
commit 99d8110972be0476d2303c5e28471611462dfc0c
Author: EmmyMiao87 <[email protected]>
AuthorDate: Fri Jun 18 09:21:46 2021 +0800
[Bug-fix] Fix wrong data distribution judgment (#6029)
* [Bug-fix] Fix wrong data distribution judgment
The Fragment where OlapScanNode is located has three data distribution
possibilities.
1. UNPARTITIONED: The scan range of the OlapScanNode contains only one
instance (BE).
2. RANDOM: The OlapScanNode involves a multi-partition table.
3. HASH_PARTITIONED: The involved table is in a colocate group.
For a multi-partition table, although the data in each individual partition
is distributed according to the bucketing column,
rows with the same bucketing column value in different partitions are not
necessarily on the same BE.
So the data distribution is RANDOM.
If Doris wrongly plans RANDOM as HASH_PARTITIONED, it will generate an
incorrect colocate agg node,
and the result of the query will be incorrect.
---
.../org/apache/doris/planner/OlapScanNode.java | 30 +++++++++++++++-------
.../org/apache/doris/planner/ColocatePlanTest.java | 23 +++++++++++++++--
.../org/apache/doris/planner/QueryPlanTest.java | 2 +-
3 files changed, 43 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 6f81465..143c175 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -794,15 +794,27 @@ public class OlapScanNode extends ScanNode {
}
}
+ /*
+ Although sometimes the scan range only involves one instance,
+ the data distribution cannot be set to UNPARTITIONED here.
+ The reason is that the Coordinator will not set the scan range for the
fragment
+ when the data partition of the fragment is UNPARTITIONED.
+ */
public DataPartition constructInputPartitionByDistributionInfo() {
- DistributionInfo distributionInfo =
olapTable.getDefaultDistributionInfo();
- Preconditions.checkState(distributionInfo instanceof
HashDistributionInfo);
- List<Column> distributeColumns = ((HashDistributionInfo)
distributionInfo).getDistributionColumns();
- List<Expr> dataDistributeExprs = Lists.newArrayList();
- for (Column column : distributeColumns) {
- SlotRef slotRef = new SlotRef(desc.getRef().getName(),
column.getName());
- dataDistributeExprs.add(slotRef);
- }
- return DataPartition.hashPartitioned(dataDistributeExprs);
+ if
(Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())
+ || olapTable.getPartitionInfo().getType() ==
PartitionType.UNPARTITIONED
+ || olapTable.getPartitions().size() == 1) {
+ DistributionInfo distributionInfo =
olapTable.getDefaultDistributionInfo();
+ Preconditions.checkState(distributionInfo instanceof
HashDistributionInfo);
+ List<Column> distributeColumns = ((HashDistributionInfo)
distributionInfo).getDistributionColumns();
+ List<Expr> dataDistributeExprs = Lists.newArrayList();
+ for (Column column : distributeColumns) {
+ SlotRef slotRef = new SlotRef(desc.getRef().getName(),
column.getName());
+ dataDistributeExprs.add(slotRef);
+ }
+ return DataPartition.hashPartitioned(dataDistributeExprs);
+ } else {
+ return DataPartition.RANDOM;
+ }
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
index 5c28b22..a2fd126 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
@@ -50,8 +50,14 @@ public class ColocatePlanTest {
// create table test_colocate (k1 int ,k2 int, k3 int, k4 int)
// distributed by hash(k1, k2) buckets 10
// properties ("replication_num" = "2");
- String createTblStmtStr = "create table db1.test_colocate(k1 int, k2
int, k3 int, k4 int) "
- + "distributed by hash(k1, k2) buckets 10
properties('replication_num' = '2');";
+ String createColocateTblStmtStr = "create table db1.test_colocate(k1
int, k2 int, k3 int, k4 int) "
+ + "distributed by hash(k1, k2) buckets 10
properties('replication_num' = '2',"
+ + "'colocate_with' = 'group1');";
+ CreateTableStmt createColocateTableStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
+ Catalog.getCurrentCatalog().createTable(createColocateTableStmt);
+ String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3
int, k4 int)"
+ + "partition by range(k1) (partition p1 values less than
(\"1\"), partition p2 values less than (\"2\"))"
+ + "distributed by hash(k1, k2) buckets 10
properties('replication_num' = '2')";
CreateTableStmt createTableStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
Catalog.getCurrentCatalog().createTable(createTableStmt);
}
@@ -118,4 +124,17 @@ public class ColocatePlanTest {
Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
}
+
+ // without:
+ // 1. agg columns = distributed columns
+ // 2. table is not in colocate group
+ // 3. more than 1 instance
+ // Fixed #6028
+ @Test
+ public void sqlAggWithNonColocateTable() throws Exception {
+ String sql = "explain select k1, k2 from db1.test group by k1, k2";
+ String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
+ Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
+ Assert.assertFalse(plan1.contains(COLOCATE_ENABLE));
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 3c0ab65..53fede1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1359,7 +1359,7 @@ public class QueryPlanTest {
sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned group by
dt, dis_key";
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
"EXPLAIN " + sql);
System.out.println(explainString);
- Assert.assertTrue(explainString.contains("AGGREGATE (update
finalize)"));
+ Assert.assertTrue(explainString.contains("AGGREGATE (update
serialize)"));
}
public void testLeadAndLagFunction() throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]