[ https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16466675#comment-16466675 ]

weijie.tong commented on DRILL-6385:
------------------------------------

[~amansinha100] thanks for your advice. This comment is mainly to inform the 
devs about the implementation. I have been working on our own storage layer 
for a long time, which delayed implementing this feature, even though I 
noticed this proposal on the dev list some time ago. Discussion is still 
encouraged and welcome.

 

To your question: I did not describe it well in the message above. Since the 
partitioned hash join is the main operator in practice, my first PR supports 
the partitioned hash join, and the description above refers to a partitioned 
hash join.

For the broadcast hash join case, my plan is that the build side still sends 
its bloom filter to the foreman. The difference is that the foreman 
broadcasts the bloom filter as soon as it accepts the first one to arrive; 
there is no need to wait for the bloom filters from all the other nodes, 
since the build side table is broadcast to every node and any single filter 
already covers it. This way we follow the same workflow rule, even though 
skipping the foreman round trip would perform better.
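
A minimal sketch of that first-arrival rule on the foreman (the tunnel type 
and all names here are illustrative, not Drill's actual API):
{code:java}
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// In a broadcast hash join every build side fragment holds the whole build
// table, so the first bloom filter to arrive is already complete and the
// foreman can forward it immediately.
class BroadcastJoinFilterRelay {
  interface ProbeScanTunnel { void send(byte[] filter); }

  private final AtomicBoolean broadcastDone = new AtomicBoolean(false);
  private final List<ProbeScanTunnel> probeScanTunnels;

  BroadcastJoinFilterRelay(List<ProbeScanTunnel> tunnels) {
    this.probeScanTunnels = tunnels;
  }

  // Called once per arriving filter; only the first caller broadcasts.
  void onFilterArrived(byte[] serializedBloomFilter) {
    if (broadcastDone.compareAndSet(false, true)) {
      for (ProbeScanTunnel t : probeScanTunnels) {
        t.send(serializedBloomFilter); // later arrivals are simply dropped
      }
    }
  }
}
{code}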

" is a global bloom filter always needed or a local bloom filter will suffice 
in certain cases"  there's no evidence to definitely choose one strategy. To 
partitioned hash join a aggregated global bloom filter will filter more rows 
from the probe side scan .  This is also what the impala does.  Still  needs 
some heuristic statistics plan to choose whether we still need the runtime 
filter at the runtime, since the better filter scenario is the build side has 
low percentage joined rows according to its total table row numbers.
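
As one purely illustrative example of such a heuristic (not a committed 
design), the planner could compare distinct build side join keys against the 
probe side key domain:
{code:java}
// Illustrative planning-time heuristic: push a runtime filter only when the
// build side is expected to prune a large fraction of the probe side rows.
class RuntimeFilterHeuristic {
  static boolean worthPushing(double buildNdv, double probeNdv, double threshold) {
    if (probeNdv <= 0) {
      return false; // no statistics available: skip the filter, stay safe
    }
    // Rough survival rate of probe rows: distinct build keys / probe key domain.
    double expectedSelectivity = Math.min(1.0, buildNdv / probeNdv);
    return expectedSelectivity < threshold; // e.g. threshold = 0.5
  }

  public static void main(String[] args) {
    // 1K distinct build keys probing a 1M-key table: the filter pays off.
    System.out.println(worthPushing(1_000, 1_000_000, 0.5)); // true
  }
}
{code}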

 

" does it mean that a 'global bloom filter' is a synchronization point in your 
proposal " there's no synchronization at the hash join node. To partitioned 
hash join , only the foreman needs to wait for all the bloom filter from all 
the partitioned nodes to aggregate to a global one. The hash join nodes has no 
relationship to each other ,they continue to work parallel.
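
A tiny sketch of where that wait lives (assumed names; nothing blocks on the 
hash join side):
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// The only wait is a counter inside the foreman: each partitioned hash join
// fragment fires its filter off and keeps running independently.
class FilterArrivalCounter {
  private final AtomicInteger remaining;

  FilterArrivalCounter(int buildFragmentCount) {
    this.remaining = new AtomicInteger(buildFragmentCount);
  }

  // Returns true exactly once, on the last arrival: that is the moment the
  // foreman aggregates the partial filters and broadcasts the global one.
  boolean lastArrived() {
    return remaining.decrementAndGet() == 0;
  }
}
{code}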

 

> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
>                 Key: DRILL-6385
>                 URL: https://issues.apache.org/jira/browse/DRILL-6385
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - Flow
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>
> This feature is to support JPPD (Join Predicate Push Down). It will benefit 
> HashJoin and Broadcast HashJoin performance by reducing both the number of 
> rows sent across the network and the memory consumed. This feature is 
> already supported by Impala, which calls it RuntimeFilter 
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
>  The first PR will try to push down a bloom filter from the HashJoin node to 
> Parquet's scan node. The proposed basic procedure is as follows (a sketch 
> follows the list):
>  # The HashJoin build side accumulates the values of the equality join 
> condition columns to construct a bloom filter, then sends the bloom filter 
> to the foreman node.
>  # The foreman node passively accepts the bloom filters from all the 
> fragments that have the HashJoin operator, then aggregates them into a 
> global bloom filter.
>  # The foreman node broadcasts the global bloom filter to all the probe side 
> scan nodes, which may already have sent out partial data to the hash join 
> nodes (currently the hash join node prefetches one batch from both sides).
>  # The scan node accepts the global bloom filter from the foreman node and 
> filters the remaining rows against it.
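>  
> As referenced above, a minimal sketch of steps 1, 2 and 4 with a toy 
> one-hash bloom filter (illustrative only; a real implementation would size 
> the bit array and choose hash functions carefully):
> {code:java}
> import java.util.BitSet;
>
> class JppdSketch {
>   static final int BITS = 1 << 20;
>
>   // Step 1: each build side fragment hashes its join keys into a filter.
>   static BitSet buildLocalFilter(long[] joinKeyHashes) {
>     BitSet filter = new BitSet(BITS);
>     for (long h : joinKeyHashes) {
>       filter.set((int) Long.remainderUnsigned(h, BITS));
>     }
>     return filter;
>   }
>
>   // Step 2: the foreman ORs the per-fragment filters into a global one.
>   static void aggregate(BitSet global, BitSet local) {
>     global.or(local);
>   }
>
>   // Step 4: the probe side scan keeps only rows whose key may exist on the
>   // build side (step 3 is just shipping these bits over the data tunnel).
>   static boolean mightJoin(BitSet global, long probeKeyHash) {
>     return global.get((int) Long.remainderUnsigned(probeKeyHash, BITS));
>   }
> }
> {code}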
>  
> To implement the above execution flow, the main new notions are described 
> below:
>       1. RuntimeFilter
> It's a filter container which may contain a BloomFilter or a MinMaxFilter.
>       2. RuntimeFilterReporter
> It wraps the logic to send the hash join's bloom filter to the foreman. The 
> serialized bloom filter will be sent out through the data tunnel. This 
> object will be instantiated by the FragmentExecutor and passed to the 
> FragmentContext, so the HashJoin operator can obtain it through the 
> FragmentContext.
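> A possible shape for the reporter (hypothetical names and signature, not 
> the final API):
> {code:java}
> // Created by the FragmentExecutor, handed to the FragmentContext, and
> // looked up there by the HashJoin operator.
> interface RuntimeFilterReporter {
>   // Serializes the hash join's bloom filter and ships it to the foreman
>   // through the data tunnel once the build side has been fully consumed.
>   void sendToForeman(byte[] serializedBloomFilter, int hashJoinOperatorId);
> }
> {code}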
>      3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and stripping 
> the actual BloomFilter out of the network message, then handing the filter 
> to the WorkerBee's new registerRuntimeFilter interface.
> Another RPC type is BroadcastRuntimeFilterRequest. It registers the accepted 
> global bloom filter with the WorkerBee via the registerRuntimeFilter method 
> and then propagates it to the FragmentContext, through which the probe side 
> scan node can fetch the aggregated bloom filter.
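> A sketch of the two RPC paths landing on the same WorkerBee hook 
> (hypothetical types and signatures):
> {code:java}
> class RuntimeFilterRequestHandlerSketch {
>   interface WorkerBee { void registerRuntimeFilter(byte[] filter, boolean global); }
>
>   private final WorkerBee bee;
>
>   RuntimeFilterRequestHandlerSketch(WorkerBee bee) { this.bee = bee; }
>
>   // Partial filter from a build side fragment, addressed to the foreman.
>   void onSendRuntimeFilterRequest(byte[] filter) {
>     bee.registerRuntimeFilter(filter, false);
>   }
>
>   // Aggregated global filter, destined for the probe side scan nodes.
>   void onBroadcastRuntimeFilterRequest(byte[] filter) {
>     bee.registerRuntimeFilter(filter, true);
>   }
> }
> {code}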
>       4. RuntimeFilterManager
> The foreman will instantiate a RuntimeFilterManager. It will indirectly 
> receive every RuntimeFilter through the WorkerBee. Once all the BloomFilters 
> have been accepted and aggregated, it will broadcast the aggregated bloom 
> filter to all the probe side scan nodes through the data tunnel via a 
> BroadcastRuntimeFilterRequest RPC.
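> A sketch of that lifecycle (hypothetical wiring; byte arrays stand in for 
> serialized bloom filters):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> class RuntimeFilterManagerSketch {
>   interface Broadcaster { void broadcastToProbeScans(byte[] globalFilter); }
>
>   private final List<byte[]> partials = new ArrayList<>();
>   private final int expected;
>
>   RuntimeFilterManagerSketch(int expectedBuildFragments) {
>     this.expected = expectedBuildFragments;
>   }
>
>   // Fed indirectly through the WorkerBee for each arriving partial filter.
>   synchronized void onPartial(byte[] filter, Broadcaster out) {
>     partials.add(filter);
>     if (partials.size() == expected) {
>       out.broadcastToProbeScans(merge()); // the BroadcastRuntimeFilterRequest RPC
>     }
>   }
>
>   // Bitwise OR of equally sized bloom filter byte arrays.
>   private byte[] merge() {
>     byte[] global = partials.get(0).clone();
>     for (int i = 1; i < partials.size(); i++) {
>       byte[] f = partials.get(i);
>       for (int j = 0; j < global.length; j++) {
>         global[j] |= f[j];
>       }
>     }
>     return global;
>   }
> }
> {code}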
>      5. RuntimeFilterEnableOption 
>  A global option will be added to decide whether to enable this new feature.
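> A gate like the following could wrap the whole feature (the option name is 
> a placeholder, not necessarily the final one):
> {code:java}
> import java.util.Map;
>
> class RuntimeFilterEnableOptionSketch {
>   static final String KEY = "exec.hashjoin.enable.runtime_filter"; // placeholder name
>
>   // The planner would skip generating runtime filters when this is off.
>   static boolean enabled(Map<String, Boolean> sessionOptions) {
>     return sessionOptions.getOrDefault(KEY, false); // disabled by default
>   }
> }
> {code}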
>  
> Suggestions and advice are welcome. The related PR will be presented as soon 
> as possible.


