[ 
https://issues.apache.org/jira/browse/HIVE-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17778749#comment-17778749
 ] 

okumin edited comment on HIVE-27734 at 10/23/23 5:05 PM:
---------------------------------------------------------

This is my first attempt at implementing Bucket Map Join in a very simple case.

[https://github.com/okumin/hive/commits/POC-BMJ-ICEBERG2]

Test data.
{code:java}
SET hive.disable.unsafe.external.table.operations=false;
SET hive.fetch.task.conversion = none;
DROP TABLE IF EXISTS default.iceberg_bucketed PURGE;
DROP TABLE IF EXISTS default.iceberg_non_bucketed PURGE;
CREATE TABLE default.iceberg_bucketed (id int, txt string) PARTITIONED BY SPEC (bucket(8, id)) STORED BY ICEBERG;
CREATE TABLE default.iceberg_non_bucketed (id int, txt string) STORED BY ICEBERG;
INSERT INTO default.iceberg_bucketed VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
INSERT INTO default.iceberg_non_bucketed VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4');
{code}
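For reference, bucket(8, id) distributes rows by Murmur3-hashing the key value, as described in the Iceberg partition transform spec. Below is a minimal standalone sketch of how the inserted ids would be assigned to buckets; it uses Guava 31+ for the hash and is my own illustration, not Hive or Iceberg code.
{code:java}
import com.google.common.hash.Hashing;

public class IcebergBucketSketch {
  // Approximation of Iceberg's bucket transform for int values:
  // Murmur3_32 over the little-endian 64-bit representation of the value,
  // then the positive hash modulo the number of buckets.
  static int bucket(int value, int numBuckets) {
    int hash = Hashing.murmur3_32_fixed().hashLong((long) value).asInt();
    return (hash & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    for (int id = 1; id <= 8; id++) {
      System.out.println("id=" + id + " -> bucket " + bucket(id, 8));
    }
  }
}
{code}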
EXPLAIN
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> EXPLAIN EXTENDED SELECT * FROM default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id = d.id;
...
|                       Map Join Operator            |
|                         condition map:             |
|                              Inner Join 0 to 1     |
|                         Estimated key counts: Map 2 => 1 |
|                         keys:                      |
|                           0 _col0 (type: int)      |
|                           1 _col0 (type: int)      |
|                         outputColumnNames: _col0, _col1, _col2, _col3 |
|                         input vertices:            |
|                           1 Map 2                  |
|                         Position of Big Table: 0   |
|                         Statistics: Num rows: 4 Data size: 712 Basic stats: COMPLETE Column stats: COMPLETE |
|                         BucketMapJoin: true        |
|                         File Output Operator       |{code}
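The key point is that the optimizer has to learn that iceberg_bucketed is bucketed by id into 8 buckets from the Iceberg partition spec. As a rough, hypothetical sketch of where that metadata comes from (this is not the code on the POC branch, and the class name is made up), Iceberg's table API exposes the bucket transform like this:
{code:java}
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;

// Hypothetical helper: reports bucket transforms found in an Iceberg
// table's partition spec, i.e. the metadata a bucket-map-join conversion
// would need (source column and number of buckets).
public class BucketSpecInspector {
  public static void describeBucketing(Table table) {
    for (PartitionField field : table.spec().fields()) {
      String transform = field.transform().toString(); // e.g. "bucket[8]"
      if (transform.startsWith("bucket[")) {
        String sourceColumn = table.schema().findColumnName(field.sourceId());
        int numBuckets = Integer.parseInt(
            transform.substring("bucket[".length(), transform.length() - 1));
        System.out.println(sourceColumn + " -> " + numBuckets + " buckets");
      }
    }
  }
}
{code}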
The join returned the expected records.
{code:java}
0: jdbc:hive2://hive-hiveserver2:10000/defaul> SELECT * FROM default.iceberg_bucketed f INNER JOIN default.iceberg_non_bucketed d ON f.id = d.id;
...
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 2 .......... container     SUCCEEDED      1          1        0        0       0       0
Map 1 .......... container     SUCCEEDED      5          5        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 12.78 s
----------------------------------------------------------------------------------------------
INFO  : Completed executing command(queryId=hive_20231023164515_ec38118a-9ba7-471a-914c-f53866e09ab9); Time taken: 12.979 seconds
+-------+--------+-------+--------+
| f.id  | f.txt  | d.id  | d.txt  |
+-------+--------+-------+--------+
| 3     | 3      | 3     | 3      |
| 1     | 1      | 1     | 1      |
| 2     | 2      | 2     | 2      |
| 4     | 4      | 4     | 4      |
+-------+--------+-------+--------+ {code}
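For anyone who wants to re-run this quickly, here is a small JDBC smoke test (my own sketch, not part of the POC branch; the URL and credentials are placeholders matching the beeline session above) that checks the plan still contains BucketMapJoin: true and that the join returns the four expected rows.
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BucketMapJoinSmokeTest {
  public static void main(String[] args) throws Exception {
    String url = "jdbc:hive2://hive-hiveserver2:10000/default";
    String join = "SELECT * FROM default.iceberg_bucketed f "
        + "INNER JOIN default.iceberg_non_bucketed d ON f.id = d.id";
    try (Connection conn = DriverManager.getConnection(url, "hive", "");
         Statement stmt = conn.createStatement()) {
      // EXPLAIN output comes back as one text column per plan line.
      boolean bucketMapJoin = false;
      try (ResultSet plan = stmt.executeQuery("EXPLAIN " + join)) {
        while (plan.next()) {
          bucketMapJoin |= plan.getString(1).contains("BucketMapJoin: true");
        }
      }
      // Count the joined rows; 4 are expected for this test data.
      int rows = 0;
      try (ResultSet rs = stmt.executeQuery(join)) {
        while (rs.next()) {
          rows++;
        }
      }
      System.out.println("BucketMapJoin planned: " + bucketMapJoin + ", rows: " + rows);
    }
  }
}
{code}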



> Add Iceberg's storage-partitioned join capabilities to Hive's [sorted-]bucket-map-join
> --------------------------------------------------------------------------------------
>
>                 Key: HIVE-27734
>                 URL: https://issues.apache.org/jira/browse/HIVE-27734
>             Project: Hive
>          Issue Type: Improvement
>          Components: Iceberg integration
>    Affects Versions: 4.0.0-alpha-2
>            Reporter: Janos Kovacs
>            Priority: Major
>
> Iceberg implements 'data bucketing' through its rich (function-based) partitioning feature, which can be used to optimize joins via so-called storage-partitioned joins.
> doc: [https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl]
> spark impl.: https://issues.apache.org/jira/browse/SPARK-37375
> Hive does not yet leverage this feature in its bucket-map-join optimization, neither on its own nor combined with Iceberg's SortOrder for a sorted-bucket-map-join.
> Customers migrating from the Hive table format to Iceberg with a storage-optimized schema will see performance degradation on large tables, because Iceberg's gain from avoiding file listing is significantly smaller than the join speedup that a bucket-map-join or even a sorted-bucket-map-join provides.
>  
> {noformat}
> SET hive.query.results.cache.enabled=false;
> SET hive.fetch.task.conversion = none;
> SET hive.optimize.bucketmapjoin=true;
> SET hive.convert.join.bucket.mapjoin.tez=true;
> SET hive.auto.convert.join.noconditionaltask.size=1000;
> -- if you are working with external tables, you need this for BMJ:
> SET hive.disable.unsafe.external.table.operations=false;
> -- HIVE BUCKET-MAP-JOIN
> DROP TABLE IF EXISTS default.hivebmjt1 PURGE;
> DROP TABLE IF EXISTS default.hivebmjt2 PURGE;
> CREATE TABLE default.hivebmjt1 (id int, txt string) CLUSTERED BY (id) INTO 8 BUCKETS;
> CREATE TABLE default.hivebmjt2 (id int, txt string);
> INSERT INTO default.hivebmjt1 VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4'),(5,'5'),(6,'6'),(7,'7'),(8,'8');
> INSERT INTO default.hivebmjt2 VALUES (1,'1'),(2,'2'),(3,'3'),(4,'4');
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f INNER      JOIN default.hivebmjt2 d ON f.id = d.id;
> EXPLAIN
> SELECT * FROM default.hivebmjt1 f LEFT OUTER JOIN default.hivebmjt2 d ON f.id = d.id;
> -- Both are optimized into BMJ
> -- ICEBERG BUCKET-MAP-JOIN via Iceberg's storage-partitioned join
> DROP TABLE IF EXISTS default.icespbmjt1 PURGE;
> DROP TABLE IF EXISTS default.icespbmjt2 PURGE;
> CREATE TABLE default.icespbmjt1 (txt string) PARTITIONED BY (id int) STORED BY ICEBERG;
> CREATE TABLE default.icespbmjt2 (txt string) PARTITIONED BY (id int) STORED BY ICEBERG;
> INSERT INTO default.icespbmjt1 VALUES ('1',1),('2',2),('3',3),('4',4);
> INSERT INTO default.icespbmjt2 VALUES ('1',1),('2',2),('3',3),('4',4);
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f INNER      JOIN default.icespbmjt2 d ON f.id = d.id;
> EXPLAIN
> SELECT * FROM default.icespbmjt1 f LEFT OUTER JOIN default.icespbmjt2 d ON f.id = d.id;
> -- Only Map-Join optimised
> {noformat}


