HIVE-17529: Bucket Map Join : Sets incorrect edge type causing execution failure (Deepak Jaiswal, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8cdee629 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8cdee629 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8cdee629 Branch: refs/heads/master Commit: 8cdee6297f545ec1aa9247dd24a02766d88fae1e Parents: afd0d9f Author: Jason Dere <[email protected]> Authored: Thu Sep 21 13:24:03 2017 -0700 Committer: Jason Dere <[email protected]> Committed: Thu Sep 21 13:24:03 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/OperatorUtils.java | 4 +- .../clientpositive/bucket_map_join_tez1.q | 149 +- .../clientpositive/bucket_map_join_tez2.q | 57 +- .../llap/bucket_map_join_tez1.q.out | 4401 ++++++++++++++---- .../llap/bucket_map_join_tez2.q.out | 1340 +++++- .../spark/bucket_map_join_tez1.q.out | 4265 +++++++++++++---- .../spark/bucket_map_join_tez2.q.out | 1330 +++++- 7 files changed, 9338 insertions(+), 2208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java index e79d100..cddcf16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java @@ -128,10 +128,10 @@ public class OperatorUtils { for (Operator<?> parent : start.getParentOperators()) { if (onlyIncludeIndex >= 0) { if (onlyIncludeIndex == i) { - findOperatorsUpstream(parent, clazz, found); + findOperatorsUpstreamJoinAccounted(parent, clazz, found); } } else { - findOperatorsUpstream(parent, clazz, found); + findOperatorsUpstreamJoinAccounted(parent, clazz, found); } i++; } http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q index aeb244a..cac1d6a 100644 --- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q +++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q @@ -28,86 +28,185 @@ CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY ( insert overwrite table tab partition (ds='2008-04-08') select key,value from srcbucket_mapjoin; +analyze table srcbucket_mapjoin compute statistics for columns; +analyze table srcbucket_mapjoin_part compute statistics for columns; +analyze table tab compute statistics for columns; +analyze table tab_part compute statistics for columns; + +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value; +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value; + set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, b.value -from tab a join tab_part b on a.key = b.key; +from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value; +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value; + +set hive.auto.convert.join.noconditionaltask.size=900; +set hive.convert.join.bucket.mapjoin.tez = false; explain select count(*) from -(select distinct key, value from tab_part) a join tab b on a.key = b.key; - +(select distinct key from tab_part) a join tab b on a.key = b.key; select count(*) from -(select distinct key, value from tab_part) a join tab b on a.key = b.key; +(select distinct key from tab_part) a join tab b on a.key = b.key; + +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select count(*) +from +(select distinct key from tab_part) a join tab b on a.key = b.key; +select count(*) +from +(select distinct key from tab_part) a join tab b on a.key = b.key; + +set hive.convert.join.bucket.mapjoin.tez = false; explain select count(*) from (select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c join tab_part d on c.key = d.key; +select count(*) +from +(select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c +join +tab_part d on c.key = d.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select count(*) +from +(select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c +join +tab_part d on c.key = d.key; select count(*) from (select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c join tab_part d on c.key = d.key; +set hive.convert.join.bucket.mapjoin.tez = false; explain select count(*) from tab_part d join (select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c on c.key = d.key; - select count(*) from tab_part d join (select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c on c.key = d.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select count(*) +from +tab_part d +join +(select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c on c.key = d.key; +select count(*) +from +tab_part d +join +(select a.key as key, a.value as value from tab a join tab_part b on a.key = b.key) c on c.key = d.key; -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table. -- In this case the sub-query is chosen as the big table. +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.join.noconditionaltask.size=1000; explain select a.k1, a.v1, b.value from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a join tab b on a.k1 = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select a.k1, a.v1, b.value +from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a + join tab b on a.k1 = b.key; +set hive.convert.join.bucket.mapjoin.tez = false; explain select a.k1, a.v1, b.value from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab on tab_part.key = tab.key GROUP BY tab.key) a join tab b on a.k1 = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select a.k1, a.v1, b.value +from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab on tab_part.key = tab.key GROUP BY tab.key) a + join tab b on a.k1 = b.key; +set hive.convert.join.bucket.mapjoin.tez = false; explain select a.k1, a.v1, b.value from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y on x.key = y.key GROUP BY x.key) a join tab_part b on a.k1 = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select a.k1, a.v1, b.value +from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y on x.key = y.key GROUP BY x.key) a + join tab_part b on a.k1 = b.key; -- multi-way join +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.join.noconditionaltask.size=20000; +explain +select a.key, a.value, b.value +from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, b.value from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, a.value, c.value +from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a join tab c on a.key = c.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, c.value from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a join tab c on a.key = c.key; -- in this case sub-query is the small table +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.join.noconditionaltask.size=900; explain select a.key, a.value, b.value from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a join tab_part b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select a.key, a.value, b.value +from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a + join tab_part b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = false; set hive.map.aggr=false; explain select a.key, a.value, b.value from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a join tab_part b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select a.key, a.value, b.value +from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a + join tab_part b on a.key = b.key; --- join on non-bucketed column results in broadcast join. +-- join on non-bucketed column results in shuffle join. +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, a.value, b.value +from tab a join tab_part b on a.value = b.value; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, b.value from tab a join tab_part b on a.value = b.value; @@ -116,36 +215,30 @@ CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORE insert overwrite table tab1 select key,value from srcbucket_mapjoin; +set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, a.value, b.value +from tab1 a join tab_part b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, b.value from tab1 a join tab_part b on a.key = b.key; +-- No map joins should be created. +set hive.convert.join.bucket.mapjoin.tez = false; +set hive.auto.convert.join.noconditionaltask.size=1500; +explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; -set hive.auto.convert.join.noconditionaltask.size=50000; +set hive.convert.join.bucket.mapjoin.tez = false; +-- This wont have any effect as the column ds is partition column which is not bucketed. explain select a.key, a.value, b.value from tab a join tab_part b on a.key = b.key and a.ds = b.ds; - -set hive.auto.convert.join.noconditionaltask.size=10000; -set hive.mapjoin.hybridgrace.hashtable = false; -insert overwrite table tab partition (ds='2008-04-08') -select key,value from srcbucket_mapjoin where key = 411; - -explain -select count(*) -from tab_part a join tab b on a.key = b.key; - -select count(*) -from tab_part a join tab b on a.key = b.key; - -set hive.mapjoin.hybridgrace.hashtable = false; -insert overwrite table tab partition (ds='2008-04-08') -select key,value from srcbucket_mapjoin where key = 411; - +set hive.convert.join.bucket.mapjoin.tez = true; explain -select count(*) -from tab_part a join tab b on a.key = b.key; +select a.key, a.value, b.value +from tab a join tab_part b on a.key = b.key and a.ds = b.ds; -select count(*) -from tab_part a join tab b on a.key = b.key; http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q index 37989ec..45feec2 100644 --- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q +++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q @@ -28,27 +28,82 @@ CREATE TABLE tab(key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY ( insert overwrite table tab partition (ds='2008-04-08') select key,value from srcbucket_mapjoin; -set hive.convert.join.bucket.mapjoin.tez = true; +analyze table srcbucket_mapjoin compute statistics for columns; +analyze table srcbucket_mapjoin_part compute statistics for columns; +analyze table tab compute statistics for columns; +analyze table tab_part compute statistics for columns; +set hive.auto.convert.join.noconditionaltask.size=1500; +set hive.convert.join.bucket.mapjoin.tez = false; +explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key join tab_part b on a.value = b.value; CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE; insert overwrite table tab1 select key,value from srcbucket_mapjoin; +analyze table tab1 compute statistics for columns; +-- A negative test as src is not bucketed. +set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, a.value, b.value +from tab1 a join src b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, a.value, b.value from tab1 a join src b on a.key = b.key; +set hive.auto.convert.join.noconditionaltask.size=500; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, b.key from (select key from tab_part where key > 1) a join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from (select key from tab_part where key > 1) a join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, b.key from (select key from tab_part where key > 1) a left outer join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from (select key from tab_part where key > 1) a left outer join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select a.key, b.key from (select key from tab_part where key > 1) a right outer join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from (select key from tab_part where key > 1) a right outer join (select key from tab_part where key > 2) b on a.key = b.key; +set hive.auto.convert.join.noconditionaltask.size=300; +set hive.convert.join.bucket.mapjoin.tez = false; +explain select a.key, b.key from (select distinct key from tab) a join tab b on b.key = a.key; +set hive.convert.join.bucket.mapjoin.tez = true; explain select a.key, b.key from (select distinct key from tab) a join tab b on b.key = a.key; +set hive.convert.join.bucket.mapjoin.tez = false; explain select a.value, b.value from (select distinct value from tab) a join tab b on b.key = a.value; +set hive.convert.join.bucket.mapjoin.tez = true; +explain select a.value, b.value from (select distinct value from tab) a join tab b on b.key = a.value; + + + +--multi key +CREATE TABLE tab_part1 (key int, value string) PARTITIONED BY(ds STRING) CLUSTERED BY (key, value) INTO 4 BUCKETS STORED AS TEXTFILE; +insert overwrite table tab_part1 partition (ds='2008-04-08') +select key,value from srcbucket_mapjoin_part; +analyze table tab_part1 compute statistics for columns; + +set hive.auto.convert.join.noconditionaltask.size=20000; +set hive.convert.join.bucket.mapjoin.tez = false; +explain +select count(*) +from +(select distinct key,value from tab_part) a join tab b on a.key = b.key and a.value = b.value; +set hive.convert.join.bucket.mapjoin.tez = true; +explain +select count(*) +from +(select distinct key,value from tab_part) a join tab b on a.key = b.key and a.value = b.value;
