This is an automated email from the ASF dual-hosted git repository.

okumin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b9c781b065 HIVE-27078:Bucket Map Join can hang if the source vertex 
parallelism is changed by reducer autoparallelism. (#5707) (Mayank Kunwar, 
reviewed by Seonggon Namgung, Shohei Okumiya)
7b9c781b065 is described below

commit 7b9c781b06511b35307b8c0bbb9f8588417947f1
Author: Mayank Kunwar <[email protected]>
AuthorDate: Sat Apr 5 10:58:10 2025 +0530

    HIVE-27078:Bucket Map Join can hang if the source vertex parallelism is 
changed by reducer autoparallelism. (#5707) (Mayank Kunwar, reviewed by 
Seonggon Namgung, Shohei Okumiya)
---
 .../apache/hadoop/hive/ql/parse/GenTezUtils.java   | 40 ++++++++++++++
 .../bucketmapjoin_auto_reduce_parallel.q           | 19 +++++++
 .../clientpositive/llap/bucket_map_join_tez3.q.out |  4 +-
 .../llap/bucketmapjoin_auto_reduce_parallel.q.out  | 62 ++++++++++++++++++++++
 .../tez/bucketmapjoin_with_subquery.q.out          |  6 +--
 5 files changed, 126 insertions(+), 5 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index a2512500e37..df297cd3177 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -122,6 +122,11 @@ public static ReduceWork createReduceWork(
     reduceWork.setMaxSrcFraction(maxSrcFraction);
     
reduceWork.setUniformDistribution(reduceSink.getConf().getReducerTraits().contains(UNIFORM));
 
+    // Disabling TEZ_AUTO_REDUCER_PARALLELISM for BucketMapJoin until TEZ-4603 is fixed.
+    if (hasBucketMapJoin(reduceSink, 0)) {
+      reduceSink.getConf().getReducerTraits().remove(AUTOPARALLEL);
+    }
+
     if (isAutoReduceParallelism && 
reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
 
       // configured limit for reducers
@@ -182,6 +187,41 @@ public static ReduceWork createReduceWork(
     return reduceWork;
   }
 
+  /**
+   * Checks if there is a Bucket Map Join (BMJ) following a hierarchy:
+   * ReduceSinkOperator (RS) -> ReduceSinkOperator (RS) -> MapJoinOperator (MJ).
+   * Allows at most <b>two</b> RS operators before the MJ.
+   */
+  private static boolean hasBucketMapJoin(Operator<? extends OperatorDesc> 
operator, int rsoCount) {
+    if (operator == null) {
+      return false;
+    }
+    if (operator instanceof ReduceSinkOperator) {
+      rsoCount++;
+      if (rsoCount > 2) {
+        return false; // Stop if more than 2 RSOs
+      }
+    }
+
+    // Iterate over child operators
+    for (Operator<? extends OperatorDesc> childOp : 
operator.getChildOperators()) {
+      // Check if this is a MapJoinOperator and is a Bucket Map Join
+      if (childOp instanceof MapJoinOperator) {
+        MapJoinOperator mjOp = (MapJoinOperator) childOp;
+        if (mjOp.getConf().isBucketMapJoin()) {
+          return true; // Found BMJ, no need to check further
+        }
+      }
+
+      // Recursively check children
+      if (hasBucketMapJoin(childOp, rsoCount)) {
+        return true;
+      }
+    }
+
+    return false; // No Bucket Map Join found
+  }
+
   private static void setupReduceSink(
       GenTezProcContext context, ReduceWork reduceWork, ReduceSinkOperator 
reduceSink) {
 
diff --git 
a/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q 
b/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q
new file mode 100644
index 00000000000..6ec8d06811f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q
@@ -0,0 +1,19 @@
+create table source_table2(date_col date, string_col string, decimal_col 
decimal(38,0)) clustered by (decimal_col) into 7 buckets;
+insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline', 
'50000000000000000006008686831'), ('2022-08-30', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067');
+
+create table target_table2(date_col date, string_col string, decimal_col 
decimal(38,0)) clustered by (decimal_col) into 7 buckets;
+insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20', 
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline', 
'50000000000000000002332575516'), ('2021-08-16', 'pipeline', 
'50000000000000000003897973989'), ('2017-06-06', 'pipeline', 
'50000000000000000000449148729'), ('2017-09-08', 'pipeline', 
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline', 
'50000000000000000000750826355'), ('2020-01-10', 'pipeline', 
'50000000000000000001816579677'), ('2021-11-01', 'pipeline', 
'50000000000000000004269423714'), ('2017-11-07', 'pipeline', 
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01', 
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline', 
'50000000000000000001932600185'), ('2020-04-27', 'pipeline', 
'50000000000000000002108160849'), ('2016-07-05', 'pipeline', 
'50000000000000000000054405114'), ('2020-06-02', 'pipeline', 
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17', 
'pipeline', '50000000000000000003158511687');
+
+ set hive.auto.convert.join=true;
+
+set hive.tez.auto.reducer.parallelism=true;
+set hive.tez.min.partition.factor=12;
+set hive.tez.max.partition.factor=50;
+
+select * from target_table2 t inner join (select distinct date_col, 'pipeline' 
string_col, decimal_col from source_table2 where coalesce(decimal_col,'') = 
'50000000000000000005905545593') s on s.date_col = t.date_col AND s.string_col 
= t.string_col AND s.decimal_col = t.decimal_col;
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out 
b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
index 98a6023cedb..273295e916b 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
@@ -480,7 +480,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: date), 
_col1 (type: decimal(38,0))
                         Statistics: Num rows: 1 Data size: 168 Basic stats: 
COMPLETE Column stats: COMPLETE
                         tag: -1
-                        auto parallelism: true
+                        auto parallelism: false
             Execution mode: llap
             LLAP IO: may be used (ACID table)
             Path -> Alias:
@@ -1432,7 +1432,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: date), 
_col1 (type: decimal(38,0))
                         Statistics: Num rows: 1 Data size: 168 Basic stats: 
