[ https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Farkas updated DRILL-6385:
----------------------------------
Affects Version/s: 1.14.0 (was: 1.15.0)
> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
> Key: DRILL-6385
> URL: https://issues.apache.org/jira/browse/DRILL-6385
> Project: Apache Drill
> Issue Type: New Feature
> Components: Server, Execution - Flow
> Affects Versions: 1.14.0
> Reporter: weijie.tong
> Assignee: weijie.tong
> Priority: Major
>
> This feature adds support for JPPD (Join Predicate Push Down). It will
> improve HashJoin and Broadcast HashJoin performance by reducing the number
> of rows sent across the network and the memory consumed. Impala already
> supports this feature, which it calls RuntimeFilter
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
> The first PR will push down a Bloom filter from the HashJoin node to the
> Parquet scan node. The proposed basic procedure is as follows:
> 1. The HashJoin build side accumulates the join-key values (the columns of
> the equality join condition) of its rows to construct a Bloom filter, then
> sends the Bloom filter to the foreman node.
> 2. The foreman node passively accepts the Bloom filters from all fragments
> that contain the HashJoin operator, then aggregates them into a global
> Bloom filter.
> 3. The foreman node broadcasts the global Bloom filter to all probe-side
> scan nodes, which may already have sent partial data to the HashJoin nodes
> (currently the HashJoin node prefetches one batch from both sides).
> 4. Each scan node accepts the global Bloom filter from the foreman node and
> uses it to filter the remaining rows, keeping only those that satisfy the
> Bloom filter.
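The Bloom filter operations that steps 1, 2 and 4 rely on can be sketched in Java as below. This is a minimal illustration under stated assumptions, not Drill's implementation: `BloomFilterSketch` is a hypothetical class, join keys are assumed to be `long` values, and the hashing (a SplitMix64 mixer plus double hashing) is chosen only for brevity. The build side calls `put`, the foreman merges per-fragment filters by OR-ing their bit arrays (valid only when every fragment uses the same size and hash count), and the probe-side scan drops any row for which `mightContain` returns false.

```java
import java.util.BitSet;

/**
 * Hypothetical Bloom filter over long join keys (illustration only, not Drill's
 * BloomFilter class). Bit positions use double hashing: h1 + i * h2 (mod numBits).
 */
final class BloomFilterSketch {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  BloomFilterSketch(int numBits, int numHashes) {
    if (numBits <= 0 || numHashes <= 0) {
      throw new IllegalArgumentException("numBits and numHashes must be positive");
    }
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  /** SplitMix64 finalizer: spreads the bits of the key before indexing. */
  private static long mix(long x) {
    x = (x ^ (x >>> 30)) * 0xbf58476d1ce4e5b9L;
    x = (x ^ (x >>> 27)) * 0x94d049bb133111ebL;
    return x ^ (x >>> 31);
  }

  /** i-th bit position; h1 and h2 are non-negative, so % cannot go negative. */
  private int index(long mixed, int i) {
    long h1 = mixed & 0xffffffffL;
    long h2 = (mixed >>> 32) | 1L; // forced odd; with a power-of-two numBits the positions stay distinct
    return (int) ((h1 + i * h2) % numBits);
  }

  /** Build side (step 1): record one join-key value. */
  void put(long key) {
    long mixed = mix(key);
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(mixed, i));
    }
  }

  /** Probe side (step 4): false means no build-side row can have this key. */
  boolean mightContain(long key) {
    long mixed = mix(key);
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(mixed, i))) {
        return false;
      }
    }
    return true;
  }

  /** Foreman (step 2): union with a filter built with the same size and hash count. */
  void merge(BloomFilterSketch other) {
    if (other.numBits != numBits || other.numHashes != numHashes) {
      throw new IllegalArgumentException("incompatible Bloom filters");
    }
    bits.or(other.bits);
  }
}
```

Because merging is a bitwise OR, the global filter never rejects a key that any fragment inserted; aggregation can only raise the false-positive rate, so probe-side rows that pass the filter still go through the real hash join.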
>
> To implement the above execution flow, the following main new concepts are introduced:
> 1. RuntimeFilter
> A filter container that may hold a BloomFilter or a MinMaxFilter.
> 2. RuntimeFilterReporter
> It wraps the logic that sends the hash join's Bloom filter to the foreman.
> The serialized Bloom filter is sent through the data tunnel. This object is
> instantiated by the FragmentExecutor and passed to the FragmentContext, so
> the HashJoin operator can obtain it through the FragmentContext.
> 3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and
> extracting the actual BloomFilter from the network message. It then passes
> this filter to the WorkerBee through the WorkerBee's new registerRuntimeFilter
> method.
> It also handles a second RPC type, BroadcastRuntimeFilterRequest: it
> registers the accepted global Bloom filter with the WorkerBee through the
> registerRuntimeFilter method and then propagates it to the FragmentContext,
> from which the probe-side scan node can fetch the aggregated Bloom filter.
> 4. RuntimeFilterManager
> The foreman instantiates a RuntimeFilterManager, which indirectly receives
> every RuntimeFilter through the WorkerBee. Once all the BloomFilters have
> been received and aggregated, it broadcasts the aggregated Bloom filter to
> all probe-side scan nodes through the data tunnel with a
> BroadcastRuntimeFilterRequest RPC.
> 5. RuntimeFilterEnableOption
> A global option will be added to control whether this feature is enabled.
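The foreman-side aggregation described for the RuntimeFilterManager can be sketched roughly as follows. Everything here is hypothetical and for illustration only: `MinMaxSketch` is a simple stand-in for a MinMaxFilter (not Drill's class), `FilterAggregatorSketch` is a simplified stand-in for the RuntimeFilterManager, a `Consumer` callback replaces the BroadcastRuntimeFilterRequest RPC, and fragments are identified by plain `int` ids. It shows the "wait for every build-side fragment, merge, broadcast once" logic; a Bloom filter would be merged by OR-ing bit arrays instead of widening a range.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical stand-in for a min/max runtime filter over long join keys. */
final class MinMaxSketch {
  private long min = Long.MAX_VALUE; // empty filter: rejects every key
  private long max = Long.MIN_VALUE;

  /** Build side: widen the range to include this key. */
  void put(long key) {
    min = Math.min(min, key);
    max = Math.max(max, key);
  }

  /** Probe side: false means the key cannot match any build-side row. */
  boolean mightContain(long key) {
    return key >= min && key <= max;
  }

  /** Aggregation: the smallest range covering both inputs. */
  void merge(MinMaxSketch other) {
    min = Math.min(min, other.min);
    max = Math.max(max, other.max);
  }
}

/**
 * Hypothetical foreman-side aggregator: waits for one filter per build-side
 * fragment, merges them, and broadcasts the global filter exactly once.
 */
final class FilterAggregatorSketch {
  private final int expectedFragments;
  private final Consumer<MinMaxSketch> broadcast; // stands in for the broadcast RPC
  private final Set<Integer> reported = new HashSet<>();
  private final MinMaxSketch global = new MinMaxSketch();
  private boolean broadcastDone = false;

  FilterAggregatorSketch(int expectedFragments, Consumer<MinMaxSketch> broadcast) {
    if (expectedFragments <= 0) {
      throw new IllegalArgumentException("expectedFragments must be positive");
    }
    this.expectedFragments = expectedFragments;
    this.broadcast = broadcast;
  }

  /** Stands in for handling one build-side filter; repeats from a fragment are ignored. */
  synchronized void onFilterReceived(int fragmentId, MinMaxSketch filter) {
    if (broadcastDone || !reported.add(fragmentId)) {
      return;
    }
    global.merge(filter);
    if (reported.size() == expectedFragments) {
      broadcastDone = true;
      broadcast.accept(global);
    }
  }
}
```

Tracking fragment ids in a set, rather than only counting messages, keeps a resent filter from triggering the broadcast early. A real implementation would also have to handle fragments that fail or never report, which this sketch ignores.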
>
> Suggestions and advice are welcome. The related PR will be submitted as
> soon as possible.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)