This is an automated email from the ASF dual-hosted git repository.
kangkaisen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d659167 [Planner] Set MysqlScanNode's cardinality to avoid unexpected
shuffle join (#3886)
d659167 is described below
commit d659167d6d9a972ae05eb3fc0d008bae9d1772bf
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Jun 17 10:53:36 2020 +0800
[Planner] Set MysqlScanNode's cardinality to avoid unexpected shuffle join
(#3886)
---
.../apache/doris/planner/DistributedPlanner.java | 10 ++++
.../org/apache/doris/planner/MysqlScanNode.java | 11 ++++
.../org/apache/doris/planner/QueryPlanTest.java | 59 ++++++++++++++++++++--
3 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 9af4d41..25be3af 100644
--- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -294,6 +294,16 @@ public class DistributedPlanner {
// broadcast: send the rightChildFragment's output to each node
executing
// the leftChildFragment; the cost across all nodes is proportional to
the
// total amount of data sent
+
+ // NOTICE:
+ // for now, only MysqlScanNode and OlapScanNode have cardinality.
+ // OlapScanNode's cardinality is calculated by row num and data size,
+ // and MysqlScanNode's cardinality is always 0.
+ // Other ScanNode's cardinality is -1.
+ //
+ // So if there are other kinds of scan nodes in a join query, the planner won't be
able to calculate the cost of
+ // the join normally, which results in both "broadcastCost" and
"partitionCost" being 0. This will lead
+ // to a SHUFFLE join.
PlanNode rhsTree = rightChildFragment.getPlanRoot();
long rhsDataSize = 0;
long broadcastCost = 0;
diff --git a/fe/src/main/java/org/apache/doris/planner/MysqlScanNode.java
b/fe/src/main/java/org/apache/doris/planner/MysqlScanNode.java
index 181f08b..86fa0db 100644
--- a/fe/src/main/java/org/apache/doris/planner/MysqlScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/MysqlScanNode.java
@@ -71,6 +71,7 @@ public class MysqlScanNode extends ScanNode {
// Convert predicates to MySQL columns and filters.
createMySQLColumns(analyzer);
createMySQLFilters(analyzer);
+ computeStats(analyzer);
}
@Override
@@ -149,4 +150,14 @@ public class MysqlScanNode extends ScanNode {
return 1;
}
+ @Override
+ public void computeStats(Analyzer analyzer) {
+ super.computeStats(analyzer);
+ // even if the current scan node has no data, at least one backend will be
assigned when the fragment actually executes
+ numNodes = numNodes <= 0 ? 1 : numNodes;
+ // this is just to avoid mysql scan node's cardinality being -1. So
that we can calculate the join cost
+ // normally.
+ // We assume that the data volume of all mysql tables is very small,
so set cardinality directly to 0.
+ cardinality = cardinality == -1 ? 0 : cardinality;
+ }
}
diff --git a/fe/src/test/java/org/apache/doris/planner/QueryPlanTest.java
b/fe/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 371c38d..dc05ec5 100644
--- a/fe/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -17,7 +17,6 @@
package org.apache.doris.planner;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
@@ -27,13 +26,21 @@ import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.ShowCreateDbStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.FeConstants;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.utframe.UtFrameUtils;
+import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -270,6 +277,22 @@ public class QueryPlanTest {
"\"in_memory\" = \"false\",\n" +
"\"storage_format\" = \"DEFAULT\"\n" +
");");
+
+ createTable("create table test.jointest\n" +
+ "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" +
+ "properties(\"replication_num\" = \"1\");");
+
+ createTable("create external table test.mysql_table\n" +
+ "(k1 int, k2 int)\n" +
+ "ENGINE=MYSQL\n" +
+ "PROPERTIES (\n" +
+ "\"host\" = \"127.0.0.1\",\n" +
+ "\"port\" = \"3306\",\n" +
+ "\"user\" = \"root\",\n" +
+ "\"password\" = \"123\",\n" +
+ "\"database\" = \"db1\",\n" +
+ "\"table\" = \"tbl1\"\n" +
+ ");");
}
@AfterClass
@@ -838,7 +861,7 @@ public class QueryPlanTest {
@Test
public void testOrCompoundPredicateFold() throws Exception {
- String queryStr = "explain select * from baseall where (k1 > 1) or
(k1 > 1 and k2 < 1)";
+ String queryStr = "explain select * from baseall where (k1 > 1) or (k1
> 1 and k2 < 1)";
String explainString =
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("PREDICATES: (`k1` > 1)\n"));
@@ -850,4 +873,34 @@ public class QueryPlanTest {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
Assert.assertTrue(explainString.contains("PREDICATES: (`k1` > 1)\n"));
}
+
+ @Test
+ public void testJoinWithMysqlTable() throws Exception {
+ connectContext.setDatabase("default_cluster:test");
+
+ // set data size and row count for the olap table
+ Database db =
Catalog.getCurrentCatalog().getDb("default_cluster:test");
+ OlapTable tbl = (OlapTable) db.getTable("jointest");
+ for (Partition partition : tbl.getPartitions()) {
+ partition.updateVisibleVersionAndVersionHash(2, 0);
+ for (MaterializedIndex mIndex :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ mIndex.setRowCount(10000);
+ for (Tablet tablet : mIndex.getTablets()) {
+ for (Replica replica : tablet.getReplicas()) {
+ replica.updateVersionInfo(2, 0, 200000, 10000);
+ }
+ }
+ }
+ }
+
+ String queryStr = "explain select * from mysql_table t2, jointest t1
where t1.k1 = t2.k1";
+ String explainString =
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+ Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
+ Assert.assertTrue(explainString.contains("1:SCAN MYSQL"));
+
+ queryStr = "explain select * from jointest t1, mysql_table t2 where
t1.k1 = t2.k1";
+ explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext,
queryStr);
+ Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
+ Assert.assertTrue(explainString.contains("1:SCAN MYSQL"));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]