[
https://issues.apache.org/jira/browse/HIVE-23880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161948#comment-17161948
]
László Bodor commented on HIVE-23880:
-------------------------------------
[~zabetak]: thanks for your input!
Some of your questions are covered in an internal Cloudera ticket (DWX-4323).
1. 2-3 sec: that's what I measured at first too... to get to 1-2 minutes,
you'll need >1000 upstream mapper tasks and 50M-70M expected entries in the
bloom filter (that's the case for min. 30TB scale TPCDS, or you can fake big
bloom filters with a semijoin hint); otherwise you'll get the result in seconds.
2. a flamegraph (attached to the internal ticket) shows that ~50% of the time
is spent in BloomKFilter.mergeBloomFilterBytes, which is basically the bitwise OR.
To be honest, I haven't checked my solution at the CPU level, but my JMH
microbenchmarks showed some improvement, and then I tested my parallel
approach on a cluster, where the merger task runtime improved as below:
{code}
THREADS   AVG runtime of merger task
      2   188s (no improvement)
      4   165s
      8   149s
     16   149s
     32   144s
     64   140s
    128   137s
{code}
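For context: BloomKFilter.mergeBloomFilterBytes ORs one serialized filter into
another word by word, and since every word is independent, the underlying
bitset can be split into disjoint ranges that separate threads merge without
any locking. Below is a minimal, hypothetical sketch of that scheme (class and
method names are made up for illustration, and it works on long[] bitsets
instead of the serialized bytes the actual patch deals with):
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the parallel merge idea; not the HIVE-23880 patch.
public class ParallelBloomMerge {

  // Serial building block: OR the words of 'incoming' into 'target'
  // within [from, to) - this is the bitwise-OR hot loop.
  static void orRange(long[] target, long[] incoming, int from, int to) {
    for (int i = from; i < to; i++) {
      target[i] |= incoming[i];
    }
  }

  // Split the bitset into disjoint word ranges; each thread owns one range
  // across all incoming filters, so no synchronization is needed.
  static void mergeAll(long[] target, List<long[]> filters, int threads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      int chunk = (target.length + threads - 1) / threads;
      List<Future<?>> results = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        final int from = Math.min(target.length, t * chunk);
        final int to = Math.min(target.length, from + chunk);
        results.add(pool.submit(() -> {
          for (long[] filter : filters) {
            orRange(target, filter, from, to);
          }
        }));
      }
      for (Future<?> r : results) {
        r.get(); // rethrows if a worker failed
      }
    } finally {
      pool.shutdown();
    }
  }
}
{code}
The flattening of the numbers above ~8 threads would be consistent with the OR
loop becoming memory-bandwidth bound rather than CPU bound, though I haven't
verified that.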
> Bloom filters can be merged in a parallel way in VectorUDAFBloomFilterMerge
> ---------------------------------------------------------------------------
>
> Key: HIVE-23880
> URL: https://issues.apache.org/jira/browse/HIVE-23880
> Project: Hive
> Issue Type: Improvement
> Reporter: László Bodor
> Assignee: László Bodor
> Priority: Major
> Labels: pull-request-available
> Attachments: lipwig-output3605036885489193068.svg
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Merging bloom filters in semijoin reduction can become the main bottleneck in
> the case of a large number of source mapper tasks (~1000, Map 1 in the example
> below) and a large number of expected entries (50M) in the bloom filters.
> For example, in TPCDS Q93:
> {code}
> select /*+ semi(store_returns, sr_item_sk, store_sales, 70000000)*/
>        ss_customer_sk,
>        sum(act_sales) sumsales
>   from (select ss_item_sk,
>                ss_ticket_number,
>                ss_customer_sk,
>                case when sr_return_quantity is not null
>                     then (ss_quantity - sr_return_quantity) * ss_sales_price
>                     else (ss_quantity * ss_sales_price) end act_sales
>           from store_sales
>           left outer join store_returns
>             on (sr_item_sk = ss_item_sk
>                 and sr_ticket_number = ss_ticket_number),
>                reason
>          where sr_reason_sk = r_reason_sk
>            and r_reason_desc = 'reason 66') t
>  group by ss_customer_sk
>  order by sumsales, ss_customer_sk
>  limit 100;
> {code}
> At 10TB-30TB scale there is a chance that, out of 3-4 minutes of query
> runtime, 1-2 minutes are spent merging bloom filters (Reducer 2), as in:
> [^lipwig-output3605036885489193068.svg]
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 3 ..........      llap     SUCCEEDED      1          1        0        0       0       0
> Map 1 ..........      llap     SUCCEEDED   1263       1263        0        0       0       0
> Reducer 2             llap       RUNNING      1          0        1        0       0       0
> Map 4                 llap       RUNNING   6154          0      207     5947       0       0
> Reducer 5             llap        INITED     43          0        0       43       0       0
> Reducer 6             llap        INITED      1          0        0        1       0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 02/06  [====>>----------------------] 16%  ELAPSED TIME: 149.98 s
> ----------------------------------------------------------------------------------------------
> {code}
> For example, 70M expected entries lead to a bloom filter of 436,465,696 bits
> (~54 MB), so merging 1263 bloom filters means running ~1263 * 436,465,696
> bitwise OR operations, which is a very hot codepath, but one that can be
> parallelized.
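> For the curious: that bit count is just the standard bloom filter sizing
> formula m = -n * ln(p) / (ln 2)^2, assuming the default ~5% false positive
> probability. A quick sanity-check sketch (the class and method here are
> illustrative, not Hive's BloomKFilter API):
> {code}
> // Standard bloom filter sizing: m = -n * ln(p) / (ln 2)^2.
> // With n = 70,000,000 and p = 0.05 this yields ~436,465,696 bits,
> // i.e. the figure quoted above.
> public class BloomSizing {
>   static long optimalNumOfBits(long expectedEntries, double fpp) {
>     return (long) (-expectedEntries * Math.log(fpp)
>         / (Math.log(2) * Math.log(2)));
>   }
>
>   public static void main(String[] args) {
>     long bits = optimalNumOfBits(70_000_000L, 0.05);
>     System.out.println(bits + " bits = " + (bits / 8) + " bytes"); // ~54 MB
>   }
> }
> {code}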