[ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jin xing updated KUDU-2483:
---------------------------
    Description: 
Joins are very common in Spark SQL. In this JIRA I take broadcast join as an 
example and describe how Kudu's bloom filters can help accelerate distributed 
computing.

Spark runs a broadcast join in the following steps:
 1. A broadcast join involves a small table and a big table. Spark reads all 
the data of the small table into one worker and builds a hash table from it.
 2. The hash table generated in step 1 is broadcast to all the workers that 
will read the splits of the big table.
 3. Each worker fetches and iterates over all of its splits of the big table 
and checks whether each row's join key exists in the hash table; only rows 
with matching join keys are retained.
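The three steps above can be sketched in plain Python. This is a single-process illustration, not Spark's implementation: the function names are hypothetical, each split is assumed to be a list of (id, age) rows, and table B is arbitrarily cut into three splits. Note that step 3 touches every row of every split, whether or not it matches.

{code:python}
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def build_hash_table(small_rows: Sequence[Tuple]) -> Dict[object, List[Tuple]]:
    # Step 1: read the whole small table and index its rows by join key (column 0).
    table: Dict[object, List[Tuple]] = defaultdict(list)
    for row in small_rows:
        table[row[0]].append(row)
    return dict(table)


def probe_split(hash_table, split):
    # Step 3: iterate over every row of one big-table split and keep only the
    # rows whose join key is present in the broadcast hash table.
    joined = []
    for big_row in split:
        for small_row in hash_table.get(big_row[0], []):
            joined.append(small_row + big_row[1:])
    return joined


def broadcast_hash_join(small_rows, big_splits):
    # Step 2: in Spark this hash table would be broadcast to every worker;
    # here the "workers" are just iterations of a loop.
    hash_table = build_hash_table(small_rows)
    result = []
    for split in big_splits:
        result.extend(probe_split(hash_table, split))
    return result


A = [(1, "Jin"), (6, "Xing")]
B_splits = [
    [(1, 10), (2, 21), (3, 33)],
    [(4, 65), (5, 32), (6, 23)],
    [(7, 18), (8, 20), (9, 22)],
]
print(broadcast_hash_join(A, B_splits))  # [(1, 'Jin', 10), (6, 'Xing', 23)]
{code}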

From the above, step 3 is the heaviest, especially when a worker and the 
storage of its split are not on the same host and bandwidth is limited. Yet 
much of the cost of step 3 is not necessary. Consider the following scenario:
{code:none}
Small table A
id      name
1      Jin
6      Xing

Big table B
id     age
1      10
2      21
3      33
4      65
5      32
6      23
7      18
8      20
9      22
{code}
Run the query: {{select * from A inner join B on A.id=B.id}}

Clearly, we don't need to fetch all the data from table B, because only a few 
keys match (ids 1 and 6).
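To make this concrete, the snippet below replays the scenario above with the tables held as in-memory Python structures (an assumption for illustration): without any pushdown, all 9 rows of B are shipped to the workers, yet only 2 of them survive the join.

{code:python}
# Scenario from the description: table A (small) and table B (big).
A = {1: "Jin", 6: "Xing"}
B = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32), (6, 23), (7, 18), (8, 20), (9, 22)]

# Without pushdown, every row of B is fetched and probed.
fetched = len(B)

# Only rows whose id appears in A survive the inner join.
result = [(id_, A[id_], age) for id_, age in B if id_ in A]

print(f"fetched {fetched} rows of B, kept {len(result)}: {result}")
{code}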

I propose building a bloom filter (BF) from the small table and using the 
generated BF as a predicate/filter when fetching data from the big table, so 
that:
 1. Much network traffic/bandwidth is saved.
 2. Workers have less data to process.
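A minimal sketch of the proposal, assuming integer join keys: a BF is built from table A's ids and used to drop rows of B before the exact join. The {{BloomFilter}} class is a hypothetical, simplified implementation (SHA-256 plus double hashing), not Kudu's bloom filter and not a proposed API. A BF can return false positives but never false negatives, so the exact join still has to run on the rows that pass.

{code:python}
import hashlib


class BloomFilter:
    """Minimal bloom filter over hashable keys (illustrative; not Kudu's implementation)."""

    def __init__(self, num_bits: int, num_hashes: int) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Double hashing: derive k bit positions from two 64-bit slices of one digest.
        digest = hashlib.sha256(repr(key).encode()).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, key) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


A = [(1, "Jin"), (6, "Xing")]
B = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32), (6, 23), (7, 18), (8, 20), (9, 22)]

# Build the BF from the small table's join keys.
bf = BloomFilter(num_bits=1024, num_hashes=3)
for key, _ in A:
    bf.add(key)

# Rows of B that pass the BF: a superset of the true matches.
candidates = [row for row in B if bf.might_contain(row[0])]

# The exact join still runs on the candidates to discard any false positives.
names = dict(A)
joined = [(key, names[key], age) for key, age in candidates if key in names]
print(len(candidates), joined)
{code}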

Broadcast join is just an example; other types of join will also benefit if 
we scan with a BF.

In a nutshell, I think Kudu could provide an interface through which users 
can scan data with bloom filters.
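One possible shape for such an interface, sketched in Python purely for illustration: the {{HypotheticalTabletScanner}} class and its {{add_bloom_filter_predicate}} method are hypothetical and do not exist in the Kudu client. What it models is that the predicate is evaluated where the tablet data lives, so rejected rows are never sent to the worker. A plain set membership test stands in for a real BF's lookup.

{code:python}
from dataclasses import dataclass, field
from typing import Callable, Iterator, List, Sequence, Tuple


@dataclass
class HypotheticalTabletScanner:
    """Illustrative only, NOT the Kudu client API: a scan whose bloom filter
    predicates are evaluated next to the tablet data."""

    tablet_rows: Sequence[Tuple]
    predicates: List[Tuple[int, Callable[[object], bool]]] = field(default_factory=list)
    rows_sent: int = 0

    def add_bloom_filter_predicate(self, column_index: int,
                                   might_contain: Callable[[object], bool]) -> None:
        # Register a membership check for one column of the scanned rows.
        self.predicates.append((column_index, might_contain))

    def scan(self) -> Iterator[Tuple]:
        for row in self.tablet_rows:
            # Rows failing any predicate are dropped "server side" and never sent.
            if all(check(row[col]) for col, check in self.predicates):
                self.rows_sent += 1
                yield row


B = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32), (6, 23), (7, 18), (8, 20), (9, 22)]
scanner = HypotheticalTabletScanner(B)
scanner.add_bloom_filter_predicate(0, {1, 6}.__contains__)  # stand-in for a real BF
rows = list(scanner.scan())
print(rows, scanner.rows_sent)  # [(1, 10), (6, 23)] 2
{code}

With a real BF, {{rows_sent}} could be slightly higher than the number of true matches because of false positives.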

 

Here I want to add some statistics for the Spark-Kudu integration with and 
without bloom filters.

In our production environment, the bandwidth of each executor is 50 Mbps.

We inner join two tables: one is large and the other is comparatively small.

In Spark, an inner join can be implemented as SortMergeJoin or 
BroadcastHashJoin; we implemented the corresponding operators with bloom 
filters as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
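As a rough sketch of the idea behind SortMergeBloomFilterJoin, assuming it filters the big side with the small side's keys before the sort-merge join (this is not the actual operator code), fewer rows of the big table are then sorted and shuffled. The {{might_contain}} parameter stands in for a bloom filter lookup, and the function names are hypothetical.

{code:python}
def sort_merge_join(left, right):
    """Inner equi-join on column 0 of both inputs by sorting and merging."""
    left = sorted(left, key=lambda r: r[0])
    right = sorted(right, key=lambda r: r[0])
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the runs that share this key on both sides.
            i_end = i
            while i_end < len(left) and left[i_end][0] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == rk:
                j_end += 1
            # Emit their cross product.
            for lrow in left[i:i_end]:
                for rrow in right[j:j_end]:
                    out.append(lrow + rrow[1:])
            i, j = i_end, j_end
    return out


def sort_merge_bloom_filter_join(small, big, might_contain):
    # Drop big-side rows whose key fails the filter before the expensive sort.
    return sort_merge_join(small, [r for r in big if might_contain(r[0])])


A = [(1, "Jin"), (6, "Xing")]
B = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32), (6, 23), (7, 18), (8, 20), (9, 22)]
print(sort_merge_bloom_filter_join(A, B, {1, 6}.__contains__))
{code}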

The size of the bloom filter's hash table is configured as 32M.
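To get a feel for what a filter of this size buys, the standard bloom filter approximations are p ≈ (1 - e^(-kn/m))^k for the false positive rate and k = (m/n) · ln 2 for the optimal number of hash functions, with m bits and n keys. The sketch below evaluates them for the two sizes of table A in the statistics, assuming "32M" means 32 × 2^20 bits and that every record of A has a distinct key; if 32M means bytes, m is 8x larger and the rates drop further.

{code:python}
import math


def optimal_num_hashes(m_bits: int, n_keys: int) -> int:
    # k* = (m / n) * ln 2, rounded to a positive integer.
    return max(1, round(m_bits / n_keys * math.log(2)))


def false_positive_rate(m_bits: int, n_keys: int, k: int) -> float:
    # Standard approximation: p = (1 - e^(-k * n / m))^k
    return (1.0 - math.exp(-k * n_keys / m_bits)) ** k


M_BITS = 32 * 2**20  # ASSUMPTION: "32M" read as 32 Mi bits

for n in (400_000, 2_000_000):
    k = optimal_num_hashes(M_BITS, n)
    print(f"n={n:>9,}  k={k:2d}  fpr={false_positive_rate(M_BITS, n, k):.2e}")
{code}

For 400 thousand keys the "optimal" k comes out near 58, more hash functions than a practical implementation would likely use; capping k trades a somewhat higher false positive rate for cheaper lookups.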

The statistics are as follows:

 

 
||Records in Table A||Records in Table B||Join Operator||Executor Time (s)||
|400 thousand|14 billion|SortMergeJoin|760|
|400 thousand|14 billion|BroadcastHashJoin|376|
|400 thousand|14 billion|BroadcastBloomFilterJoin|21|
|2 million|14 billion|SortMergeJoin|707|
|2 million|14 billion|BroadcastHashJoin|329|
|2 million|14 billion|SortMergeBloomFilterJoin|75|
|2 million|14 billion|BroadcastBloomFilterJoin|35|

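As the table shows, bloom filter pushdown helps a lot: with 400 thousand 
records in table A, BroadcastBloomFilterJoin is roughly 18x faster than 
BroadcastHashJoin, and with 2 million records both bloom filter operators are 
roughly 9x faster than their counterparts.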
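The speedups can be computed directly from the table. This small script pairs each bloom filter operator with the operator it replaces (an assumed pairing: BroadcastBloomFilterJoin vs BroadcastHashJoin, SortMergeBloomFilterJoin vs SortMergeJoin) and prints the ratio of executor times; combinations missing from the table are skipped rather than guessed.

{code:python}
# Executor times in seconds, copied from the statistics table.
TIMES = {
    "400 thousand": {"SortMergeJoin": 760, "BroadcastHashJoin": 376,
                     "BroadcastBloomFilterJoin": 21},
    "2 million": {"SortMergeJoin": 707, "BroadcastHashJoin": 329,
                  "SortMergeBloomFilterJoin": 75, "BroadcastBloomFilterJoin": 35},
}

# Which baseline operator each bloom filter operator is compared against.
BASELINE_FOR = {
    "BroadcastBloomFilterJoin": "BroadcastHashJoin",
    "SortMergeBloomFilterJoin": "SortMergeJoin",
}


def speedups(times):
    out = {}
    for size, ops in times.items():
        for bf_op, base_op in BASELINE_FOR.items():
            if bf_op in ops and base_op in ops:
                out[(size, bf_op)] = ops[base_op] / ops[bf_op]
    return out


for (size, op), ratio in speedups(TIMES).items():
    print(f"{size:>12} records in A, {op}: {ratio:.1f}x faster than its baseline")
{code}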

 

I want to use this JIRA as an umbrella; my colleagues will submit the 
follow-up sub-tasks/PRs.

 

It would be great if someone could take a closer look at this and share some 
comments.

 


> Scan tablets with bloom filter
> ------------------------------
>
>                 Key: KUDU-2483
>                 URL: https://issues.apache.org/jira/browse/KUDU-2483
>             Project: Kudu
>          Issue Type: New Feature
>          Components: client
>            Reporter: jin xing
>            Priority: Major
>         Attachments: KUDU-2483, image-2018-07-01-23-29-05-517.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
