[
https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534163#comment-16534163
]
Aman Sinha commented on DRILL-6385:
-----------------------------------
A couple of design items to discuss, hence I am adding them here instead of in
the PR:
* For broadcast hash join, the bloom filter will not save on network I/O. It
will reduce the number of rows in the pipeline below the hash join probe, but since
we are not applying the bloom filter if there is a blocking operator on the probe side,
there is not much scope for memory savings. There is a potential benefit if
there is some CPU-intensive operation on the probe side, which is somewhat
unlikely since such operations would typically be done after the join. That
leaves only one other possibility: the bloom filter could enable partition/row-group
pruning in the scan itself, but the current implementation does not support
that. So, my conclusion is that for broadcast hash join the bloom filter will
add overhead (creation of the BF, plus communication with the foreman, plus the
foreman's communication with drillbits) without much benefit. Do you have
thoughts about this? At the very least, I suggest adding a session option
such as 'enable_runtime_filter_broadcast_hashjoin' whose default value is FALSE,
and evaluating performance on a large data set.
* For 3 or more table joins, consider a left-deep join plan
{noformat}
HJ2
/ \
HJ1 T3
/ \
T1 T2{noformat}
Here, I was envisaging that there would be a bloom filter associated
with the join condition of HJ1 and another bloom filter for HJ2. However,
since your implementation checks for blocking operators on the probe side, it
will prevent HJ2 from creating a BF; is that correct?
This is quite a common use case, so I was hoping that we would support it.
From a design perspective, in this case we should first make sure that the
bloom filter for HJ2 is only created if the join is between T1 and T3, not
between T2 and T3, since T2 is on the build side. Second, in terms of
aggregating the BFs, it could behave the same way as a 2-column join between
T1 and T2. In other words, the following two join scenarios are conceptually similar
from a bloom filter aggregation perspective:
{noformat}
// 2 column join between T1-T2
T1.a1 = T2.a2 AND T1.b1 = T2.b2
// 1 column join between T1-T2 and 1 column join between T1-T3
T1.a1 = T2.a2 AND T1.b1 = T3.b3{noformat}
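To make the aggregation point concrete, here is a minimal, hypothetical sketch (not Drill's actual BloomFilter or RuntimeFilterManager API) of a foreman keeping one merged filter per probe-side column; both scenarios above then reduce to the same structure, with one aggregated filter keyed on T1.a1 and one on T1.b1:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal bloom filter; Drill's real BloomFilter class differs.
class SimpleBloomFilter {
    private final long[] bits;

    SimpleBloomFilter(int numLongs) {
        this.bits = new long[numLongs];
    }

    private int index(Object key, int seed) {
        int h = key.hashCode() * (seed * 0x9E3779B1 + 1);
        return (h & 0x7FFFFFFF) % (bits.length * 64);
    }

    void add(Object key) {
        for (int seed = 0; seed < 3; seed++) {
            int i = index(key, seed);
            bits[i / 64] |= 1L << (i % 64);
        }
    }

    boolean mightContain(Object key) {
        for (int seed = 0; seed < 3; seed++) {
            int i = index(key, seed);
            if ((bits[i / 64] & (1L << (i % 64))) == 0) {
                return false;
            }
        }
        return true;
    }

    // Foreman-side aggregation: OR together partial filters built by the
    // fragments (both filters must have the same size and hash functions).
    void merge(SimpleBloomFilter other) {
        for (int i = 0; i < bits.length; i++) {
            bits[i] |= other.bits[i];
        }
    }
}

// Hypothetical aggregator: one merged filter per probe-side column, so
// "T1.a1 = T2.a2 AND T1.b1 = T2.b2" and "T1.a1 = T2.a2 AND T1.b1 = T3.b3"
// both produce two aggregated filters, regardless of how many build tables
// contributed to them.
class ForemanAggregator {
    private final Map<String, SimpleBloomFilter> byProbeColumn = new HashMap<>();

    void accept(String probeColumn, SimpleBloomFilter partial) {
        SimpleBloomFilter agg = byProbeColumn.get(probeColumn);
        if (agg == null) {
            byProbeColumn.put(probeColumn, partial);
        } else {
            agg.merge(partial);
        }
    }

    SimpleBloomFilter get(String probeColumn) {
        return byProbeColumn.get(probeColumn);
    }
}
```

The key design point is that the aggregation is keyed on the probe-side column, not on the join: whether the partial filters come from two fragments of one join or from two different joins does not change the merge.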
> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
> Key: DRILL-6385
> URL: https://issues.apache.org/jira/browse/DRILL-6385
> Project: Apache Drill
> Issue Type: New Feature
> Components: Server, Execution - Flow
> Affects Versions: 1.14.0
> Reporter: weijie.tong
> Assignee: weijie.tong
> Priority: Major
>
> This feature is to support JPPD (Join Predicate Push Down). It will
> benefit HashJoin and broadcast HashJoin performance by reducing the number
> of rows sent across the network and the memory consumed. This feature is
> already supported by Impala, which calls it RuntimeFilter
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
> The first PR will try to push down a bloom filter from the HashJoin node to
> Parquet's scan node. The proposed basic procedure is described as follows:
> # The HashJoin build side accumulates the equi-join condition rows to
> construct a bloom filter. It then sends the bloom filter to the foreman
> node.
> # The foreman node passively accepts the bloom filters from all the fragments
> that have the HashJoin operator. It then aggregates the bloom filters to form
> a global bloom filter.
> # The foreman node broadcasts the global bloom filter to all the probe-side
> scan nodes, which may already have sent partial data to the hash join
> nodes (currently the hash join node prefetches one batch from both sides).
> # The scan node accepts the global bloom filter from the foreman node.
> It will filter the remaining rows, keeping only those that satisfy the bloom filter.
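The final, scan-side step can be sketched as follows. This is a minimal, hypothetical illustration (the class and method names are not Drill's actual API); the probe key check would in practice be a bloom filter lookup:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the scan-side filtering step; names are
// illustrative, not Drill's actual API.
class ScanSideFilter {
    // Keep only rows whose join key might exist on the build side.
    // A bloom filter can return false positives but never false negatives,
    // so dropping a row on a negative answer is always safe: the join
    // result is unchanged, only fewer rows flow up to the probe.
    static <T> List<T> applyRuntimeFilter(List<T> batch,
                                          Predicate<T> mightMatchBuildSide) {
        List<T> kept = new ArrayList<>();
        for (T row : batch) {
            if (mightMatchBuildSide.test(row)) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```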
>
> To implement the above execution flow, the main new notions are described below:
> 1. RuntimeFilter
> It is a filter container which may contain a BloomFilter or a MinMaxFilter.
> 2. RuntimeFilterReporter
> It wraps the logic to send the hash join's bloom filter to the foreman. The
> serialized bloom filter will be sent out through the data tunnel. This object
> will be instantiated by the FragmentExecutor and passed to the
> FragmentContext, so the HashJoin operator can obtain it through the
> FragmentContext.
> 3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and unwrapping
> the actual BloomFilter from the network message. It then passes this filter to
> the WorkerBee's new registerRuntimeFilter interface.
> Another RPC type is BroadcastRuntimeFilterRequest. It will register the
> accepted global bloom filter with the WorkerBee via the registerRuntimeFilter
> method and then propagate it to the FragmentContext, through which the probe-side
> scan node can fetch the aggregated bloom filter.
> 4. RuntimeFilterManager
> The foreman will instantiate a RuntimeFilterManager. It will indirectly obtain
> every RuntimeFilter through the WorkerBee. Once all the BloomFilters have been
> accepted and aggregated, it will broadcast the aggregated bloom filter to
> all the probe-side scan nodes through the data tunnel via a
> BroadcastRuntimeFilterRequest RPC.
> 5. RuntimeFilterEnableOption
> A global option will be added to decide whether to enable this new feature.
>
> Suggestions and advice are welcome. The related PR will be presented as
> soon as possible.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)