Repository: incubator-impala Updated Branches: refs/heads/master d0ff27cc3 -> d72353d0c
IMPALA-2932: Extend DistributedPlanner to account for hash table build cost When deciding between a broadcast and a repartition join, Impala calculates the cost of each join as the total amount of data that is sent over the network. This ignores some relevant costs and can lead to bad plans. One such relevant cost is the work to create the hash table used in the join. This patch accounts for this by adding the amount of data inserted into the hash table (the size of the right side of the join) to the previous cost. This generally increases the estimated cost of broadcast joins relative to repartitioning joins, as the broadcast join must build the hash table on each node the data was broadcast to, so its effect will be to make repartitioning joins more likely to be chosen, especially in large clusters. This patch has not yet been performance tested. Change-Id: I03a0f56f69c8deae68d48dfdb9dc95b71aec11f1 Reviewed-on: http://gerrit.cloudera.org:8080/4098 Tested-by: Internal Jenkins Reviewed-by: Matthew Jacobs <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d72353d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d72353d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d72353d0 Branch: refs/heads/master Commit: d72353d0c9a563e69b34ba0cccdc6c1c8bf2630b Parents: d0ff27c Author: Thomas Tauber-Marshall <[email protected]> Authored: Tue Aug 23 11:42:39 2016 -0700 Committer: Matthew Jacobs <[email protected]> Committed: Mon Aug 29 16:44:22 2016 +0000 ---------------------------------------------------------------------- .../impala/planner/DistributedPlanner.java | 21 ++-- .../queries/PlannerTest/joins.test | 8 +- .../queries/PlannerTest/tpcds-all.test | 64 +++++----- .../queries/PlannerTest/tpch-all.test | 126 +++++++++++-------- .../queries/PlannerTest/tpch-nested.test | 8 +- .../queries/PlannerTest/union.test | 20 +-- 
.../queries/PlannerTest/views.test | 18 +-- .../queries/QueryTest/spilling.test | 2 +- 8 files changed, 148 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java index d546245..2c3124b 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java @@ -417,17 +417,20 @@ public class DistributedPlanner { PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws ImpalaException { + // For both join types, the total cost is calculated as the amount of data + // sent over the network, plus the amount of data inserted into the hash table. // broadcast: send the rightChildFragment's output to each node executing - // the leftChildFragment; the cost across all nodes is proportional to the - // total amount of data sent + // the leftChildFragment, and build a hash table with it on each node. 
Analyzer analyzer = ctx_.getRootAnalyzer(); PlanNode rhsTree = rightChildFragment.getPlanRoot(); long rhsDataSize = 0; long broadcastCost = Long.MAX_VALUE; - if (rhsTree.getCardinality() != -1 && leftChildFragment.getNumNodes() != -1) { + if (rhsTree.getCardinality() != -1) { rhsDataSize = Math.round( rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree)); - broadcastCost = rhsDataSize * leftChildFragment.getNumNodes(); + if (leftChildFragment.getNumNodes() != -1) { + broadcastCost = 2 * rhsDataSize * leftChildFragment.getNumNodes(); + } } LOG.debug("broadcast: cost=" + Long.toString(broadcastCost)); LOG.debug("card=" + Long.toString(rhsTree.getCardinality()) + " row_size=" @@ -435,7 +438,7 @@ public class DistributedPlanner { + Integer.toString(leftChildFragment.getNumNodes())); // repartition: both left- and rightChildFragment are partitioned on the - // join exprs + // join exprs, and a hash table is built with the rightChildFragment's output. PlanNode lhsTree = leftChildFragment.getPlanRoot(); long partitionCost = Long.MAX_VALUE; List<Expr> lhsJoinExprs = Lists.newArrayList(); @@ -453,13 +456,11 @@ public class DistributedPlanner { rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs, rightChildFragment.getDataPartition().getPartitionExprs()); - double lhsCost = (lhsHasCompatPartition) ? 0.0 : + double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 : Math.round( lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree)); - double rhsCost = (rhsHasCompatPartition) ? 0.0 : - Math.round( - rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree)); - partitionCost = Math.round(lhsCost + rhsCost); + double rhsNetworkCost = (rhsHasCompatPartition) ? 
0.0 : rhsDataSize; + partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost + rhsDataSize); } LOG.debug("partition: cost=" + Long.toString(partitionCost)); LOG.debug("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size=" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/joins.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test index ec3c7a2..b5b36a0 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test @@ -189,13 +189,13 @@ and (b.double_col * c.tinyint_col > 1000 or c.tinyint_col < 1000) 02:SCAN HDFS [functional.alltypesaggnonulls c] partitions=2/10 files=2 size=148.10KB ---- DISTRIBUTEDPLAN -08:EXCHANGE [UNPARTITIONED] +09:EXCHANGE [UNPARTITIONED] | -04:HASH JOIN [LEFT OUTER JOIN, BROADCAST] +04:HASH JOIN [LEFT OUTER JOIN, PARTITIONED] | hash predicates: c.id = a.id, c.string_col = b.string_col | other predicates: a.tinyint_col = 15, b.string_col = '15', a.day >= 6, b.month > 2, a.tinyint_col + b.tinyint_col < 15, a.float_col - c.double_col < 0, (b.double_col * c.tinyint_col > 1000 OR c.tinyint_col < 1000) | -|--07:EXCHANGE [BROADCAST] +|--08:EXCHANGE [HASH(a.id,b.string_col)] | | | 03:HASH JOIN [FULL OUTER JOIN, PARTITIONED] | | hash predicates: a.id = b.id, a.int_col = b.int_col @@ -212,6 +212,8 @@ and (b.double_col * c.tinyint_col > 1000 or c.tinyint_col < 1000) | partitions=5/11 files=5 size=372.38KB | predicates: a.tinyint_col = 15 | +07:EXCHANGE [HASH(c.id,c.string_col)] +| 02:SCAN HDFS [functional.alltypesaggnonulls c] partitions=2/10 files=2 size=148.10KB ==== 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test index 334f7b2..492f7a4 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test @@ -4715,19 +4715,19 @@ select * from ( partitions=120/120 files=120 size=21.31MB runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk ---- DISTRIBUTEDPLAN -30:MERGING-EXCHANGE [UNPARTITIONED] +31:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(*) ASC | limit: 100 | 16:TOP-N [LIMIT=100] | order by: count(*) ASC | -29:AGGREGATE [FINALIZE] +30:AGGREGATE [FINALIZE] | output: count:merge(*) | group by: a.ca_state | having: count(*) >= 10 | -28:EXCHANGE [HASH(a.ca_state)] +29:EXCHANGE [HASH(a.ca_state)] | 15:AGGREGATE [STREAMING] | output: count(*) @@ -4738,13 +4738,13 @@ select * from ( | other join predicates: i.i_current_price > 1.2 * avg(j.i_current_price) | runtime filters: RF000 <- j.i_category | -|--27:EXCHANGE [BROADCAST] +|--28:EXCHANGE [BROADCAST] | | -| 26:AGGREGATE [FINALIZE] +| 27:AGGREGATE [FINALIZE] | | output: avg:merge(j.i_current_price) | | group by: j.i_category | | -| 25:EXCHANGE [HASH(j.i_category)] +| 26:EXCHANGE [HASH(j.i_category)] | | | 08:AGGREGATE [STREAMING] | | output: avg(j.i_current_price) @@ -4757,16 +4757,16 @@ select * from ( | hash predicates: d.d_month_seq = (d_month_seq) | runtime filters: RF001 <- (d_month_seq) | -|--24:EXCHANGE [BROADCAST] +|--25:EXCHANGE [BROADCAST] | | -| 23:EXCHANGE [UNPARTITIONED] +| 24:EXCHANGE [UNPARTITIONED] | | limit: 1 | | -| 22:AGGREGATE [FINALIZE] +| 23:AGGREGATE [FINALIZE] | | group by: (d_month_seq) | | 
limit: 1 | | -| 21:EXCHANGE [HASH((d_month_seq))] +| 22:EXCHANGE [HASH((d_month_seq))] | | | 06:AGGREGATE [STREAMING] | | group by: (d_month_seq) @@ -4779,7 +4779,7 @@ select * from ( | hash predicates: s.ss_item_sk = i.i_item_sk | runtime filters: RF002 <- i.i_item_sk | -|--20:EXCHANGE [BROADCAST] +|--21:EXCHANGE [BROADCAST] | | | 04:SCAN HDFS [tpcds.item i] | partitions=1/1 files=1 size=4.82MB @@ -4789,7 +4789,7 @@ select * from ( | hash predicates: s.ss_sold_date_sk = d.d_date_sk | runtime filters: RF003 <- d.d_date_sk | -|--19:EXCHANGE [BROADCAST] +|--20:EXCHANGE [BROADCAST] | | | 03:SCAN HDFS [tpcds.date_dim d] | partitions=1/1 files=1 size=9.84MB @@ -4799,38 +4799,40 @@ select * from ( | hash predicates: c.c_current_addr_sk = a.ca_address_sk | runtime filters: RF004 <- a.ca_address_sk | -|--18:EXCHANGE [BROADCAST] +|--19:EXCHANGE [BROADCAST] | | | 00:SCAN HDFS [tpcds.customer_address a] | partitions=1/1 files=1 size=5.25MB | -09:HASH JOIN [INNER JOIN, BROADCAST] +09:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: s.ss_customer_sk = c.c_customer_sk | runtime filters: RF005 <- c.c_customer_sk | -|--17:EXCHANGE [BROADCAST] +|--18:EXCHANGE [HASH(c.c_customer_sk)] | | | 01:SCAN HDFS [tpcds.customer c] | partitions=1/1 files=1 size=12.60MB | runtime filters: RF004 -> c.c_current_addr_sk | +17:EXCHANGE [HASH(s.ss_customer_sk)] +| 02:SCAN HDFS [tpcds.store_sales s] partitions=120/120 files=120 size=21.31MB runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk ---- PARALLELPLANS -30:MERGING-EXCHANGE [UNPARTITIONED] +31:MERGING-EXCHANGE [UNPARTITIONED] | order by: count(*) ASC | limit: 100 | 16:TOP-N [LIMIT=100] | order by: count(*) ASC | -29:AGGREGATE [FINALIZE] +30:AGGREGATE [FINALIZE] | output: count:merge(*) | group by: a.ca_state | having: count(*) >= 10 | -28:EXCHANGE [HASH(a.ca_state)] +29:EXCHANGE [HASH(a.ca_state)] | 15:AGGREGATE [STREAMING] | output: count(*) @@ -4845,13 +4847,13 @@ select * from ( | | 
join-table-id=00 plan-id=01 cohort-id=01 | | build expressions: j.i_category | | -| 27:EXCHANGE [BROADCAST] +| 28:EXCHANGE [BROADCAST] | | -| 26:AGGREGATE [FINALIZE] +| 27:AGGREGATE [FINALIZE] | | output: avg:merge(j.i_current_price) | | group by: j.i_category | | -| 25:EXCHANGE [HASH(j.i_category)] +| 26:EXCHANGE [HASH(j.i_category)] | | | 08:AGGREGATE [STREAMING] | | output: avg(j.i_current_price) @@ -4868,16 +4870,16 @@ select * from ( | | join-table-id=01 plan-id=02 cohort-id=01 | | build expressions: (d_month_seq) | | -| 24:EXCHANGE [BROADCAST] +| 25:EXCHANGE [BROADCAST] | | -| 23:EXCHANGE [UNPARTITIONED] +| 24:EXCHANGE [UNPARTITIONED] | | limit: 1 | | -| 22:AGGREGATE [FINALIZE] +| 23:AGGREGATE [FINALIZE] | | group by: (d_month_seq) | | limit: 1 | | -| 21:EXCHANGE [HASH((d_month_seq))] +| 22:EXCHANGE [HASH((d_month_seq))] | | | 06:AGGREGATE [STREAMING] | | group by: (d_month_seq) @@ -4894,7 +4896,7 @@ select * from ( | | join-table-id=02 plan-id=03 cohort-id=01 | | build expressions: i.i_item_sk | | -| 20:EXCHANGE [BROADCAST] +| 21:EXCHANGE [BROADCAST] | | | 04:SCAN HDFS [tpcds.item i] | partitions=1/1 files=1 size=4.82MB @@ -4908,7 +4910,7 @@ select * from ( | | join-table-id=03 plan-id=04 cohort-id=01 | | build expressions: d.d_date_sk | | -| 19:EXCHANGE [BROADCAST] +| 20:EXCHANGE [BROADCAST] | | | 03:SCAN HDFS [tpcds.date_dim d] | partitions=1/1 files=1 size=9.84MB @@ -4922,12 +4924,12 @@ select * from ( | | join-table-id=04 plan-id=05 cohort-id=01 | | build expressions: a.ca_address_sk | | -| 18:EXCHANGE [BROADCAST] +| 19:EXCHANGE [BROADCAST] | | | 00:SCAN HDFS [tpcds.customer_address a] | partitions=1/1 files=1 size=5.25MB | -09:HASH JOIN [INNER JOIN, BROADCAST] +09:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: s.ss_customer_sk = c.c_customer_sk | runtime filters: RF005 <- c.c_customer_sk | @@ -4935,12 +4937,14 @@ select * from ( | | join-table-id=05 plan-id=06 cohort-id=01 | | build expressions: c.c_customer_sk | | -| 17:EXCHANGE [BROADCAST] +| 
18:EXCHANGE [HASH(c.c_customer_sk)] | | | 01:SCAN HDFS [tpcds.customer c] | partitions=1/1 files=1 size=12.60MB | runtime filters: RF004 -> c.c_current_addr_sk | +17:EXCHANGE [HASH(s.ss_customer_sk)] +| 02:SCAN HDFS [tpcds.store_sales s] partitions=120/120 files=120 size=21.31MB runtime filters: RF002 -> s.ss_item_sk, RF003 -> s.ss_sold_date_sk, RF005 -> s.ss_customer_sk http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test index 4e7851f..7864153 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test @@ -1343,17 +1343,17 @@ order by partitions=1/1 files=1 size=23.08MB runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey ---- DISTRIBUTEDPLAN -26:MERGING-EXCHANGE [UNPARTITIONED] +28:MERGING-EXCHANGE [UNPARTITIONED] | order by: o_year ASC | 16:SORT | order by: o_year ASC | -25:AGGREGATE [FINALIZE] +27:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume) | group by: o_year | -24:EXCHANGE [HASH(o_year)] +26:EXCHANGE [HASH(o_year)] | 15:AGGREGATE [STREAMING] | output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount)) @@ -1363,7 +1363,7 @@ order by | hash predicates: s_nationkey = n2.n_nationkey | runtime filters: RF000 <- n2.n_nationkey | -|--23:EXCHANGE [BROADCAST] +|--25:EXCHANGE [BROADCAST] | | | 06:SCAN HDFS [tpch.nation n2] | partitions=1/1 files=1 size=2.15KB @@ -1372,7 +1372,7 @@ order by | hash predicates: n1.n_regionkey = r_regionkey | runtime filters: RF001 <- r_regionkey | -|--22:EXCHANGE [BROADCAST] 
+|--24:EXCHANGE [BROADCAST] | | | 07:SCAN HDFS [tpch.region] | partitions=1/1 files=1 size=384B @@ -1382,7 +1382,7 @@ order by | hash predicates: c_nationkey = n1.n_nationkey | runtime filters: RF002 <- n1.n_nationkey | -|--21:EXCHANGE [BROADCAST] +|--23:EXCHANGE [BROADCAST] | | | 05:SCAN HDFS [tpch.nation n1] | partitions=1/1 files=1 size=2.15KB @@ -1392,23 +1392,25 @@ order by | hash predicates: c_custkey = o_custkey | runtime filters: RF003 <- o_custkey | -|--20:EXCHANGE [BROADCAST] +|--22:EXCHANGE [BROADCAST] | | -| 10:HASH JOIN [INNER JOIN, BROADCAST] +| 10:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: l_suppkey = s_suppkey | | runtime filters: RF004 <- s_suppkey | | -| |--19:EXCHANGE [BROADCAST] +| |--21:EXCHANGE [HASH(s_suppkey)] | | | | | 01:SCAN HDFS [tpch.supplier] | | partitions=1/1 files=1 size=1.33MB | | runtime filters: RF000 -> s_nationkey | | -| 09:HASH JOIN [INNER JOIN, BROADCAST] +| 20:EXCHANGE [HASH(l_suppkey)] +| | +| 09:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: o_orderkey = l_orderkey | | runtime filters: RF005 <- l_orderkey | | -| |--18:EXCHANGE [BROADCAST] +| |--19:EXCHANGE [HASH(l_orderkey)] | | | | | 08:HASH JOIN [INNER JOIN, BROADCAST] | | | hash predicates: l_partkey = p_partkey @@ -1424,6 +1426,8 @@ order by | | partitions=1/1 files=1 size=718.94MB | | runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey | | +| 18:EXCHANGE [HASH(o_orderkey)] +| | | 03:SCAN HDFS [tpch.orders] | partitions=1/1 files=1 size=162.56MB | predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31' @@ -1433,17 +1437,17 @@ order by partitions=1/1 files=1 size=23.08MB runtime filters: RF002 -> c_nationkey, RF003 -> c_custkey ---- PARALLELPLANS -26:MERGING-EXCHANGE [UNPARTITIONED] +28:MERGING-EXCHANGE [UNPARTITIONED] | order by: o_year ASC | 16:SORT | order by: o_year ASC | -25:AGGREGATE [FINALIZE] +27:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN nation = 'BRAZIL' THEN volume ELSE 0 END), sum:merge(volume) | group by: 
o_year | -24:EXCHANGE [HASH(o_year)] +26:EXCHANGE [HASH(o_year)] | 15:AGGREGATE [STREAMING] | output: sum(CASE WHEN n2.n_name = 'BRAZIL' THEN l_extendedprice * (1 - l_discount) ELSE 0 END), sum(l_extendedprice * (1 - l_discount)) @@ -1457,7 +1461,7 @@ order by | | join-table-id=00 plan-id=01 cohort-id=01 | | build expressions: n2.n_nationkey | | -| 23:EXCHANGE [BROADCAST] +| 25:EXCHANGE [BROADCAST] | | | 06:SCAN HDFS [tpch.nation n2] | partitions=1/1 files=1 size=2.15KB @@ -1470,7 +1474,7 @@ order by | | join-table-id=01 plan-id=02 cohort-id=01 | | build expressions: r_regionkey | | -| 22:EXCHANGE [BROADCAST] +| 24:EXCHANGE [BROADCAST] | | | 07:SCAN HDFS [tpch.region] | partitions=1/1 files=1 size=384B @@ -1484,7 +1488,7 @@ order by | | join-table-id=02 plan-id=03 cohort-id=01 | | build expressions: n1.n_nationkey | | -| 21:EXCHANGE [BROADCAST] +| 23:EXCHANGE [BROADCAST] | | | 05:SCAN HDFS [tpch.nation n1] | partitions=1/1 files=1 size=2.15KB @@ -1498,9 +1502,9 @@ order by | | join-table-id=03 plan-id=04 cohort-id=01 | | build expressions: o_custkey | | -| 20:EXCHANGE [BROADCAST] +| 22:EXCHANGE [BROADCAST] | | -| 10:HASH JOIN [INNER JOIN, BROADCAST] +| 10:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: l_suppkey = s_suppkey | | runtime filters: RF004 <- s_suppkey | | @@ -1508,13 +1512,15 @@ order by | | | join-table-id=04 plan-id=05 cohort-id=02 | | | build expressions: s_suppkey | | | -| | 19:EXCHANGE [BROADCAST] +| | 21:EXCHANGE [HASH(s_suppkey)] | | | | | 01:SCAN HDFS [tpch.supplier] | | partitions=1/1 files=1 size=1.33MB | | runtime filters: RF000 -> s_nationkey | | -| 09:HASH JOIN [INNER JOIN, BROADCAST] +| 20:EXCHANGE [HASH(l_suppkey)] +| | +| 09:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: o_orderkey = l_orderkey | | runtime filters: RF005 <- l_orderkey | | @@ -1522,7 +1528,7 @@ order by | | | join-table-id=05 plan-id=06 cohort-id=02 | | | build expressions: l_orderkey | | | -| | 18:EXCHANGE [BROADCAST] +| | 19:EXCHANGE 
[HASH(l_orderkey)] | | | | | 08:HASH JOIN [INNER JOIN, BROADCAST] | | | hash predicates: l_partkey = p_partkey @@ -1542,6 +1548,8 @@ order by | | partitions=1/1 files=1 size=718.94MB | | runtime filters: RF004 -> l_suppkey, RF006 -> l_partkey | | +| 18:EXCHANGE [HASH(o_orderkey)] +| | | 03:SCAN HDFS [tpch.orders] | partitions=1/1 files=1 size=162.56MB | predicates: o_orderdate >= '1995-01-01', o_orderdate <= '1996-12-31' @@ -2292,53 +2300,55 @@ order by partitions=1/1 files=1 size=162.56MB runtime filters: RF000 -> o_orderkey ---- DISTRIBUTEDPLAN -08:MERGING-EXCHANGE [UNPARTITIONED] +09:MERGING-EXCHANGE [UNPARTITIONED] | order by: l_shipmode ASC | 04:SORT | order by: l_shipmode ASC | -07:AGGREGATE [FINALIZE] +08:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum:merge(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END) | group by: l_shipmode | -06:EXCHANGE [HASH(l_shipmode)] +07:EXCHANGE [HASH(l_shipmode)] | 03:AGGREGATE [STREAMING] | output: sum(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END) | group by: l_shipmode | -02:HASH JOIN [INNER JOIN, BROADCAST] +02:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: o_orderkey = l_orderkey | runtime filters: RF000 <- l_orderkey | -|--05:EXCHANGE [BROADCAST] +|--06:EXCHANGE [HASH(l_orderkey)] | | | 01:SCAN HDFS [tpch.lineitem] | partitions=1/1 files=1 size=718.94MB | predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01' | +05:EXCHANGE [HASH(o_orderkey)] +| 00:SCAN HDFS [tpch.orders] partitions=1/1 files=1 size=162.56MB runtime filters: RF000 -> o_orderkey ---- PARALLELPLANS -08:MERGING-EXCHANGE [UNPARTITIONED] +09:MERGING-EXCHANGE [UNPARTITIONED] | order by: 
l_shipmode ASC | 04:SORT | order by: l_shipmode ASC | -07:AGGREGATE [FINALIZE] +08:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum:merge(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END) | group by: l_shipmode | -06:EXCHANGE [HASH(l_shipmode)] +07:EXCHANGE [HASH(l_shipmode)] | 03:AGGREGATE [STREAMING] | output: sum(CASE WHEN o_orderpriority = '1-URGENT' OR o_orderpriority = '2-HIGH' THEN 1 ELSE 0 END), sum(CASE WHEN o_orderpriority != '1-URGENT' AND o_orderpriority != '2-HIGH' THEN 1 ELSE 0 END) | group by: l_shipmode | -02:HASH JOIN [INNER JOIN, BROADCAST] +02:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: o_orderkey = l_orderkey | runtime filters: RF000 <- l_orderkey | @@ -2346,12 +2356,14 @@ order by | | join-table-id=00 plan-id=01 cohort-id=01 | | build expressions: l_orderkey | | -| 05:EXCHANGE [BROADCAST] +| 06:EXCHANGE [HASH(l_orderkey)] | | | 01:SCAN HDFS [tpch.lineitem] | partitions=1/1 files=1 size=718.94MB | predicates: l_shipmode IN ('MAIL', 'SHIP'), l_commitdate < l_receiptdate, l_shipdate < l_commitdate, l_receiptdate >= '1994-01-01', l_receiptdate < '1995-01-01' | +05:EXCHANGE [HASH(o_orderkey)] +| 00:SCAN HDFS [tpch.orders] partitions=1/1 files=1 size=162.56MB runtime filters: RF000 -> o_orderkey @@ -2509,37 +2521,39 @@ where predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01' runtime filters: RF000 -> l_partkey ---- DISTRIBUTEDPLAN -06:AGGREGATE [FINALIZE] +07:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount)) | -05:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE | output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount)) | -02:HASH JOIN [INNER JOIN, BROADCAST] +02:HASH JOIN [INNER 
JOIN, PARTITIONED] | hash predicates: l_partkey = p_partkey | runtime filters: RF000 <- p_partkey | -|--04:EXCHANGE [BROADCAST] +|--05:EXCHANGE [HASH(p_partkey)] | | | 01:SCAN HDFS [tpch.part] | partitions=1/1 files=1 size=22.83MB | +04:EXCHANGE [HASH(l_partkey)] +| 00:SCAN HDFS [tpch.lineitem] partitions=1/1 files=1 size=718.94MB predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01' runtime filters: RF000 -> l_partkey ---- PARALLELPLANS -06:AGGREGATE [FINALIZE] +07:AGGREGATE [FINALIZE] | output: sum:merge(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum:merge(l_extendedprice * (1 - l_discount)) | -05:EXCHANGE [UNPARTITIONED] +06:EXCHANGE [UNPARTITIONED] | 03:AGGREGATE | output: sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0.0 END), sum(l_extendedprice * (1 - l_discount)) | -02:HASH JOIN [INNER JOIN, BROADCAST] +02:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: l_partkey = p_partkey | runtime filters: RF000 <- p_partkey | @@ -2547,11 +2561,13 @@ where | | join-table-id=00 plan-id=01 cohort-id=01 | | build expressions: p_partkey | | -| 04:EXCHANGE [BROADCAST] +| 05:EXCHANGE [HASH(p_partkey)] | | | 01:SCAN HDFS [tpch.part] | partitions=1/1 files=1 size=22.83MB | +04:EXCHANGE [HASH(l_partkey)] +| 00:SCAN HDFS [tpch.lineitem] partitions=1/1 files=1 size=718.94MB predicates: l_shipdate >= '1995-09-01', l_shipdate < '1995-10-01' @@ -3698,13 +3714,11 @@ limit 100 | | other join predicates: l2.l_suppkey != l1.l_suppkey | | runtime filters: RF000 <- l1.l_orderkey | | -| |--17:EXCHANGE [HASH(l1.l_orderkey)] -| | | -| | 08:HASH JOIN [INNER JOIN, BROADCAST] +| |--08:HASH JOIN [INNER JOIN, BROADCAST] | | | hash predicates: s_nationkey = n_nationkey | | | runtime filters: RF001 <- n_nationkey | | | -| | |--15:EXCHANGE [BROADCAST] +| | |--16:EXCHANGE [BROADCAST] | | | | | | | 03:SCAN HDFS [tpch.nation] | | | partitions=1/1 files=1 size=2.15KB @@ -3714,28 +3728,30 @@ limit 100 | | | 
hash predicates: l1.l_suppkey = s_suppkey | | | runtime filters: RF002 <- s_suppkey | | | -| | |--14:EXCHANGE [BROADCAST] +| | |--15:EXCHANGE [BROADCAST] | | | | | | | 00:SCAN HDFS [tpch.supplier] | | | partitions=1/1 files=1 size=1.33MB | | | runtime filters: RF001 -> s_nationkey | | | -| | 06:HASH JOIN [INNER JOIN, BROADCAST] +| | 06:HASH JOIN [INNER JOIN, PARTITIONED] | | | hash predicates: l1.l_orderkey = o_orderkey | | | runtime filters: RF003 <- o_orderkey | | | -| | |--13:EXCHANGE [BROADCAST] +| | |--14:EXCHANGE [HASH(o_orderkey)] | | | | | | | 02:SCAN HDFS [tpch.orders] | | | partitions=1/1 files=1 size=162.56MB | | | predicates: o_orderstatus = 'F' | | | +| | 13:EXCHANGE [HASH(l1.l_orderkey)] +| | | | | 01:SCAN HDFS [tpch.lineitem l1] | | partitions=1/1 files=1 size=718.94MB | | predicates: l1.l_receiptdate > l1.l_commitdate | | runtime filters: RF002 -> l1.l_suppkey, RF003 -> l1.l_orderkey | | -| 16:EXCHANGE [HASH(l2.l_orderkey)] +| 17:EXCHANGE [HASH(l2.l_orderkey)] | | | 04:SCAN HDFS [tpch.lineitem l2] | partitions=1/1 files=1 size=718.94MB @@ -3781,8 +3797,6 @@ limit 100 | | | join-table-id=01 plan-id=02 cohort-id=02 | | | build expressions: l1.l_orderkey | | | -| | 17:EXCHANGE [HASH(l1.l_orderkey)] -| | | | | 08:HASH JOIN [INNER JOIN, BROADCAST] | | | hash predicates: s_nationkey = n_nationkey | | | runtime filters: RF001 <- n_nationkey @@ -3791,7 +3805,7 @@ limit 100 | | | | join-table-id=02 plan-id=03 cohort-id=03 | | | | build expressions: n_nationkey | | | | -| | | 15:EXCHANGE [BROADCAST] +| | | 16:EXCHANGE [BROADCAST] | | | | | | | 03:SCAN HDFS [tpch.nation] | | | partitions=1/1 files=1 size=2.15KB @@ -3805,13 +3819,13 @@ limit 100 | | | | join-table-id=03 plan-id=04 cohort-id=03 | | | | build expressions: s_suppkey | | | | -| | | 14:EXCHANGE [BROADCAST] +| | | 15:EXCHANGE [BROADCAST] | | | | | | | 00:SCAN HDFS [tpch.supplier] | | | partitions=1/1 files=1 size=1.33MB | | | runtime filters: RF001 -> s_nationkey | | | -| | 06:HASH JOIN [INNER JOIN, 
BROADCAST] +| | 06:HASH JOIN [INNER JOIN, PARTITIONED] | | | hash predicates: l1.l_orderkey = o_orderkey | | | runtime filters: RF003 <- o_orderkey | | | @@ -3819,18 +3833,20 @@ limit 100 | | | | join-table-id=04 plan-id=05 cohort-id=03 | | | | build expressions: o_orderkey | | | | -| | | 13:EXCHANGE [BROADCAST] +| | | 14:EXCHANGE [HASH(o_orderkey)] | | | | | | | 02:SCAN HDFS [tpch.orders] | | | partitions=1/1 files=1 size=162.56MB | | | predicates: o_orderstatus = 'F' | | | +| | 13:EXCHANGE [HASH(l1.l_orderkey)] +| | | | | 01:SCAN HDFS [tpch.lineitem l1] | | partitions=1/1 files=1 size=718.94MB | | predicates: l1.l_receiptdate > l1.l_commitdate | | runtime filters: RF002 -> l1.l_suppkey, RF003 -> l1.l_orderkey | | -| 16:EXCHANGE [HASH(l2.l_orderkey)] +| 17:EXCHANGE [HASH(l2.l_orderkey)] | | | 04:SCAN HDFS [tpch.lineitem l2] | partitions=1/1 files=1 size=718.94MB http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test index 1f0e315..4354814 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test @@ -1887,14 +1887,12 @@ where 06:AGGREGATE | output: sum(l_extendedprice) | -05:HASH JOIN [LEFT SEMI JOIN, BROADCAST] +05:HASH JOIN [LEFT SEMI JOIN, PARTITIONED] | hash predicates: p_partkey = l_partkey | other join predicates: l_quantity < 0.2 * avg(l_quantity) | runtime filters: RF000 <- l_partkey | -|--10:EXCHANGE [BROADCAST] -| | -| 09:AGGREGATE [FINALIZE] +|--09:AGGREGATE [FINALIZE] | | output: avg:merge(l_quantity) | | group by: l_partkey | | @@ -1907,6 +1905,8 @@ where | 02:SCAN HDFS [tpch_nested_parquet.customer.c_orders.o_lineitems l] 
| partitions=1/1 files=4 size=577.87MB | +10:EXCHANGE [HASH(p_partkey)] +| 04:HASH JOIN [INNER JOIN, BROADCAST] | hash predicates: l_partkey = p_partkey | runtime filters: RF001 <- p_partkey http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/union.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/union.test b/testdata/workloads/functional-planner/queries/PlannerTest/union.test index 1dfbdcc..7e9549a 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/union.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/union.test @@ -2469,7 +2469,7 @@ select 1000, 2000 01:SCAN HDFS [functional.alltypestiny] partitions=4/4 files=4 size=460B ---- DISTRIBUTEDPLAN -10:EXCHANGE [UNPARTITIONED] +11:EXCHANGE [UNPARTITIONED] | 00:UNION | constant-operands=1 @@ -2477,15 +2477,17 @@ select 1000, 2000 |--01:SCAN HDFS [functional.alltypestiny] | partitions=4/4 files=4 size=460B | -|--06:HASH JOIN [INNER JOIN, BROADCAST] +|--06:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: a.id = b.id | | runtime filters: RF000 <- b.id | | -| |--09:EXCHANGE [BROADCAST] +| |--10:EXCHANGE [HASH(b.id)] | | | | | 05:SCAN HDFS [functional.alltypestiny b] | | partitions=4/4 files=4 size=460B | | +| 09:EXCHANGE [HASH(a.id)] +| | | 04:SCAN HDFS [functional.alltypestiny a] | partitions=4/4 files=4 size=460B | runtime filters: RF000 -> a.id @@ -2602,7 +2604,7 @@ select 1000, 2000 01:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB ---- DISTRIBUTEDPLAN -19:EXCHANGE [UNPARTITIONED] +20:EXCHANGE [UNPARTITIONED] | 00:UNION | constant-operands=1 @@ -2610,7 +2612,7 @@ select 1000, 2000 |--05:SCAN HDFS [functional.alltypestiny] | partitions=4/4 files=4 size=460B | -|--18:EXCHANGE [RANDOM] +|--19:EXCHANGE [RANDOM] | | | 13:MERGING-EXCHANGE [UNPARTITIONED] | | order by: id ASC @@ 
-2622,7 +2624,7 @@ select 1000, 2000 | 03:SCAN HDFS [functional.alltypessmall] | partitions=4/4 files=4 size=6.32KB | -|--17:EXCHANGE [RANDOM] +|--18:EXCHANGE [RANDOM] | | | 12:AGGREGATE [FINALIZE] | | output: count:merge(id), sum:merge(bigint_col) @@ -2635,15 +2637,17 @@ select 1000, 2000 | 01:SCAN HDFS [functional.alltypes] | partitions=24/24 files=24 size=478.45KB | -|--10:HASH JOIN [INNER JOIN, BROADCAST] +|--10:HASH JOIN [INNER JOIN, PARTITIONED] | | hash predicates: a.id = b.id | | runtime filters: RF000 <- b.id | | -| |--16:EXCHANGE [BROADCAST] +| |--17:EXCHANGE [HASH(b.id)] | | | | | 09:SCAN HDFS [functional.alltypestiny b] | | partitions=4/4 files=4 size=460B | | +| 16:EXCHANGE [HASH(a.id)] +| | | 08:SCAN HDFS [functional.alltypestiny a] | partitions=4/4 files=4 size=460B | runtime filters: RF000 -> a.id http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-planner/queries/PlannerTest/views.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/views.test b/testdata/workloads/functional-planner/queries/PlannerTest/views.test index 9836d90..50bee61 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/views.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/views.test @@ -155,27 +155,27 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc predicates: functional.alltypes.id > 1 runtime filters: RF000 -> functional.alltypes.id, RF001 -> functional.alltypes.id ---- DISTRIBUTEDPLAN -15:EXCHANGE [UNPARTITIONED] +16:EXCHANGE [UNPARTITIONED] | 08:HASH JOIN [INNER JOIN, BROADCAST] | hash predicates: int_col = count(a.bigint_col) | runtime filters: RF000 <- count(a.bigint_col) | -|--14:EXCHANGE [BROADCAST] +|--15:EXCHANGE [BROADCAST] | | -| 13:MERGING-EXCHANGE [UNPARTITIONED] +| 14:MERGING-EXCHANGE [UNPARTITIONED] | | order by: b.string_col ASC | | limit: 100 | | | 06:TOP-N 
[LIMIT=100] | | order by: b.string_col ASC | | -| 12:AGGREGATE [FINALIZE] +| 13:AGGREGATE [FINALIZE] | | output: count:merge(a.bigint_col) | | group by: b.string_col | | having: count(a.bigint_col) > 1 | | -| 11:EXCHANGE [HASH(b.string_col)] +| 12:EXCHANGE [HASH(b.string_col)] | | | 05:AGGREGATE [STREAMING] | | output: count(a.bigint_col) @@ -185,7 +185,7 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc | | hash predicates: a.id = b.id | | runtime filters: RF002 <- b.id | | -| |--10:EXCHANGE [BROADCAST] +| |--11:EXCHANGE [BROADCAST] | | | | | 03:SCAN HDFS [functional.alltypestiny b] | | partitions=4/4 files=4 size=460B @@ -195,17 +195,19 @@ functional.complex_view t3 where t1.id = t2.x and t2.x = t3.abc | predicates: a.bigint_col < 50 | runtime filters: RF002 -> a.id | -07:HASH JOIN [INNER JOIN, BROADCAST] +07:HASH JOIN [INNER JOIN, PARTITIONED] | hash predicates: functional.alltypes.id = int_col | runtime filters: RF001 <- int_col | -|--09:EXCHANGE [BROADCAST] +|--10:EXCHANGE [HASH(int_col)] | | | 01:SCAN HDFS [functional.alltypes] | partitions=24/24 files=24 size=478.45KB | predicates: functional.alltypes.int_col > 1 | runtime filters: RF000 -> int_col | +09:EXCHANGE [HASH(functional.alltypes.id)] +| 00:SCAN HDFS [functional.alltypes] partitions=24/24 files=24 size=478.45KB predicates: functional.alltypes.id > 1 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d72353d0/testdata/workloads/functional-query/queries/QueryTest/spilling.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test index e97afc5..91b425e 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test @@ -203,7 +203,7 @@ select count(*) as numwait from supplier, - lineitem l1, + lineitem l1 join [BROADCAST] 
orders, nation where
