HIVE-17529: Bucket Map Join : Sets incorrect edge type causing execution 
failure (Deepak Jaiswal, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8cdee629
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8cdee629
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8cdee629

Branch: refs/heads/master
Commit: 8cdee6297f545ec1aa9247dd24a02766d88fae1e
Parents: afd0d9f
Author: Jason Dere <[email protected]>
Authored: Thu Sep 21 13:24:03 2017 -0700
Committer: Jason Dere <[email protected]>
Committed: Thu Sep 21 13:24:03 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/OperatorUtils.java      |    4 +-
 .../clientpositive/bucket_map_join_tez1.q       |  149 +-
 .../clientpositive/bucket_map_join_tez2.q       |   57 +-
 .../llap/bucket_map_join_tez1.q.out             | 4401 ++++++++++++++----
 .../llap/bucket_map_join_tez2.q.out             | 1340 +++++-
 .../spark/bucket_map_join_tez1.q.out            | 4265 +++++++++++++----
 .../spark/bucket_map_join_tez2.q.out            | 1330 +++++-
 7 files changed, 9338 insertions(+), 2208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index e79d100..cddcf16 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -128,10 +128,10 @@ public class OperatorUtils {
       for (Operator<?> parent : start.getParentOperators()) {
         if (onlyIncludeIndex >= 0) {
           if (onlyIncludeIndex == i) {
-            findOperatorsUpstream(parent, clazz, found);
+            findOperatorsUpstreamJoinAccounted(parent, clazz, found);
           }
         } else {
-          findOperatorsUpstream(parent, clazz, found);
+          findOperatorsUpstreamJoinAccounted(parent, clazz, found);
         }
         i++;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q 
b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
index aeb244a..cac1d6a 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
@@ -28,86 +28,185 @@ CREATE TABLE tab(key int, value string) PARTITIONED BY(ds 
STRING) CLUSTERED BY (
 insert overwrite table tab partition (ds='2008-04-08')
 select key,value from srcbucket_mapjoin;
 
+analyze table srcbucket_mapjoin compute statistics for columns;
+analyze table srcbucket_mapjoin_part compute statistics for columns;
+analyze table tab compute statistics for columns;
+analyze table tab_part compute statistics for columns;
+
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value;
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value;
+
 set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, b.value
-from tab a join tab_part b on a.key = b.key;
+from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value;
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key order by a.key, a.value, b.value;
 
+
+set hive.auto.convert.join.noconditionaltask.size=900;
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select count(*)
 from 
-(select distinct key, value from tab_part) a join tab b on a.key = b.key;
-
+(select distinct key from tab_part) a join tab b on a.key = b.key;
 select count(*)
 from 
-(select distinct key, value from tab_part) a join tab b on a.key = b.key;
+(select distinct key from tab_part) a join tab b on a.key = b.key;
+
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select count(*)
+from
+(select distinct key from tab_part) a join tab b on a.key = b.key;
+select count(*)
+from
+(select distinct key from tab_part) a join tab b on a.key = b.key;
 
+
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select count(*)
 from
 (select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c
 join
 tab_part d on c.key = d.key;
+select count(*)
+from
+(select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c
+join
+tab_part d on c.key = d.key;
 
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select count(*)
+from
+(select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c
+join
+tab_part d on c.key = d.key;
 select count(*)
 from
 (select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c
 join
 tab_part d on c.key = d.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select count(*)
 from
 tab_part d
 join
 (select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c on c.key = d.key;
-
 select count(*)
 from
 tab_part d
 join
 (select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c on c.key = d.key;
 
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select count(*)
+from
+tab_part d
+join
+(select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c on c.key = d.key;
+select count(*)
+from
+tab_part d
+join
+(select a.key as key, a.value as value from tab a join tab_part b on a.key = 
b.key) c on c.key = d.key;
 
 -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed 
table.
 -- In this case the sub-query is chosen as the big table.
+set hive.convert.join.bucket.mapjoin.tez = false;
+set hive.auto.convert.join.noconditionaltask.size=1000;
 explain
 select a.k1, a.v1, b.value
 from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
 join tab b on a.k1 = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(srcbucket_mapjoin.value,5)) as v1, key as k1 from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+     join tab b on a.k1 = b.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select a.k1, a.v1, b.value
 from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab 
on tab_part.key = tab.key GROUP BY tab.key) a
 join tab b on a.k1 = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(tab.value,5)) as v1, key as k1 from tab_part join tab 
on tab_part.key = tab.key GROUP BY tab.key) a
+     join tab b on a.k1 = b.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain
 select a.k1, a.v1, b.value
 from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y 
on x.key = y.key GROUP BY x.key) a
 join tab_part b on a.k1 = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.k1, a.v1, b.value
+from (select sum(substr(x.value,5)) as v1, x.key as k1 from tab x join tab y 
on x.key = y.key GROUP BY x.key) a
+     join tab_part b on a.k1 = b.key;
 
 -- multi-way join
+set hive.convert.join.bucket.mapjoin.tez = false;
+set hive.auto.convert.join.noconditionaltask.size=20000;
+explain
+select a.key, a.value, b.value
+from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, b.value
 from tab_part a join tab b on a.key = b.key join tab c on a.key = c.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, a.value, c.value
+from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a 
join tab c on a.key = c.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, c.value
 from (select x.key, x.value from tab_part x join tab y on x.key = y.key) a 
join tab c on a.key = c.key;
 
 -- in this case sub-query is the small table
+set hive.convert.join.bucket.mapjoin.tez = false;
+set hive.auto.convert.join.noconditionaltask.size=900;
 explain
 select a.key, a.value, b.value
 from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
 join tab_part b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+     join tab_part b on a.key = b.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
 set hive.map.aggr=false;
 explain
 select a.key, a.value, b.value
 from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
 join tab_part b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select a.key, a.value, b.value
+from (select key, sum(substr(srcbucket_mapjoin.value,5)) as value from 
srcbucket_mapjoin GROUP BY srcbucket_mapjoin.key) a
+     join tab_part b on a.key = b.key;
 
--- join on non-bucketed column results in broadcast join.
+-- join on non-bucketed column results in shuffle join.
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, a.value, b.value
+from tab a join tab_part b on a.value = b.value;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, b.value
 from tab a join tab_part b on a.value = b.value;
@@ -116,36 +215,30 @@ CREATE TABLE tab1(key int, value string) CLUSTERED BY 
(key) INTO 2 BUCKETS STORE
 insert overwrite table tab1
 select key,value from srcbucket_mapjoin;
 
+set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, a.value, b.value
+from tab1 a join tab_part b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, b.value
 from tab1 a join tab_part b on a.key = b.key;
 
+-- No map joins should be created.
+set hive.convert.join.bucket.mapjoin.tez = false;
+set hive.auto.convert.join.noconditionaltask.size=1500;
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key 
join tab_part b on a.value = b.value;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key 
join tab_part b on a.value = b.value;
 
-set hive.auto.convert.join.noconditionaltask.size=50000;
+set hive.convert.join.bucket.mapjoin.tez = false;
+-- This wont have any effect as the column ds is partition column which is not 
bucketed.
 explain
 select a.key, a.value, b.value
 from tab a join tab_part b on a.key = b.key and a.ds = b.ds;
-
-set hive.auto.convert.join.noconditionaltask.size=10000;
-set hive.mapjoin.hybridgrace.hashtable = false;
-insert overwrite table tab partition (ds='2008-04-08')
-select key,value from srcbucket_mapjoin where key = 411;
-
-explain
-select count(*)
-from tab_part a join tab b on a.key = b.key;
-
-select count(*)
-from tab_part a join tab b on a.key = b.key;
-
-set hive.mapjoin.hybridgrace.hashtable = false;
-insert overwrite table tab partition (ds='2008-04-08')
-select key,value from srcbucket_mapjoin where key = 411;
-
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
-select count(*)
-from tab_part a join tab b on a.key = b.key;
+select a.key, a.value, b.value
+from tab a join tab_part b on a.key = b.key and a.ds = b.ds;
 
-select count(*)
-from tab_part a join tab b on a.key = b.key;

http://git-wip-us.apache.org/repos/asf/hive/blob/8cdee629/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q 
b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index 37989ec..45feec2 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -28,27 +28,82 @@ CREATE TABLE tab(key int, value string) PARTITIONED BY(ds 
STRING) CLUSTERED BY (
 insert overwrite table tab partition (ds='2008-04-08')
 select key,value from srcbucket_mapjoin;
 
-set hive.convert.join.bucket.mapjoin.tez = true;
+analyze table srcbucket_mapjoin compute statistics for columns;
+analyze table srcbucket_mapjoin_part compute statistics for columns;
+analyze table tab compute statistics for columns;
+analyze table tab_part compute statistics for columns;
 
+set hive.auto.convert.join.noconditionaltask.size=1500;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key 
join tab_part b on a.value = b.value;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain select a.key, b.key from tab_part a join tab_part c on a.key = c.key 
join tab_part b on a.value = b.value;
 
 CREATE TABLE tab1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS 
STORED AS TEXTFILE;
 insert overwrite table tab1
 select key,value from srcbucket_mapjoin;
+analyze table tab1 compute statistics for columns;
 
+-- A negative test as src is not bucketed.
+set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, a.value, b.value
+from tab1 a join src b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, a.value, b.value
 from tab1 a join src b on a.key = b.key;
 
+set hive.auto.convert.join.noconditionaltask.size=500;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a join 
(select key from tab_part where key > 2) b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part where key > 1) a join 
(select key from tab_part where key > 2) b on a.key = b.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a left outer 
join (select key from tab_part where key > 2) b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part where key > 1) a left outer 
join (select key from tab_part where key > 2) b on a.key = b.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select a.key, b.key from (select key from tab_part where key > 1) a right 
outer join (select key from tab_part where key > 2) b on a.key = b.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part where key > 1) a right 
outer join (select key from tab_part where key > 2) b on a.key = b.key;
 
+set hive.auto.convert.join.noconditionaltask.size=300;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain select a.key, b.key from (select distinct key from tab) a join tab b 
on b.key = a.key;
+set hive.convert.join.bucket.mapjoin.tez = true;
 explain select a.key, b.key from (select distinct key from tab) a join tab b 
on b.key = a.key;
 
+set hive.convert.join.bucket.mapjoin.tez = false;
 explain select a.value, b.value from (select distinct value from tab) a join 
tab b on b.key = a.value;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain select a.value, b.value from (select distinct value from tab) a join 
tab b on b.key = a.value;
+
+
+
+--multi key
+CREATE TABLE tab_part1 (key int, value string) PARTITIONED BY(ds STRING) 
CLUSTERED BY (key, value) INTO 4 BUCKETS STORED AS TEXTFILE;
+insert overwrite table tab_part1 partition (ds='2008-04-08')
+select key,value from srcbucket_mapjoin_part;
+analyze table tab_part1 compute statistics for columns;
+
+set hive.auto.convert.join.noconditionaltask.size=20000;
+set hive.convert.join.bucket.mapjoin.tez = false;
+explain
+select count(*)
+from
+(select distinct key,value from tab_part) a join tab b on a.key = b.key and 
a.value = b.value;
+set hive.convert.join.bucket.mapjoin.tez = true;
+explain
+select count(*)
+from
+(select distinct key,value from tab_part) a join tab b on a.key = b.key and 
a.value = b.value;

Reply via email to