This is an automated email from the ASF dual-hosted git repository.
okumin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7b9c781b065 HIVE-27078: Bucket Map Join can hang if the source vertex
parallelism is changed by reducer autoparallelism. (#5707) (Mayank Kunwar,
reviewed by Seonggon Namgung, Shohei Okumiya)
7b9c781b065 is described below
commit 7b9c781b06511b35307b8c0bbb9f8588417947f1
Author: Mayank Kunwar <[email protected]>
AuthorDate: Sat Apr 5 10:58:10 2025 +0530
HIVE-27078: Bucket Map Join can hang if the source vertex parallelism is
changed by reducer autoparallelism. (#5707) (Mayank Kunwar, reviewed by
Seonggon Namgung, Shohei Okumiya)
---
.../apache/hadoop/hive/ql/parse/GenTezUtils.java | 40 ++++++++++++++
.../bucketmapjoin_auto_reduce_parallel.q | 19 +++++++
.../clientpositive/llap/bucket_map_join_tez3.q.out | 4 +-
.../llap/bucketmapjoin_auto_reduce_parallel.q.out | 62 ++++++++++++++++++++++
.../tez/bucketmapjoin_with_subquery.q.out | 6 +--
5 files changed, 126 insertions(+), 5 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index a2512500e37..df297cd3177 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -122,6 +122,11 @@ public static ReduceWork createReduceWork(
reduceWork.setMaxSrcFraction(maxSrcFraction);
reduceWork.setUniformDistribution(reduceSink.getConf().getReducerTraits().contains(UNIFORM));
+ // Disabling TEZ_AUTO_REDUCER_PARALLELISM for BucketMapJoin until TEZ-4603
is fixed.
+ if (hasBucketMapJoin(reduceSink, 0)) {
+ reduceSink.getConf().getReducerTraits().remove(AUTOPARALLEL);
+ }
+
if (isAutoReduceParallelism &&
reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
// configured limit for reducers
@@ -182,6 +187,41 @@ public static ReduceWork createReduceWork(
return reduceWork;
}
+ /**
+ * Checks if there is a Bucket Map Join (BMJ) following a hierarchy:
+ * ReduceSinkOperator (RS) -> ReduceSinkOperator (RS) -> MapJoinOperator
(MJ).
+ * Ensures at most **two** RSOs before MJ.
+ */
+ private static boolean hasBucketMapJoin(Operator<? extends OperatorDesc>
operator, int rsoCount) {
+ if (operator == null) {
+ return false;
+ }
+ if (operator instanceof ReduceSinkOperator) {
+ rsoCount++;
+ if (rsoCount > 2) {
+ return false; // Stop if more than 2 RSOs
+ }
+ }
+
+ // Iterate over child operators
+ for (Operator<? extends OperatorDesc> childOp :
operator.getChildOperators()) {
+ // Check if this is a MapJoinOperator and is a Bucket Map Join
+ if (childOp instanceof MapJoinOperator) {
+ MapJoinOperator mjOp = (MapJoinOperator) childOp;
+ if (mjOp.getConf().isBucketMapJoin()) {
+ return true; // Found BMJ, no need to check further
+ }
+ }
+
+ // Recursively check children
+ if (hasBucketMapJoin(childOp, rsoCount)) {
+ return true;
+ }
+ }
+
+ return false; // No Bucket Map Join found
+ }
+
private static void setupReduceSink(
GenTezProcContext context, ReduceWork reduceWork, ReduceSinkOperator
reduceSink) {
diff --git
a/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q
b/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q
new file mode 100644
index 00000000000..6ec8d06811f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketmapjoin_auto_reduce_parallel.q
@@ -0,0 +1,19 @@
+create table source_table2(date_col date, string_col string, decimal_col
decimal(38,0)) clustered by (decimal_col) into 7 buckets;
+insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline',
'50000000000000000006008686831'), ('2022-08-30', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067');
+
+create table target_table2(date_col date, string_col string, decimal_col
decimal(38,0)) clustered by (decimal_col) into 7 buckets;
+insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20',
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline',
'50000000000000000002332575516'), ('2021-08-16', 'pipeline',
'50000000000000000003897973989'), ('2017-06-06', 'pipeline',
'50000000000000000000449148729'), ('2017-09-08', 'pipeline',
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline',
'50000000000000000000750826355'), ('2020-01-10', 'pipeline',
'50000000000000000001816579677'), ('2021-11-01', 'pipeline',
'50000000000000000004269423714'), ('2017-11-07', 'pipeline',
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01',
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline',
'50000000000000000001932600185'), ('2020-04-27', 'pipeline',
'50000000000000000002108160849'), ('2016-07-05', 'pipeline',
'50000000000000000000054405114'), ('2020-06-02', 'pipeline',
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17',
'pipeline', '50000000000000000003158511687');
+
+ set hive.auto.convert.join=true;
+
+set hive.tez.auto.reducer.parallelism=true;
+set hive.tez.min.partition.factor=12;
+set hive.tez.max.partition.factor=50;
+
+select * from target_table2 t inner join (select distinct date_col, 'pipeline'
string_col, decimal_col from source_table2 where coalesce(decimal_col,'') =
'50000000000000000005905545593') s on s.date_col = t.date_col AND s.string_col
= t.string_col AND s.decimal_col = t.decimal_col;
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
index 98a6023cedb..273295e916b 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez3.q.out
@@ -480,7 +480,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: date),
_col1 (type: decimal(38,0))
Statistics: Num rows: 1 Data size: 168 Basic stats:
COMPLETE Column stats: COMPLETE
tag: -1
- auto parallelism: true
+ auto parallelism: false
Execution mode: llap
LLAP IO: may be used (ACID table)
Path -> Alias:
@@ -1432,7 +1432,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: date),
_col1 (type: decimal(38,0))
Statistics: Num rows: 1 Data size: 168 Basic stats:
COMPLETE Column stats: COMPLETE
tag: -1
- auto parallelism: true
+ auto parallelism: false
Execution mode: vectorized, llap
LLAP IO: may be used (ACID table)
Path -> Alias:
diff --git
a/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
b/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
new file mode 100644
index 00000000000..3397dbfdd5f
--- /dev/null
+++
b/ql/src/test/results/clientpositive/llap/bucketmapjoin_auto_reduce_parallel.q.out
@@ -0,0 +1,62 @@
+PREHOOK: query: create table source_table2(date_col date, string_col string,
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source_table2
+POSTHOOK: query: create table source_table2(date_col date, string_col string,
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source_table2
+PREHOOK: query: insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline',
'50000000000000000006008686831'), ('2022-08-30', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@source_table2
+POSTHOOK: query: insert into table source_table2 values
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2022-09-01', 'pipeline',
'50000000000000000006008686831'), ('2022-08-30', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992620837'), ('2022-09-01', 'pipeline',
'50000000000000000005992621067'),
+('2022-08-30', 'pipeline', '50000000000000000005992621067')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@source_table2
+POSTHOOK: Lineage: source_table2.date_col SCRIPT []
+POSTHOOK: Lineage: source_table2.decimal_col SCRIPT []
+POSTHOOK: Lineage: source_table2.string_col SCRIPT []
+PREHOOK: query: create table target_table2(date_col date, string_col string,
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@target_table2
+POSTHOOK: query: create table target_table2(date_col date, string_col string,
decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@target_table2
+PREHOOK: query: insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20',
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline',
'50000000000000000002332575516'), ('2021-08-16', 'pipeline',
'50000000000000000003897973989'), ('2017-06-06', 'pipeline',
'50000000000000000000449148729'), ('2017-09-08', 'pipeline',
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline',
'50000000000000000000750826355'), ('2020-01-10', 'pipeline',
'50000000000000000001816579677'), ('2021-11-01', 'pipeline',
'50000000000000000004269423714'), ('2017-11-07', 'pipeline',
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01',
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline',
'50000000000000000001932600185'), ('2020-04-27', 'pipeline',
'50000000000000000002108160849'), ('2016-07-05', 'pipeline',
'50000000000000000000054405114'), ('2020-06-02', 'pipeline',
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17',
'pipeline', '50000000000000000003158511687')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@target_table2
+POSTHOOK: query: insert into table target_table2 values
+('2017-05-17', 'pipeline', '50000000000000000000441610525'), ('2018-12-20',
'pipeline', '50000000000000000001048981030'), ('2020-06-30', 'pipeline',
'50000000000000000002332575516'), ('2021-08-16', 'pipeline',
'50000000000000000003897973989'), ('2017-06-06', 'pipeline',
'50000000000000000000449148729'), ('2017-09-08', 'pipeline',
'50000000000000000000525378314'),
+('2022-08-30', 'pipeline', '50000000000000000005905545593'), ('2022-08-16',
'pipeline', '50000000000000000005905545593'), ('2018-05-03', 'pipeline',
'50000000000000000000750826355'), ('2020-01-10', 'pipeline',
'50000000000000000001816579677'), ('2021-11-01', 'pipeline',
'50000000000000000004269423714'), ('2017-11-07', 'pipeline',
'50000000000000000000585901787'),
+('2019-10-15', 'pipeline', '50000000000000000001598843430'), ('2020-04-01',
'pipeline', '50000000000000000002035795461'), ('2020-02-24', 'pipeline',
'50000000000000000001932600185'), ('2020-04-27', 'pipeline',
'50000000000000000002108160849'), ('2016-07-05', 'pipeline',
'50000000000000000000054405114'), ('2020-06-02', 'pipeline',
'50000000000000000002234387967'),
+('2020-08-21', 'pipeline', '50000000000000000002529168758'), ('2021-02-17',
'pipeline', '50000000000000000003158511687')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@target_table2
+POSTHOOK: Lineage: target_table2.date_col SCRIPT []
+POSTHOOK: Lineage: target_table2.decimal_col SCRIPT []
+POSTHOOK: Lineage: target_table2.string_col SCRIPT []
+PREHOOK: query: select * from target_table2 t inner join (select distinct
date_col, 'pipeline' string_col, decimal_col from source_table2 where
coalesce(decimal_col,'') = '50000000000000000005905545593') s on s.date_col =
t.date_col AND s.string_col = t.string_col AND s.decimal_col = t.decimal_col
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source_table2
+PREHOOK: Input: default@target_table2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from target_table2 t inner join (select distinct
date_col, 'pipeline' string_col, decimal_col from source_table2 where
coalesce(decimal_col,'') = '50000000000000000005905545593') s on s.date_col =
t.date_col AND s.string_col = t.string_col AND s.decimal_col = t.decimal_col
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source_table2
+POSTHOOK: Input: default@target_table2
+#### A masked pattern was here ####
+2022-08-16 pipeline 50000000000000000005905545593 2022-08-16
pipeline 50000000000000000005905545593
+2022-08-30 pipeline 50000000000000000005905545593 2022-08-30
pipeline 50000000000000000005905545593
diff --git
a/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
b/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
index 7ce5bb945e8..eb77e743a90 100644
--- a/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
+++ b/ql/src/test/results/clientpositive/tez/bucketmapjoin_with_subquery.q.out
@@ -325,13 +325,13 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: default@dup_test
POSTHOOK: Input: default@dup_test_target
POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 2023-04-14 10:11:12.111 test1 1 2023-04-14 10:11:12.111 test1
2 2023-04-14 10:11:12.111 test2 2 2023-04-14 10:11:12.111 test2
3 2023-04-14 10:11:12.111 test3 3 2023-04-14 10:11:12.111 test3
-6 2023-04-14 10:11:12.111 test6 6 2023-04-14 10:11:12.111 test6
-7 2023-04-14 10:11:12.111 test7 7 2023-04-14 10:11:12.111 test7
-1 2023-04-14 10:11:12.111 test1 1 2023-04-14 10:11:12.111 test1
4 2023-04-14 10:11:12.111 test4 4 2023-04-14 10:11:12.111 test4
5 2023-04-14 10:11:12.111 test5 5 2023-04-14 10:11:12.111 test5
+6 2023-04-14 10:11:12.111 test6 6 2023-04-14 10:11:12.111 test6
+7 2023-04-14 10:11:12.111 test7 7 2023-04-14 10:11:12.111 test7
8 2023-04-14 10:11:12.111 test8 8 2023-04-14 10:11:12.111 test8
9 2023-04-14 10:11:12.111 test9 9 2023-04-14 10:11:12.111 test9
PREHOOK: query: select * from DUP_TEST_TARGET T join DUP_TEST S ON T.id = S.id