This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f53c7fb73f HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
6f53c7fb73f is described below

commit 6f53c7fb73ffc4674234957106c597d4a42bccd9
Author: Seonggon Namgung <ln...@postech.ac.kr>
AuthorDate: Fri Sep 5 01:52:52 2025 +0900

    HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
---
 .../src/test/queries/positive/bucket_map_join_9.q  |   9 ++
 .../test/results/positive/bucket_map_join_9.q.out  |  65 ++++++++++++
 .../hive/ql/optimizer/ConvertJoinMapJoin.java      |  47 +++++----
 .../test/queries/clientpositive/bucketmapjoin14.q  |   9 ++
 .../clientpositive/llap/bucketmapjoin14.q.out      | 112 +++++++++++++++++++++
 5 files changed, 222 insertions(+), 20 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
new file mode 100644
index 00000000000..4c201d71850
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY 
SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
new file mode 100644
index 00000000000..8153bdd697f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
@@ -0,0 +1,65 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) 
PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) 
PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 
'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 
'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Map 1 <- Map 2 (CUSTOM_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Map 1 vectorized
+      File Output Operator [FS_53]
+        Map Join Operator [MAPJOIN_52] (rows=2 width=530)
+          BucketMapJoin:true,Conds:SEL_51._col1, _col2=RS_49._col1, 
_col2(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
+        <-Map 2 [CUSTOM_EDGE] vectorized
+          MULTICAST [RS_49]
+            PartitionCols:_col2, _col1
+            Select Operator [SEL_48] (rows=2 width=265)
+              Output:["_col0","_col1","_col2"]
+              Filter Operator [FIL_47] (rows=2 width=265)
+                predicate:(id is not null and part is not null)
+                TableScan [TS_3] (rows=2 width=265)
+                  
default@tbl,tbl2,Tbl:COMPLETE,Col:COMPLETE,Output:["foid","part","id"]
+        <-Select Operator [SEL_51] (rows=2 width=265)
+            Output:["_col0","_col1","_col2"]
+            Filter Operator [FIL_50] (rows=2 width=265)
+              predicate:(id is not null and part is not null)
+              TableScan [TS_0] (rows=2 width=265)
+                default@tbl,tbl,Tbl:COMPLETE,Col:COMPLETE,Grouping Num 
Buckets:100,Grouping Partition Columns:["id","part"],Output:["foid","part","id"]
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND 
tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND 
tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1234   PART_123        1       1234    PART_123        1
+1235   PART_124        2       1235    PART_124        2
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 37348dcef06..a622a0a7c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -656,32 +656,39 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     // on small table(s).
     ReduceSinkOperator bigTableRS = 
(ReduceSinkOperator)joinOp.getParentOperators().get(bigTablePosition);
     OpTraits opTraits = bigTableRS.getOpTraits();
-    List<List<String>> listBucketCols = opTraits.getBucketColNames();
+    // It is guaranteed there is only 1 list within bigTableRS.getOpTraits().getBucketColNames().
+    List<String> listBucketCols = opTraits.getBucketColNames().get(0);
     List<ExprNodeDesc> bigTablePartitionCols = 
bigTableRS.getConf().getPartitionCols();
-    boolean updatePartitionCols = false;
+    boolean updatePartitionCols = listBucketCols.size() != 
bigTablePartitionCols.size();
     List<Integer> positions = new ArrayList<>();
 
-    CustomBucketFunction bucketFunction = 
opTraits.getCustomBucketFunctions().get(0);
-    if (listBucketCols.get(0).size() != bigTablePartitionCols.size()) {
-      updatePartitionCols = true;
-      // Prepare updated partition columns for small table(s).
-      // Get the positions of bucketed columns
-
-      int bigTableExprPos = 0;
-      Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
-      final boolean[] retainedColumns = new 
boolean[listBucketCols.get(0).size()];
-      for (ExprNodeDesc bigTableExpr : bigTablePartitionCols) {
-        // It is guaranteed there is only 1 list within listBucketCols.
-        for (int i = 0; i < listBucketCols.get(0).size(); i++) {
-          final String colName = listBucketCols.get(0).get(i);
-          if (colExprMap.get(colName).isSame(bigTableExpr)) {
-            positions.add(bigTableExprPos);
-            retainedColumns[i] = true;
-          }
+    // Compare the partition columns and the bucket columns of bigTableRS.
+    Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
+    final boolean[] retainedColumns = new boolean[listBucketCols.size()];
+    for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); 
bucketColIdx++) {
+      for (int bigTablePartIdx = 0; bigTablePartIdx < 
bigTablePartitionCols.size(); bigTablePartIdx++) {
+        ExprNodeDesc bigTablePartExpr = 
bigTablePartitionCols.get(bigTablePartIdx);
+        ExprNodeDesc bucketColExpr = 
colExprMap.get(listBucketCols.get(bucketColIdx));
+        if (bigTablePartExpr.isSame(bucketColExpr)) {
+          positions.add(bigTablePartIdx);
+          retainedColumns[bucketColIdx] = true;
+          // If the positions of the partition column and the bucket column are not the same,
+          // then we need to update the position of the partition column in small tables.
+          updatePartitionCols = updatePartitionCols || bucketColIdx != 
bigTablePartIdx;
+          break;
         }
-        bigTableExprPos = bigTableExprPos + 1;
       }
+    }
 
+    // If the number of partition columns is less than the number of bucket columns,
+    // then we cannot properly distribute small tables onto bucketized map tasks.
+    // Bail out.
+    if (positions.size() < listBucketCols.size()) {
+      return false;
+    }
+
+    CustomBucketFunction bucketFunction = 
opTraits.getCustomBucketFunctions().get(0);
+    if (updatePartitionCols) {
       Preconditions.checkState(opTraits.getCustomBucketFunctions().size() == 
1);
       if (opTraits.getCustomBucketFunctions().get(0) != null) {
         final Optional<CustomBucketFunction> selected =
diff --git a/ql/src/test/queries/clientpositive/bucketmapjoin14.q b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
new file mode 100644
index 00000000000..b710456d75c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) 
INTO 64 BUCKETS;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
new file mode 100644
index 00000000000..4cf433e58f1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
@@ -0,0 +1,112 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) 
CLUSTERED BY (id, part) INTO 64 BUCKETS
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) 
CLUSTERED BY (id, part) INTO 64 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 
'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 
'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+POSTHOOK: Lineage: tbl.foid SCRIPT []
+POSTHOOK: Lineage: tbl.id SCRIPT []
+POSTHOOK: Lineage: tbl.part SCRIPT []
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (CUSTOM_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tbl
+                  filterExpr: (id is not null and part is not null) (type: 
boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: 
boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), 
id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col1 (type: string), _col2 (type: string)
+                          1 _col1 (type: string), _col2 (type: string)
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, 
_col5
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 2 Data size: 1060 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 2 Data size: 1060 Basic stats: 
COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  alias: tbl2
+                  filterExpr: (id is not null and part is not null) (type: 
boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE 
Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: 
boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: 
COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), 
id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: 
COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string), _col2 (type: 
string)
+                        null sort order: zz
+                        sort order: ++
+                        Map-reduce partition columns: _col2 (type: string), 
_col1 (type: string)
+                        Statistics: Num rows: 2 Data size: 530 Basic stats: 
COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND 
tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND 
tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+1234   PART_123        1       1234    PART_123        1
+1235   PART_124        2       1235    PART_124        2

Reply via email to