This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 6f53c7fb73f  HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
6f53c7fb73f is described below

commit 6f53c7fb73ffc4674234957106c597d4a42bccd9
Author: Seonggon Namgung <ln...@postech.ac.kr>
AuthorDate: Fri Sep 5 01:52:52 2025 +0900

    HIVE-29166: Fix the partition column update logic in ConvertJoinMapJoin#convertJoinBucketMapJoin. (#6048)
---
 .../src/test/queries/positive/bucket_map_join_9.q  |   9 ++
 .../test/results/positive/bucket_map_join_9.q.out  |  65 ++++++++++++
 .../hive/ql/optimizer/ConvertJoinMapJoin.java      |  47 +++++----
 .../test/queries/clientpositive/bucketmapjoin14.q  |   9 ++
 .../clientpositive/llap/bucketmapjoin14.q.out      | 112 +++++++++++++++++++++
 5 files changed, 222 insertions(+), 20 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
new file mode 100644
index 00000000000..4c201d71850
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/bucket_map_join_9.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
new file mode 100644
index 00000000000..8153bdd697f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/bucket_map_join_9.q.out
@@ -0,0 +1,65 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Map 1 <- Map 2 (CUSTOM_EDGE)
+
+Stage-0
+  Fetch Operator
+    limit:-1
+    Stage-1
+      Map 1 vectorized
+      File Output Operator [FS_53]
+        Map Join Operator [MAPJOIN_52] (rows=2 width=530)
+          BucketMapJoin:true,Conds:SEL_51._col1, _col2=RS_49._col1, _col2(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
+        <-Map 2 [CUSTOM_EDGE] vectorized
+          MULTICAST [RS_49]
+            PartitionCols:_col2, _col1
+            Select Operator [SEL_48] (rows=2 width=265)
+              Output:["_col0","_col1","_col2"]
+              Filter Operator [FIL_47] (rows=2 width=265)
+                predicate:(id is not null and part is not null)
+                TableScan [TS_3] (rows=2 width=265)
+                  default@tbl,tbl2,Tbl:COMPLETE,Col:COMPLETE,Output:["foid","part","id"]
+        <-Select Operator [SEL_51] (rows=2 width=265)
+            Output:["_col0","_col1","_col2"]
+            Filter Operator [FIL_50] (rows=2 width=265)
+              predicate:(id is not null and part is not null)
+              TableScan [TS_0] (rows=2 width=265)
+                default@tbl,tbl,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:100,Grouping Partition Columns:["id","part"],Output:["foid","part","id"]
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1234	PART_123	1	1234	PART_123	1
+1235	PART_124	2	1235	PART_124	2
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 37348dcef06..a622a0a7c02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -656,32 +656,39 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcCon
     // on small table(s).
     ReduceSinkOperator bigTableRS = (ReduceSinkOperator)joinOp.getParentOperators().get(bigTablePosition);
     OpTraits opTraits = bigTableRS.getOpTraits();
-    List<List<String>> listBucketCols = opTraits.getBucketColNames();
+    // It is guaranteed there is only 1 list within bigTableRS.getOpTraits().getBucketColNames().
+    List<String> listBucketCols = opTraits.getBucketColNames().get(0);
     List<ExprNodeDesc> bigTablePartitionCols = bigTableRS.getConf().getPartitionCols();
-    boolean updatePartitionCols = false;
+    boolean updatePartitionCols = listBucketCols.size() != bigTablePartitionCols.size();
     List<Integer> positions = new ArrayList<>();
-    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
-    if (listBucketCols.get(0).size() != bigTablePartitionCols.size()) {
-      updatePartitionCols = true;
-      // Prepare updated partition columns for small table(s).
-      // Get the positions of bucketed columns
-
-      int bigTableExprPos = 0;
-      Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
-      final boolean[] retainedColumns = new boolean[listBucketCols.get(0).size()];
-      for (ExprNodeDesc bigTableExpr : bigTablePartitionCols) {
-        // It is guaranteed there is only 1 list within listBucketCols.
-        for (int i = 0; i < listBucketCols.get(0).size(); i++) {
-          final String colName = listBucketCols.get(0).get(i);
-          if (colExprMap.get(colName).isSame(bigTableExpr)) {
-            positions.add(bigTableExprPos);
-            retainedColumns[i] = true;
-          }
+    // Compare the partition columns and the bucket columns of bigTableRS.
+    Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
+    final boolean[] retainedColumns = new boolean[listBucketCols.size()];
+    for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); bucketColIdx++) {
+      for (int bigTablePartIdx = 0; bigTablePartIdx < bigTablePartitionCols.size(); bigTablePartIdx++) {
+        ExprNodeDesc bigTablePartExpr = bigTablePartitionCols.get(bigTablePartIdx);
+        ExprNodeDesc bucketColExpr = colExprMap.get(listBucketCols.get(bucketColIdx));
+        if (bigTablePartExpr.isSame(bucketColExpr)) {
+          positions.add(bigTablePartIdx);
+          retainedColumns[bucketColIdx] = true;
+          // If the positions of the partition column and the bucket column are not the same,
+          // then we need to update the position of the partition column in small tables.
+          updatePartitionCols = updatePartitionCols || bucketColIdx != bigTablePartIdx;
+          break;
         }
-        bigTableExprPos = bigTableExprPos + 1;
       }
+    }
+    // If the number of partition columns is less than the number of bucket columns,
+    // then we cannot properly distribute small tables onto bucketized map tasks.
+    // Bail out.
+    if (positions.size() < listBucketCols.size()) {
+      return false;
+    }
+
+    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
+    if (updatePartitionCols) {
       Preconditions.checkState(opTraits.getCustomBucketFunctions().size() == 1);
       if (opTraits.getCustomBucketFunctions().get(0) != null) {
         final Optional<CustomBucketFunction> selected =
diff --git a/ql/src/test/queries/clientpositive/bucketmapjoin14.q b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
new file mode 100644
index 00000000000..b710456d75c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/bucketmapjoin14.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=true;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS;
+INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');
+
+EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
diff --git a/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
new file mode 100644
index 00000000000..4cf433e58f1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
@@ -0,0 +1,112 @@
+PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl
+POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl
+PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl
+POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl
+POSTHOOK: Lineage: tbl.foid SCRIPT []
+POSTHOOK: Lineage: tbl.id SCRIPT []
+POSTHOOK: Lineage: tbl.part SCRIPT []
+PREHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: EXPLAIN
+SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 2 (CUSTOM_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: tbl
+                  filterExpr: (id is not null and part is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col1 (type: string), _col2 (type: string)
+                          1 _col1 (type: string), _col2 (type: string)
+                        outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
+                        input vertices:
+                          1 Map 2
+                        Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+                        File Output Operator
+                          compressed: false
+                          Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
+                          table:
+                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+        Map 2
+            Map Operator Tree:
+                TableScan
+                  alias: tbl2
+                  filterExpr: (id is not null and part is not null) (type: boolean)
+                  Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: (id is not null and part is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: foid (type: string), part (type: string), id (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col1 (type: string), _col2 (type: string)
+                        null sort order: zz
+                        sort order: ++
+                        Map-reduce partition columns: _col2 (type: string), _col1 (type: string)
+                        Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col0 (type: string)
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl
+#### A masked pattern was here ####
+1234	PART_123	1	1234	PART_123	1
+1235	PART_124	2	1235	PART_124	2
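
For readers who want the gist of the fix without walking the full diff: the patch makes convertJoinBucketMapJoin compare the big table's ReduceSink partition columns against its bucket columns position by position, so the bucket map join conversion is still handled correctly when the two lists contain the same columns in a different order (as in the new tests, where the table is bucketed on (id, part) but the join keys are emitted as (part, id)), and it bails out when some bucket column has no matching join key. The standalone Java sketch below mirrors that matching logic on plain strings; it is only an illustration, not the Hive code itself: the class and method names are made up, and the real implementation works on ExprNodeDesc expressions via isSame() rather than string equality.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the new matching logic (hypothetical names, simplified types).
public class BucketColumnMatchSketch {

  // Returns the positions of the bucket columns within the join/partition columns,
  // in bucket-column order, or null when some bucket column is not covered
  // (the optimizer would then give up on the bucket map join).
  static List<Integer> matchBucketColumns(List<String> bucketCols, List<String> partitionCols) {
    List<Integer> positions = new ArrayList<>();
    // A size mismatch alone already forces the small-table partition columns to be rewritten.
    boolean updatePartitionCols = bucketCols.size() != partitionCols.size();
    for (int bucketIdx = 0; bucketIdx < bucketCols.size(); bucketIdx++) {
      for (int partIdx = 0; partIdx < partitionCols.size(); partIdx++) {
        if (bucketCols.get(bucketIdx).equals(partitionCols.get(partIdx))) {
          positions.add(partIdx);
          // A positional mismatch also requires a rewrite, even when the sizes match.
          updatePartitionCols = updatePartitionCols || bucketIdx != partIdx;
          break;
        }
      }
    }
    if (positions.size() < bucketCols.size()) {
      return null; // not every bucket column is a join key: cannot bucket the small table
    }
    System.out.println("updatePartitionCols=" + updatePartitionCols + ", positions=" + positions);
    return positions;
  }

  public static void main(String[] args) {
    // Mirrors the new q-tests: bucketed by (id, part), join keys emitted as (part, id).
    matchBucketColumns(Arrays.asList("id", "part"), Arrays.asList("part", "id"));
    // prints: updatePartitionCols=true, positions=[1, 0]
  }
}

In this same-size, different-order case the previous size-only check would not have flagged the small-table partition columns for an update, which appears to be the situation the new bucket_map_join_9.q and bucketmapjoin14.q tests are exercising; the reordered "Map-reduce partition columns: _col2, _col1" in the new q.out files shows the corrected behavior.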