[
https://issues.apache.org/jira/browse/HIVE-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778749#comment-17778749
]
okumin edited comment on HIVE-27734 at 10/23/23 5:05 PM:
---------------------------------------------------------
This is my first trial to implement Bucket Map Join in a very easy case.
[https://github.com/okumin/hive/commits/POC-BMJ-ICEBERG2]
Test data.
{code:java}
SET hive.disable.unsafe.external.table.operations=false;
SET hive.fetch.task.conversion = none;DROP TABLE IF EXISTS
default.iceberg_bucketed PURGE;
DROP TABLE IF EXISTS default.iceberg_non_bucketed PURGE;
CREATE TABLE default.iceberg_bucketed (id int, txt string) PARTITIONED BY SPEC
(bucket(8, id)) STORED BY ICEBERG;
CREATE TABLE default.iceberg_non_bucketed (id int, txt string) STORED BY
ICEBERG;
INSERT INTO default.iceberg_bucketed VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
INSERT INTO default.iceberg_non_bucketed VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'); {code}
EXPLAIN
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> EXPLAIN EXTENDED SELECT * FROM
default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id =
d.id;
...
| Map Join Operator |
| condition map: |
| Inner Join 0 to 1 |
| Estimated key counts: Map 2 => 1 |
| keys: |
| 0 _col0 (type: int) |
| 1 _col0 (type: int) |
| outputColumnNames: _col0, _col1, _col2, _col3 |
| input vertices: |
| 1 Map 2 |
| Position of Big Table: 0 |
| Statistics: Num rows: 4 Data size: 712 Basic stats:
COMPLETE Column stats: COMPLETE |
| BucketMapJoin: true |
| File Output Operator |{code}
It succeeded in joining records.
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> SELECT * FROM
default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id =
d.id;
...
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 2 .......... container SUCCEEDED 1 1 0 0
0 0
Map 1 .......... container SUCCEEDED 5 5 0 0
0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 12.78 s
----------------------------------------------------------------------------------------------
INFO : Completed executing
command(queryId=hive_20231023164515_ec38118a-9ba7-471a-914c-f53866e09ab9); Time
taken: 12.979 seconds
+-------+--------+-------+--------+
| f.id | f.txt | d.id | d.txt |
+-------+--------+-------+--------+
| 3 | 3 | 3 | 3 |
| 1 | 1 | 1 | 1 |
| 2 | 2 | 2 | 2 |
| 4 | 4 | 4 | 4 |
+-------+--------+-------+--------+ {code}
was (Author: okumin):
This is my first trial to implement Bucket Map Join.
[https://github.com/okumin/hive/commits/POC-BMJ-ICEBERG2]
Test data.
{code:java}
SET hive.disable.unsafe.external.table.operations=false;
SET hive.fetch.task.conversion = none;DROP TABLE IF EXISTS
default.iceberg_bucketed PURGE;
DROP TABLE IF EXISTS default.iceberg_non_bucketed PURGE;
CREATE TABLE default.iceberg_bucketed (id int, txt string) PARTITIONED BY SPEC
(bucket(8, id)) STORED BY ICEBERG;
CREATE TABLE default.iceberg_non_bucketed (id int, txt string) STORED BY
ICEBERG;
INSERT INTO default.iceberg_bucketed VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
INSERT INTO default.iceberg_non_bucketed VALUES
(1,'1'),(2,'2'),(3,'3'),(4,'4'); {code}
EXPLAIN
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> EXPLAIN EXTENDED SELECT * FROM
default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id =
d.id;
...
| Map Join Operator |
| condition map: |
| Inner Join 0 to 1 |
| Estimated key counts: Map 2 => 1 |
| keys: |
| 0 _col0 (type: int) |
| 1 _col0 (type: int) |
| outputColumnNames: _col0, _col1, _col2, _col3 |
| input vertices: |
| 1 Map 2 |
| Position of Big Table: 0 |
| Statistics: Num rows: 4 Data size: 712 Basic stats:
COMPLETE Column stats: COMPLETE |
| BucketMapJoin: true |
| File Output Operator |{code}
It succeeded in joining records.
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> SELECT * FROM
default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id =
d.id;
...
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING
FAILED KILLED
----------------------------------------------------------------------------------------------
Map 2 .......... container SUCCEEDED 1 1 0 0
0 0
Map 1 .......... container SUCCEEDED 5 5 0 0
0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 12.78 s
----------------------------------------------------------------------------------------------
INFO : Completed executing
command(queryId=hive_20231023164515_ec38118a-9ba7-471a-914c-f53866e09ab9); Time
taken: 12.979 seconds
+-------+--------+-------+--------+
| f.id | f.txt | d.id | d.txt |
+-------+--------+-------+--------+
| 3 | 3 | 3 | 3 |
| 1 | 1 | 1 | 1 |
| 2 | 2 | 2 | 2 |
| 4 | 4 | 4 | 4 |
+-------+--------+-------+--------+ {code}
> Add Icenerg's storage-partitioned join capabilities to Hive's
> [sorted-]bucket-map-join
> --------------------------------------------------------------------------------------
>
> Key: HIVE-27734
> URL: https://issues.apache.org/jira/browse/HIVE-27734
> Project: Hive
> Issue Type: Improvement
> Components: Iceberg integration
> Affects Versions: 4.0.0-alpha-2
> Reporter: Janos Kovacs
> Priority: Major
>
> Iceberg's 'data bucketing' is implemented through its rich (function based)
> partitioning feature which helps to optimize join operations - called storage
> partitioned joins.
> doc:
> [https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl]
> spark impl.: https://issues.apache.org/jira/browse/SPARK-37375
> This feature is not yet leveraged in Hive into its bucket-map-join
> optimization, neither alone nor with Iceberg's SortOrder to
> sorted-bucket-map-join.
> Customers migrating from Hive table format to Iceberg format with storage
> optimized schema will experience performance degradation on large tables
> where Iceberg's gain on no-listing performance improvement is significantly
> smaller than the actual join performance over bucket-join or even
> sorted-bucket-join.
>
> {noformat}
> SET hive.query.results.cache.enabled=false;
> SET hive.fetch.task.conversion = none;
> SET hive.optimize.bucketmapjoin=true;
> SET hive.convert.join.bucket.mapjoin.tez=true;
> SET hive.auto.convert.join.noconditionaltask.size=1000;
> --if you are working with external table, you need this for bmj:
> SET hive.disable.unsafe.external.table.operations=false;
> -- HIVE BUCKET-MAP-JOIN
> DROP TABLE IF EXISTS default.hivebmjt1 PURGE;
> DROP TABLE IF EXISTS default.hivebmjt2 PURGE;
> CREATE TABLE default.hivebmjt1 (id int, txt string) CLUSTERED BY (id) INTO 8
> BUCKETS;
> CREATE TABLE default.hivebmjt2 (id int, txt string);
> INSERT INTO default.hivebmjt1 VALUES
> (1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
> INSERT INTO default.hivebmjt2 VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4');
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f INNER JOIN default.hivebmjt2 d ON f.id
> = d.id;
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f LEFT OUTER JOIN default.hivebmjt2 d ON f.id
> = d.id;
> -- Both are optimized into BMJ
> -- ICEBERG BUCKET-MAP-JOIN via Iceberg's storage-partitioned join
> DROP TABLE IF EXISTS default.icespbmjt1 PURGE;
> DROP TABLE IF EXISTS default.icespbmjt2 PURGE;
> CREATE TABLE default.icespbmjt1 (txt string) PARTITIONED BY (id int) STORED
> BY ICEBERG ;
> CREATE TABLE default.icespbmjt2 (txt string) PARTITIONED BY (id int) STORED
> BY ICEBERG ;
> INSERT INTO default.icespbmjt1 VALUES ('1',1),('2',2),('3',3),('4',4);
> INSERT INTO default.icespbmjt2 VALUES ('1',1),('2',2),('3',3),('4',4);
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f INNER JOIN default.icespbmjt2 d ON
> f.id = d.id;
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f LEFT OUTER JOIN default.icespbmjt2 d ON
> f.id = d.id;
> -- Only Map-Join optimised
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)