[ https://issues.apache.org/jira/browse/KUDU-2483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969681#comment-16969681 ]

Bankim Bhavsar commented on KUDU-2483:
--------------------------------------

I noticed a few issues in the existing implementation of the BloomFilter 
predicate, specifically in the Java client.
 # BloomFilter.java doesn't actually use the user-provided HashFunction; it 
always uses the Murmur2 hash, which is okay for now since the enum lists only 
Murmur2. See 
[1|https://github.com/apache/kudu/blob/master/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java#L275],
[2|https://github.com/apache/kudu/blob/master/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java#L263],
[3|https://github.com/apache/kudu/blob/master/java/kudu-client/src/main/java/org/apache/kudu/util/BloomFilter.java#L260]
 # Reading the BloomFilter Java client suggests that Murmur2 should be the 
default hash function; however, the [wire 
protocol|https://github.com/apache/kudu/blob/master/src/kudu/common/common.proto#L373]
 specifies CityHash as the default hash function!
 # The Java client doesn't seem complete: it doesn't provide helper methods to 
build a BloomFilter predicate, and there are no tests or examples that 
illustrate how to use a BloomFilter in a predicate from the client. See 
[https://github.com/apache/kudu/blob/master/java/kudu-client/src/main/java/org/apache/kudu/client/KuduPredicate.java#L73]
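Issue 1 could be addressed by having the filter dispatch on the hash function it was constructed with. Below is a minimal, hypothetical Java sketch: the class `HashAwareBloomFilter`, its enum, and its method names are illustrative and are not the Kudu client's API. The `CITY_HASH` constant is an assumption used only to show the filter rejecting a function it cannot compute instead of silently falling back to Murmur2. The sketch uses 32-bit MurmurHash2 with double hashing; whether that matches the Murmur2 variant or bit layout Kudu uses is not established here, so it should not be assumed wire-compatible with the server.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical sketch: a Bloom filter that honors the caller's hash function
// instead of hard-coding Murmur2. Names are illustrative, not Kudu's API.
class HashAwareBloomFilter {
  // Hypothetical enum. CITY_HASH is listed only to exercise the failure path;
  // this sketch has no Java CityHash implementation.
  enum HashFunction { MURMUR2, CITY_HASH }

  private final BitSet bits;
  private final int numBits;
  private final int numHashes;
  private final HashFunction hashFunction;

  HashAwareBloomFilter(int numBits, int numHashes, HashFunction hashFunction) {
    // Fail fast rather than silently substituting Murmur2 for another function.
    if (hashFunction != HashFunction.MURMUR2) {
      throw new UnsupportedOperationException("Unsupported hash function: " + hashFunction);
    }
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
    this.hashFunction = hashFunction;
  }

  // Dispatch on the hash function the caller actually chose.
  private int hash(byte[] data, int seed) {
    switch (hashFunction) {
      case MURMUR2:
        return murmur2(data, seed);
      default:
        throw new UnsupportedOperationException("Unsupported hash function: " + hashFunction);
    }
  }

  // 32-bit MurmurHash2 (Austin Appleby), reading 4-byte blocks little-endian.
  static int murmur2(byte[] data, int seed) {
    final int m = 0x5bd1e995;
    final int r = 24;
    int h = seed ^ data.length;
    int i = 0;
    for (; i + 4 <= data.length; i += 4) {
      int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
          | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }
    switch (data.length - i) { // tail bytes; cases intentionally fall through
      case 3: h ^= (data[i + 2] & 0xff) << 16;
      case 2: h ^= (data[i + 1] & 0xff) << 8;
      case 1: h ^= data[i] & 0xff;
              h *= m;
      default: break;
    }
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // Double hashing: probe i sets bit (h1 + i * h2) mod numBits.
  void put(byte[] key) {
    int h1 = hash(key, 0);
    int h2 = hash(key, h1);
    for (int i = 0; i < numHashes; i++) {
      bits.set(Math.floorMod(h1 + i * h2, numBits));
    }
  }

  boolean mightContain(byte[] key) {
    int h1 = hash(key, 0);
    int h2 = hash(key, h1);
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(Math.floorMod(h1 + i * h2, numBits))) return false; // definitely absent
    }
    return true; // present, or a false positive
  }

  public static void main(String[] args) {
    HashAwareBloomFilter bf = new HashAwareBloomFilter(1024, 3, HashFunction.MURMUR2);
    byte[] key = "Jin".getBytes(StandardCharsets.UTF_8);
    bf.put(key);
    System.out.println(bf.mightContain(key)); // true: Bloom filters have no false negatives
  }
}
```

Rejecting an unknown function at construction time keeps a client-side filter from ever being built with a different hash than the caller asked for, which is the failure mode issue 1 describes.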

----
I couldn't find a readily available Java implementation of CityHash, which I 
suspect is why the Java client defaults to the Murmur2 hash.
 [https://github.com/google/cityhash]

To begin with, I'm inclined to fix issues 1 and 3 noted above and not change 
the default hash function in either the wire protocol or the Java client.

> Scan tablets with bloom filter
> ------------------------------
>
>                 Key: KUDU-2483
>                 URL: https://issues.apache.org/jira/browse/KUDU-2483
>             Project: Kudu
>          Issue Type: New Feature
>          Components: client
>            Reporter: Jin Xing
>            Assignee: Xu Yao
>            Priority: Major
>              Labels: roadmap-candidate
>         Attachments: BloomFilter+Design+Doc.pdf, KUDU-2483, 
> image-2018-07-01-23-29-05-517.png
>
>
> Joins are very common in Spark SQL. In this JIRA I take the broadcast join 
> as an example and describe how Kudu's bloom filter can help accelerate 
> distributed computing.
> Spark runs a broadcast join in the following steps:
>  1. In a broadcast join there is a small table and a big table; Spark reads 
> all data from the small table onto one worker and builds a hash table.
>  2. The hash table generated in step 1 is broadcast to all the workers, 
> which read the splits of the big table.
>  3. Workers fetch and iterate over all the splits of the big table and check 
> whether each joining key exists in the hash table; only rows with matching 
> joining keys are retained.
> Of these, step 3 is the heaviest, especially when the worker and the split 
> storage are not on the same host and bandwidth is limited. In fact, much of 
> the cost of step 3 is not always necessary. Consider the following scenario:
> {code:none}
> Small table A
> id      name
> 1      Jin
> 6      Xing
> Big table B
> id     age
> 1      10
> 2      21
> 3      33
> 4      65
> 5      32
> 6      23
> 7      18
> 8      20
> 9      22
> {code}
> Run the SQL query: *select * from A inner join B on A.id=B.id*
> Clearly, we don't need to fetch all the data from table B, because the 
> number of matching keys is very small.
> I propose using the small table to build a bloom filter (BF) and using the 
> generated BF as a predicate/filter when fetching data from the big table. 
> Thus:
>  1. Much traffic/bandwidth is saved.
>  2. Workers have less data to process.
> Broadcast join is just one example; other types of join will also benefit 
> from scanning with a BF.
> In a nutshell, I think Kudu can provide an interface through which users can 
> scan data with bloom filters.
>  
> Here I want to add some statistics for the Spark-Kudu integration with and 
> without BloomFilter.
> In our production environment, the bandwidth of each executor is 50 Mbps.
> We perform an inner join of two tables: one large and one comparatively 
> small.
> In Spark, an inner join can be implemented as SortMergeJoin or 
> BroadcastHashJoin; we implemented the corresponding operators with 
> BloomFilter as SortMergeBloomFilterJoin and BroadcastBloomFilterJoin.
> The hash table of the BloomFilter is configured as 32M.
> The statistics are shown below:
> ||Records of Table A||Records of Table B||Join Operator||Executor Time||
> |400 thousand|14 billion|SortMergeJoin|760s|
> |400 thousand|14 billion|BroadcastHashJoin|376s|
> |400 thousand|14 billion|BroadcastBloomFilterJoin|21s|
> |2 million|14 billion|SortMergeJoin|707s|
> |2 million|14 billion|BroadcastHashJoin|329s|
> |2 million|14 billion|SortMergeBloomFilterJoin|75s|
> |2 million|14 billion|BroadcastBloomFilterJoin|35s|
> As we can see, the join benefits greatly from BloomFilter push-down.
> I want to use this JIRA as an umbrella; my colleagues will submit the 
> follow-up sub-tasks/PRs.
> It would be great if someone could take a closer look at this and share some 
> comments.
>  
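The broadcast-join steps and the bloom-filter proposal in the description above can be sketched in a single process as follows. This is a hypothetical Java simulation, not Kudu or Spark code: `BloomFilterJoinSketch`, `IntBloomFilter`, the SplitMix64-based hash, and the filter size (1024 bits, 3 probes) are all assumptions. It uses tables A and B from the example; the filter step stands in for what the proposal would have Kudu evaluate during the scan, so only rows that pass it would be shipped to the worker.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process simulation of the proposal: the Bloom filter is
// applied while scanning the big table, before rows would cross the network.
class BloomFilterJoinSketch {
  // Minimal Bloom filter over int keys. The SplitMix64 finalizer is an
  // assumption for this sketch, not the hash Kudu uses.
  static final class IntBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    IntBloomFilter(int numBits, int numHashes) {
      this.bits = new BitSet(numBits);
      this.numBits = numBits;
      this.numHashes = numHashes;
    }

    private static long mix(long z) {
      z = (z ^ (z >>> 30)) * 0xbf58476d1ce4e5b9L;
      z = (z ^ (z >>> 27)) * 0x94d049bb133111ebL;
      return z ^ (z >>> 31);
    }

    // Double hashing: probe i is (h1 + i * h2) mod numBits.
    private int probe(int key, int i) {
      long h1 = mix(key);
      long h2 = mix(h1);
      return (int) Math.floorMod(h1 + i * h2, (long) numBits);
    }

    void put(int key) {
      for (int i = 0; i < numHashes; i++) bits.set(probe(key, i));
    }

    boolean mightContain(int key) {
      for (int i = 0; i < numHashes; i++) {
        if (!bits.get(probe(key, i))) return false; // definitely absent
      }
      return true; // present, or a false positive
    }
  }

  static final class JoinResult {
    final List<String> rows = new ArrayList<>();
    int rowsShipped; // big-table rows that passed the Bloom filter
  }

  // small: id -> name (table A); big: {id, age} rows (table B).
  static JoinResult broadcastJoinWithBloomFilter(Map<Integer, String> small, int[][] big) {
    // Build side (step 1): hash table plus a Bloom filter over the join keys.
    Map<Integer, String> hashTable = new HashMap<>(small);
    IntBloomFilter bf = new IntBloomFilter(1 << 10, 3);
    for (int id : small.keySet()) bf.put(id);

    JoinResult result = new JoinResult();
    for (int[] row : big) {
      // Pushed-down filter: rows whose key is definitely absent are never shipped.
      if (!bf.mightContain(row[0])) continue;
      result.rowsShipped++;
      // Probe (step 3): the exact lookup discards any Bloom filter false positives.
      String name = hashTable.get(row[0]);
      if (name != null) result.rows.add(row[0] + " " + name + " " + row[1]);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<Integer, String> a = new HashMap<>();
    a.put(1, "Jin");
    a.put(6, "Xing");
    int[][] b = {{1, 10}, {2, 21}, {3, 33}, {4, 65}, {5, 32}, {6, 23}, {7, 18}, {8, 20}, {9, 22}};
    JoinResult r = broadcastJoinWithBloomFilter(a, b);
    System.out.println(r.rows); // [1 Jin 10, 6 Xing 23]
    System.out.println("rows shipped: " + r.rowsShipped + " of " + b.length);
  }
}
```

The exact hash-table probe is still needed because a Bloom filter can return false positives; it never returns false negatives, so no matching row is lost by filtering early.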

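To reason about the 32M filter size quoted in the description, the classic approximation for a Bloom filter with m bits, n inserted keys and k hash functions gives the false-positive rate p and the k that minimizes it. The description does not say whether 32M means bits or bytes; the worked numbers below assume a 32 MiB bit array and the 2-million-row table A, purely for illustration.

```latex
p \approx \left(1 - e^{-kn/m}\right)^{k}, \qquad
k_{\text{opt}} = \frac{m}{n}\ln 2

m = 32 \times 2^{20} \times 8 = 268{,}435{,}456 \text{ bits}, \qquad
\frac{m}{n} = \frac{268{,}435{,}456}{2 \times 10^{6}} \approx 134.2 \text{ bits per key}
```

A larger m/n ratio lowers p, so fewer non-matching rows from table B survive the filter, at the cost of a bigger filter to build and ship with each scan.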


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