COMPLETE Column stats: COMPLETE
                         tag: -1
-                        auto parallelism: true
+                        auto parallelism: false
             Execution mode: vectorized, llap
             LLAP IO: may be used (ACID table)
             Path -> Alias:
diff --git 
a/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
 
b/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
new file mode 100644
index 00000000000..3397dbfdd5f
--- /dev/null
+++ 
b/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
@@ -0,0 +1,62 @@
+PREHOOK: query: create table source_table2(date_col date, string_col string, 
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source_table2
+POSTHOOK: query: create table source_table2(date_col date, string_col string, 
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source_table2
+PREHOOK: query: insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline', 
'50000000000000000006008686831'), ('2022-08-30', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source_table2
+POSTHOOK: query: insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline', 
'50000000000000000006008686831'), ('2022-08-30', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992620837'), ('2022-09-01', 'pipeline', 
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source_table2
+POSTHOOK: Lineage: source_table2.date_col SCRIPT []
+POSTHOOK: Lineage: source_table2.decimal_col SCRIPT []
+POSTHOOK: Lineage: source_table2.string_col SCRIPT []
+PREHOOK: query: create table target_table2(date_col date, string_col string, 
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@target_table2
+POSTHOOK: query: create table target_table2(date_col date, string_col string, 
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@target_table2
+PREHOOK: query: insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20', 
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline', 
'50000000000000000002332575516'), ('2021-08-16', 'pipeline', 
'50000000000000000003897973989'), ('2017-06-06', 'pipeline', 
'50000000000000000000449148729'), ('2017-09-08', 'pipeline', 
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline', 
'50000000000000000000750826355'), ('2020-01-10', 'pipeline', 
'50000000000000000001816579677'), ('2021-11-01', 'pipeline', 
'50000000000000000004269423714'), ('2017-11-07', 'pipeline', 
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01', 
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline', 
'50000000000000000001932600185'), ('2020-04-27', 'pipeline', 
'50000000000000000002108160849'), ('2016-07-05', 'pipeline', 
'50000000000000000000054405114'), ('2020-06-02', 'pipeline', 
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17', 
'pipeline', '50000000000000000003158511687')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@target_table2
+POSTHOOK: query: insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20', 
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline', 
'50000000000000000002332575516'), ('2021-08-16', 'pipeline', 
'50000000000000000003897973989'), ('2017-06-06', 'pipeline', 
'50000000000000000000449148729'), ('2017-09-08', 'pipeline', 
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16', 
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline', 
'50000000000000000000750826355'), ('2020-01-10', 'pipeline', 
'50000000000000000001816579677'), ('2021-11-01', 'pipeline', 
'50000000000000000004269423714'), ('2017-11-07', 'pipeline', 
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01', 
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline', 
'50000000000000000001932600185'), ('2020-04-27', 'pipeline', 
'50000000000000000002108160849'), ('2016-07-05', 'pipeline', 
'50000000000000000000054405114'), ('2020-06-02', 'pipeline', 
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17', 
'pipeline', '50000000000000000003158511687')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@target_table2
+POSTHOOK: Lineage: target_table2.date_col SCRIPT []
+POSTHOOK: Lineage: target_table2.decimal_col SCRIPT []
+POSTHOOK: Lineage: target_table2.string_col SCRIPT []
+PREHOOK: query: select * from target_table2 t inner join (select distinct 
date_col, 'pipeline' string_col, decimal_col from source_table2 where 
coalesce(decimal_col,'') = '50000000000000000005905545593') s on s.date_col = 
t.date_col AND s.string_col = t.string_col AND s.decimal_col = t.decimal_col
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source_table2
+PREHOOK: Input: default@target_table2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from target_table2 t inner join (select distinct 
date_col, 'pipeline' string_col, decimal_col from source_table2 where 
coalesce(decimal_col,'') = '50000000000000000005905545593') s on s.date_col = 
t.date_col AND s.string_col = t.string_col AND s.decimal_col = t.decimal_col
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source_table2
+POSTHOOK: Input: default@target_table2
+#### A masked pattern was here ####
+2022-08-16     pipeline        50000000000000000005905545593   2022-08-16      
pipeline        50000000000000000005905545593
+2022-08-30     pipeline        50000000000000000005905545593   2022-08-30      
pipeline        50000000000000000005905545593
diff --git 
a/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out 
b/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
index 7ce5bb945e8..eb77e743a90 100644
--- a/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
+++ b/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
@@ -325,13 +325,13 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dup_test
 POSTHOOK: Input: default@dup_test_target
 POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      2023-04-14 10:11:12.111 test1   1       2023-04-14 10:11:12.111 test1
 2      2023-04-14 10:11:12.111 test2   2       2023-04-14 10:11:12.111 test2
 3      2023-04-14 10:11:12.111 test3   3       2023-04-14 10:11:12.111 test3
-6      2023-04-14 10:11:12.111 test6   6       2023-04-14 10:11:12.111 test6
-7      2023-04-14 10:11:12.111 test7   7       2023-04-14 10:11:12.111 test7
-1      2023-04-14 10:11:12.111 test1   1       2023-04-14 10:11:12.111 test1
 4      2023-04-14 10:11:12.111 test4   4       2023-04-14 10:11:12.111 test4
 5      2023-04-14 10:11:12.111 test5   5       2023-04-14 10:11:12.111 test5
+6      2023-04-14 10:11:12.111 test6   6       2023-04-14 10:11:12.111 test6
+7      2023-04-14 10:11:12.111 test7   7       2023-04-14 10:11:12.111 test7
 8      2023-04-14 10:11:12.111 test8   8       2023-04-14 10:11:12.111 test8
 9      2023-04-14 10:11:12.111 test9   9       2023-04-14 10:11:12.111 test9
 PREHOOK: query: select * from DUP_TEST_TARGET T join DUP_TEST S ON T.id = S.id

Reply via email to