[ https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534163#comment-16534163 ]

Aman Sinha commented on DRILL-6385:
-----------------------------------

Couple of design items to discuss, hence I am adding them here instead of in 
the PR: 
 * For broadcast hash join, the bloom filter will not save on network I/O. It 
will reduce the number of rows in the pipeline below the hash join probe, but 
since we are not applying the bloom filter when there is a blocking operator 
on the probe side, there is not much scope for memory savings. There is a 
potential benefit if there is some CPU-intensive operation on the probe side, 
but that is somewhat unlikely since such operations are typically done after 
the join. That leaves only one other possibility: the bloom filter enabling 
partition/rowgroup pruning on the scan itself, but the current implementation 
does not support that. So my conclusion is that for broadcast hash join the 
bloom filter adds overhead (creation of the BF, plus communication with the 
foreman, plus the foreman's communication with the drillbits) without much 
benefit. Do you have thoughts about this? At the very least, I suggest adding 
a session option such as 'enable_runtime_filter_broadcast_hashjoin' whose 
default value is FALSE, and evaluating performance on a large data set (see 
the gating sketch below).
 * For 3 or more table joins, consider a left-deep join plan:

{noformat}
                HJ2
               /  \
             HJ1   T3
            /  \
           T1   T2
{noformat}
Here, I was envisaging that there would be a bloom filter associated with the 
join condition of HJ1 and another bloom filter for HJ2. However, since your 
implementation checks for blocking operators on the probe side, it will 
prevent HJ2 from creating a BF. Is that correct?

This is quite a common use case, so I was hoping that we would support it. 
From a design perspective, in this case we should first make sure that the 
bloom filter for HJ2 is only created if the join is between T1 and T3, not 
between T2 and T3, since T2 is on the build side. Second, in terms of 
aggregating the BFs, it could behave the same way as a 2-column join between 
T1 and T2. In other words, the following two join scenarios are conceptually 
similar from a bloom filter aggregation perspective:
{noformat}
 // 2 column join between T1-T2
 T1.a1 = T2.a2 AND T1.b1 = T2.b2

 // 1 column join between T1-T2 and 1 column join between T1-T3
 T1.a1 = T2.a2 AND T1.b1 = T3.b3
{noformat}
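
A minimal sketch of that aggregation, assuming the foreman keys global 
filters by the probe-side column and merges partial filters with a bitwise 
OR; class and method names are hypothetical, and a BitSet stands in for the 
bloom filter's bit array:
{noformat}
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// The two scenarios above become identical here: the foreman keeps one
// global filter keyed on "T1.a1" and one keyed on "T1.b1", regardless of
// whether the build side contributing a partial filter was T2 or T3.
public final class PerColumnFilterAggregator {

  // probe-side column -> aggregated global filter
  private final Map<String, BitSet> globalFilters = new HashMap<>();

  // OR-merge one fragment's partial filter into the column's global filter.
  public void accept(String probeColumn, BitSet partialFilter) {
    globalFilters.merge(probeColumn, partialFilter, (global, partial) -> {
      global.or(partial);
      return global;
    });
  }

  // The filter to broadcast to the scans feeding the given probe column.
  public BitSet globalFilterFor(String probeColumn) {
    return globalFilters.get(probeColumn);
  }
}
{noformat}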
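
Returning to the first point: a minimal sketch of the gating the suggested 
session option implies (names are hypothetical, not Drill's planner API):
{noformat}
public final class RuntimeFilterGating {

  // Decide whether a hash join should build a bloom filter at all.
  public static boolean shouldGenerateBloomFilter(boolean isBroadcastJoin,
                                                  boolean broadcastOptionEnabled,
                                                  boolean probeHasBlockingOperator) {
    // A blocking operator on the probe side already buffers its input, so
    // the filter cannot reach the scan in time to save memory.
    if (probeHasBlockingOperator) {
      return false;
    }
    // A broadcast join gets no network-I/O savings from the filter, so
    // build it only when 'enable_runtime_filter_broadcast_hashjoin' is set.
    if (isBroadcastJoin) {
      return broadcastOptionEnabled;
    }
    return true;
  }
}
{noformat}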

> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
>                 Key: DRILL-6385
>                 URL: https://issues.apache.org/jira/browse/DRILL-6385
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - Flow
>    Affects Versions: 1.14.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>
> This feature is to support JPPD (Join Predicate Push Down). It will benefit 
> HashJoin and Broadcast HashJoin performance by reducing the number of rows 
> sent across the network and the memory consumed. This feature is already 
> supported by Impala, which calls it RuntimeFilter 
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
>  The first PR will try to push down a bloom filter from the HashJoin node to 
> Parquet's scan node. The proposed basic procedure is as follows:
>  # The HashJoin build side accumulates the equi-join condition rows to 
> construct a bloom filter, then sends the bloom filter to the foreman node.
>  # The foreman node passively accepts the bloom filters from all the 
> fragments that have the HashJoin operator. It then aggregates them to form 
> a global bloom filter.
>  # The foreman node broadcasts the global bloom filter to all the probe-side 
> scan nodes, which may already have sent out partial data to the hash join 
> nodes (currently the hash join node prefetches one batch from both sides).
>  # The scan node accepts the global bloom filter from the foreman node and 
> uses it to filter the remaining rows.
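>
> A minimal sketch of steps 1 and 4, assuming a single hash function per key 
> (a real filter would use several); the class and method names are 
> illustrative, not the PR's actual code:
> {noformat}
> // Build side (step 1) inserts each join-key hash; the probe-side scan
> // (step 4) tests each row's key hash and drops rows that cannot match.
> public final class ScanSideBloomFilter {
>   private final long[] bits;   // the bloom filter's bit array
>
>   public ScanSideBloomFilter(int words) {
>     this.bits = new long[words];
>   }
>
>   // Step 1: called once per build-side row while the hash table is built.
>   public void insert(long keyHash) {
>     int bit = index(keyHash);
>     bits[bit >>> 6] |= 1L << (bit & 63);
>   }
>
>   // Step 4: keep a row only if its key hash might be in the filter; false
>   // positives pass through and are removed by the hash join itself.
>   public boolean mightContain(long keyHash) {
>     int bit = index(keyHash);
>     return (bits[bit >>> 6] & (1L << (bit & 63))) != 0;
>   }
>
>   private int index(long keyHash) {
>     return (int) ((keyHash & Long.MAX_VALUE) % (bits.length * 64L));
>   }
> }
> {noformat}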
>  
> To implement the above execution flow, the main new notions are described 
> below:
>       1. RuntimeFilter
> A filter container which may contain a BloomFilter or a MinMaxFilter.
>       2. RuntimeFilterReporter
> It wraps the logic to send the hash join's bloom filter to the foreman. The 
> serialized bloom filter is sent out through the data tunnel. This object is 
> instantiated by the FragmentExecutor and passed to the FragmentContext, so 
> the HashJoin operator can obtain it through the FragmentContext.
>       3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and 
> extracting the actual BloomFilter from the network. It then hands the filter 
> to the WorkerBee's new registerRuntimeFilter interface.
> Another RPC type is BroadcastRuntimeFilterRequest. It registers the accepted 
> global bloom filter with the WorkerBee via the registerRuntimeFilter method 
> and then propagates it to the FragmentContext, through which the probe-side 
> scan node can fetch the aggregated bloom filter.
>       4. RuntimeFilterManager
> The foreman will instantiate a RuntimeFilterManager. It indirectly obtains 
> every RuntimeFilter via the WorkerBee. Once all the BloomFilters have been 
> accepted and aggregated, it broadcasts the aggregated bloom filter to all 
> the probe-side scan nodes through the data tunnel via a 
> BroadcastRuntimeFilterRequest RPC.
>       5. RuntimeFilterEnableOption
> A global option will be added to decide whether to enable this new feature.
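>
> A minimal sketch of the RuntimeFilter container (notion 1), including the 
> merge the RuntimeFilterManager needs when aggregating partial filters 
> (notion 4); field and method names are hypothetical, not the PR's API:
> {noformat}
> import java.util.BitSet;
>
> public final class RuntimeFilter {
>   private final BitSet bloomBits;      // bloom filter bits; may be null
>   private long min = Long.MAX_VALUE;   // min-max filter bounds over the key
>   private long max = Long.MIN_VALUE;
>
>   public RuntimeFilter(BitSet bloomBits) {
>     this.bloomBits = bloomBits;
>   }
>
>   // Record one build-side key in both contained filters.
>   public void add(long keyHash, long keyValue) {
>     if (bloomBits != null) {
>       bloomBits.set((int) ((keyHash & Long.MAX_VALUE) % bloomBits.size()));
>     }
>     min = Math.min(min, keyValue);
>     max = Math.max(max, keyValue);
>   }
>
>   // Aggregate another fragment's partial filter for the same join column:
>   // OR the bloom bits and widen the min-max range.
>   public void merge(RuntimeFilter other) {
>     if (bloomBits != null && other.bloomBits != null) {
>       bloomBits.or(other.bloomBits);
>     }
>     min = Math.min(min, other.min);
>     max = Math.max(max, other.max);
>   }
> }
> {noformat}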
>  
> Suggestions and advice are welcome. The related PR will be presented as 
> soon as possible.


