[
https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jin xing updated KUDU-2483:
---------------------------
Description:
Joins are very common in Spark SQL. In this JIRA I take broadcast join as an
example and describe how a bloom filter in Kudu could help accelerate
distributed computing.
Spark runs a broadcast join with the following steps:
1. In a broadcast join we have a small table and a big table; Spark reads all
data from the small table to one worker and builds a hash table;
2. The hash table generated in step 1 is broadcast to all the workers, which
will read the splits of the big table;
3. The workers fetch and iterate over all the splits of the big table and check
whether each join key exists in the hash table; only rows with matching join
keys are retained.
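The three steps above can be sketched in plain Python (a toy in-memory broadcast
hash join for illustration only, not Spark's actual implementation; the table
contents and row shapes are made up):

```python
# Toy sketch of a broadcast hash join over (key, value) tuples.

def build_hash_table(small_table):
    # Step 1: one worker reads the small table and builds a hash table
    # keyed on the join key.
    return {key: value for key, value in small_table}

def broadcast_join_split(hash_table, split):
    # Steps 2-3: each worker receives the broadcast hash table and iterates
    # over its split of the big table, keeping only rows whose join key
    # appears in the hash table.
    return [(key, hash_table[key], value)
            for key, value in split
            if key in hash_table]

small = [(1, "Jin"), (6, "Xing")]
big_split = [(1, 10), (2, 21), (6, 23), (7, 18)]

joined = broadcast_join_split(build_hash_table(small), big_split)
# Note: the worker still had to fetch and scan every row of the split,
# even though only a few rows survive the join.
```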
From the above, step 3 is the heaviest, especially when the worker and the
split storage are not on the same host and bandwidth is limited. The cost
incurred by step 3 is not always necessary. Consider the scenario below:
{code:none}
Small table A
id name
1 Jin
6 Xing
Big table B
id age
1 10
2 21
3 33
4 65
5 32
6 23
7 18
8 20
9 22
{code}
Run the query: select * from A inner join B on A.id = B.id
It's pretty clear that we don't need to fetch all the data from table B,
because the number of matching keys is really small.
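To make that concrete, here are the example tables above as plain Python dicts;
only two of B's nine rows participate in the join, yet a plain broadcast join
still ships and scans all nine:

```python
# The example tables from the scenario above.
table_a = {1: "Jin", 6: "Xing"}
table_b = {1: 10, 2: 21, 3: 33, 4: 65, 5: 32, 6: 23, 7: 18, 8: 20, 9: 22}

# Keys of B that actually match A: 2 out of 9.
matched = {k for k in table_b if k in table_a}
```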
I propose to use the small table to build a bloom filter (BF) and use the
generated BF as a predicate/filter when fetching data from the big table.
This way:
1. Much traffic/bandwidth is saved;
2. There is less data for the workers to process.
Broadcast join is just an example; other types of join would also benefit if
we could scan with a BF.
In a nutshell, I think Kudu could provide an interface by which users can scan
data with bloom filters.
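The proposed scan could be sketched as follows. This is a toy hand-rolled bloom
filter, with an in-memory list standing in for a Kudu tablet; none of it is the
actual Kudu client API, it only illustrates the idea of pushing the filter down
to the storage side:

```python
import hashlib

class BloomFilter:
    """Toy bloom filter: m bits, k hash functions derived from sha256."""

    def __init__(self, m=64, k=3):
        self.m, self.k, self.bits = m, k, 0

    def _positions(self, key):
        # Derive k independent bit positions by salting the key.
        for i in range(self.k):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits |= 1 << p

    def might_contain(self, key):
        # May return a false positive, but never a false negative.
        return all(self.bits & (1 << p) for p in self._positions(key))

# Build the BF from the small table's join keys...
bf = BloomFilter()
for key in (1, 6):
    bf.add(key)

# ...and push it down as a scan predicate: the storage side (here, a plain
# list standing in for a tablet) returns only rows that might match, so far
# fewer rows cross the network.
big_table = [(1, 10), (2, 21), (3, 33), (4, 65), (5, 32),
             (6, 23), (7, 18), (8, 20), (9, 22)]
fetched = [row for row in big_table if bf.might_contain(row[0])]
```

Because a bloom filter has no false negatives, every truly matching row is
guaranteed to be in `fetched`; occasional false positives just mean a few extra
rows are shipped, which the worker-side hash table then discards.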
> Scan tablets with bloom filter
> ------------------------------
>
> Key: KUDU-2483
> URL: https://issues.apache.org/jira/browse/KUDU-2483
> Project: Kudu
> Issue Type: Bug
> Components: client
> Reporter: jin xing
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)