[
https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521503#comment-16521503
]
jin xing commented on KUDU-2483:
--------------------------------
Sorry for late reply. True, pushing down bloom filters for scan is quite
useful. `Join` is a typical scenario. But this feature I think is not
necessarily provided for join.
Spark currently doesn't have support for push-down bloom filters. We have
implement this feature and targeting to contribute to both Spark and Kudu.
And I also agrree that we should go by exposing a bloom filter from Kudu.
Basically our work contains below parts:
1. A Java version Bloom filter.
2. `BroadcastBloomFilterHashJoinExec` -- it can generate bloom filters from the
small table and send the BF to Kudu client.
3. Kudu RDD accepts the BFs generated by `BroadcastBloomFilterHashJoinExec` and
merge as Predicate and send to TServer.
4. TServer accepts the BFs and send back filtered data
For Part-2, it will be a patch to Spark and other parts are implemented within
Kudu scope.
If the implementation is interested. I can created some subtasks(this JIRA will
be an umbrella) and submit the patch.
> Scan tablets with bloom filter
> ------------------------------
>
> Key: KUDU-2483
> URL: https://issues.apache.org/jira/browse/KUDU-2483
> Project: Kudu
> Issue Type: New Feature
> Components: client
> Reporter: jin xing
> Priority: Major
>
> Join is really common/popular in Spark SQL, in this JIRA I take broadcast
> join as an example and describe how Kudu's bloom filter can help accelerate
> distributed computing.
> Spark runs broadcast join with below steps:
> 1. When do broadcast join, we have a small table and a big table; Spark will
> read all data from small table to one worker and build a hash table;
> 2. The generated hash table from step 1 is broadcasted to all the workers,
> which will read the splits from big table;
> 3. Workers start fetching and iterating all the splits of big table and see
> if the joining keys exists in the hash table; Only matched joining keys is
> retained.
> From above, step 3 is the heaviest, especially when the worker and split
> storage is not on the same host and bandwith is limited. Actually the cost
> brought by step 3 is not always necessary. Think about below scenario:
> {code:none}
> Small table A
> id name
> 1 Jin
> 6 Xing
> Big table B
> id age
> 1 10
> 2 21
> 3 33
> 4 65
> 5 32
> 6 23
> 7 18
> 8 20
> 9 22
> {code}
> Run query with SQL: *select * from A inner join B on A.id=B.id*
> It's pretty straight that we don't need to fetch all the data from Table B,
> because the number of matched keys is really small;
> I propose to use small table to build a bloom filter(BF) and use the
> generated BF as a predicate/filter to fetch data from big table, thus:
> 1. Much traffic/bandwith is saved.
> 2. Less data to processe by worker
> Broadcast join is just an example, other types of join will also benefit if
> we scan with a BF
> In a nutshell, I think Kudu can provide an iterface, by which user can scan
> data with bloom filters
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)