[ 
https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529133#comment-16529133
 ] 

jin xing edited comment on KUDU-2483 at 7/1/18 3:37 PM:
--------------------------------------------------------

Thanks [~tlipcon] for the comment.

I think the benefit is straightforward.

I created two Kudu tables from Spark shell:
{code:sql}
CREATE TABLE smallTable (
    idA string NOT NULL,
    dt string NOT NULL,
    PRIMARY KEY (idA, dt))
PARTITION BY HASH (idA) PARTITIONS 2,
RANGE (dt) (
    PARTITION "20180630" <= VALUES < "20180701",
    PARTITION "20180701" <= VALUES < "20180702",
    PARTITION "20180702" <= VALUES < "20180703"
)

CREATE TABLE bigTable (
    idB string NOT NULL,
    idC string NOT NULL,
    dt string NOT NULL,
    PRIMARY KEY (idB, idC, dt))
PARTITION BY HASH (idB, idC) PARTITIONS 10,
RANGE (dt) (
    PARTITION "20180630" <= VALUES < "20180701",
    PARTITION "20180701" <= VALUES < "20180702",
    PARTITION "20180702" <= VALUES < "20180703"
){code}
I inserted 6 rows into smallTable and 323075 rows into bigTable.

Then I ran the following SQL query:
{code:sql}
select * from smallTable inner join bigTable on smallTable.idA=bigTable.idB;
{code}
 

I added a boolean config `spark.sql.kudu.pushDownKuduBloomFilters` to control 
whether this feature is enabled.

I created the bloom filters with size=32KB and fp_rate=0.01.
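For reference, the kind of filter these parameters describe can be sketched in plain Python. This is only an illustration, not Kudu's actual implementation; the double-hashing scheme and the choice of 7 hash functions are assumptions, and the 32KB size matches the setting above.

```python
import hashlib

class BloomFilter:
    """Minimal bloom filter sketch (illustrative; not Kudu's implementation)."""

    def __init__(self, size_bytes=32 * 1024, num_hashes=7):
        # 32KB of bits, as in the experiment above; 7 hashes is an assumption.
        self.num_bits = size_bytes * 8
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bytes)

    def _positions(self, key):
        # Double hashing: derive all bit positions from one md5 digest.
        digest = hashlib.md5(key.encode()).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:], "big")
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # No false negatives; false positives at roughly the configured rate.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

bf = BloomFilter()
for key in ["a1", "a2", "a3", "a4", "a5", "a6"]:  # the 6 smallTable keys (made up)
    bf.add(key)
print(bf.might_contain("a1"))  # inserted keys are always reported present
```

With only 6 keys in a 32KB filter, the effective false-positive rate is far below the configured 0.01.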

When `spark.sql.kudu.pushDownKuduBloomFilters` is enabled, the statistics from 
Spark are as below:

number of output rows from Kudu: 114

query duration: 2s

 

When `spark.sql.kudu.pushDownKuduBloomFilters` is disabled, the statistics from 
Spark are as below:

number of output rows from Kudu: 323075

query duration: 16s

 

The screenshot below shows the comparison:

[^image-2018-07-01-23-29-05-517.png]



> Scan tablets with bloom filter
> ------------------------------
>
>                 Key: KUDU-2483
>                 URL: https://issues.apache.org/jira/browse/KUDU-2483
>             Project: Kudu
>          Issue Type: New Feature
>          Components: client
>            Reporter: jin xing
>            Priority: Major
>         Attachments: KUDU-2483, image-2018-07-01-23-29-05-517.png
>
>
> Joins are really common in Spark SQL. In this JIRA I take broadcast join as an 
> example and describe how Kudu's bloom filter can help accelerate distributed 
> computing.
> Spark runs a broadcast join with the steps below:
> 1. In a broadcast join we have a small table and a big table; Spark reads all 
> data of the small table into one worker and builds a hash table;
> 2. The hash table generated in step 1 is broadcast to all the workers, which 
> will read the splits of the big table;
> 3. Workers fetch and iterate over all the splits of the big table and check 
> whether each join key exists in the hash table; only rows with matching join 
> keys are retained.
> From the above, step 3 is the heaviest, especially when the worker and the 
> split storage are not on the same host and bandwidth is limited. Yet the cost 
> of step 3 is not always necessary. Consider the scenario below:
> {code:none}
> Small table A
> id      name
> 1      Jin
> 6      Xing
> Big table B
> id     age
> 1      10
> 2      21
> 3      33
> 4      65
> 5      32
> 6      23
> 7      18
> 8      20
> 9      22
> {code}
> Run the query: *select * from A inner join B on A.id=B.id*
> Clearly we don't need to fetch all the data from table B, because the number 
> of matching keys is really small;
> I propose to use the small table to build a bloom filter (BF) and use the 
> generated BF as a predicate/filter when fetching data from the big table, thus:
> 1. Much traffic/bandwidth is saved.
> 2. Workers have less data to process.
> Broadcast join is just an example; other types of join will also benefit if 
> we scan with a BF.
> In a nutshell, I think Kudu can provide an interface by which users can scan 
> data with bloom filters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
