[ 
https://issues.apache.org/jira/browse/DRILL-6385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573938#comment-16573938
 ] 

Sorabh Hamirwasia commented on DRILL-6385:
------------------------------------------

For ScanBatch issue:

> Initially, I worried about the performance of the RemovingRecordBatch's 
> setupNewSchema() while generating a different output schema of ScanBatch by 
> applying RuntimeFilter, under the assumption that the implementation of 
> RemovingRecordBatch's setupNewSchema() generates dynamic code. But actually 
> RemovingRecordBatch's setupNewSchema() does not generate dynamic code. So the 
> changed implementation has reduced some complexity in judging whether to output 
> a schema with SV2 or NONE.

Since this PR has changed the encoding of _ScanBatch_ to SV2, the operator 
downstream of _ScanBatch_ can be any operator that accepts an incoming 
container with SV2, like _Filter, RemovingRecordBatch, Limit, etc._ So the cost 
of generated code will be paid by whichever operator is downstream of ScanBatch. 
There are two concerns:
 - If the output container type of _ScanBatch_ is modified at run time, then 
downstream operators like _Filter_ will pay the cost of generating code in 
_setupNewSchema_. Also, ScanBatch will emit _OK_NEW_SCHEMA_ more often with 
this change, which can result in frequent query failures depending on 
downstream operators that don't support schema change after the _BuildSchema_ 
phase, like _Sort, HashAgg, etc._

 - If the output container type of _ScanBatch_ is changed to _SV2_ for all 
cases, then there will be a regression for queries where _RuntimeFilter_ is not 
used, because an extra copy will be needed downstream of _ScanBatch_ to move 
from SV2 to SV NONE.

*To resolve the above issues, there are the following options:*
 * At planning time, decide whether _RuntimeFilter_ can be applied to the 
query. If it can, insert a _Filter_ operator downstream of the Scan operator. 
This Filter needs to be a different operator because it receives its filter 
condition at runtime and must generate code at that point to evaluate all the 
incoming records. This filter will hold all the logic for _applyRuntimeFilter_. 
This might be complex to implement and requires a new Filter operator as well; 
I am not sure whether the existing Filter operator can be reused for this 
purpose. But it ensures filtering is done only for selective queries while 
other queries are unaffected, and it takes care of the issues mentioned in the 
points above.
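To illustrate the idea behind such a runtime-configurable Filter, here is a minimal Java sketch (the class and method names are my own assumptions, not actual Drill operator code): it passes rows through until a runtime predicate, such as a bloom-filter membership check, is delivered, and only then starts evaluating incoming rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch, not a Drill operator: a filter that starts as a
// pass-through and switches to evaluating rows once a runtime predicate
// (e.g. a bloom-filter check) arrives.
public class RuntimeFilterSketch<T> {
  private Predicate<T> runtimePredicate; // null until delivered at runtime

  // Called when the runtime filter condition arrives from the foreman.
  public void applyRuntimeFilter(Predicate<T> predicate) {
    this.runtimePredicate = predicate;
  }

  // Filters one batch of rows; pass-through while no predicate is set.
  public List<T> filterBatch(List<T> batch) {
    if (runtimePredicate == null) {
      return batch;
    }
    List<T> out = new ArrayList<>();
    for (T row : batch) {
      if (runtimePredicate.test(row)) {
        out.add(row);
      }
    }
    return out;
  }
}
```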
 * Generate an SV2 container from ScanBatch in all cases, and provide a new 
mechanism for SV2 that knows whether any record was filtered out of the actual 
input container. For cases where no record is filtered out, it will do a direct 
transfer of buffers rather than a row-by-row copy. To achieve this we can do 
the following:
 1) Introduce a new parameter in SelectionVector2 named _originalRecordCount_, 
initialized to the incoming (original) batch record count at creation time. For 
the existing constructor of _SelectionVector2_, pass this value as MAX_INT.
 2) _SelectionVector2_ population will happen in the existing way.
 3) Provide a new method like _public boolean doFullTransfer()_ which will do 
the following:

{code:java}
// Returns true when no record was filtered out, so the incoming buffers
// can be transferred wholesale instead of copied row by row.
public boolean doFullTransfer() {
  return recordCount == originalRecordCount;
}
{code}
4) In _RemovingRecordBatch_, when _copyRecords_ is called in _doWork()_, it 
will call an overriding implementation of _copyRecords_ in _AbstractSV2Copier_ 
which will do the following:
{code:java}
@Override
public int copyRecords(int index, int recordCount) throws SchemaChangeException {
  if (sv2.doFullTransfer()) {
    // No record was filtered out: transfer the buffers wholesale.
    for (TransferPair tp : pairs) {
      tp.transfer();
    }
    container.setRecordCount(incoming.getRecordCount());
    return recordCount;
  }
  // Some records were filtered out: fall back to the row-by-row copy.
  return super.copyRecords(index, recordCount);
}
{code}

The above approach will also help optimize the existing FilterRecordBatch for 
cases where no records are filtered out. But we need to make sure that when 
multiple operators accepting SV2 containers are present, the actual 
_originalRecordCount_ is propagated all the way up the operator tree to 
RemovingRecordBatch; otherwise there will be issues.
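A minimal sketch of the _SelectionVector2_ change described in steps 1 and 3 (the class and field names here are assumptions for illustration, not the actual Drill class):

```java
// Hypothetical sketch of the proposed SelectionVector2 change, not the real
// Drill SelectionVector2.
public class SelectionVector2Sketch {
  private final int originalRecordCount; // incoming batch count at creation
  private int recordCount;               // records that survived filtering

  // New constructor: remembers the incoming (original) record count.
  public SelectionVector2Sketch(int originalRecordCount) {
    this.originalRecordCount = originalRecordCount;
  }

  // Existing-style constructor: no incoming count known, so pass MAX_INT,
  // which disables the full-transfer fast path.
  public SelectionVector2Sketch() {
    this(Integer.MAX_VALUE);
  }

  public void setRecordCount(int recordCount) {
    this.recordCount = recordCount;
  }

  // True when no record was filtered out, so buffers can be transferred
  // wholesale instead of copied row by row.
  public boolean doFullTransfer() {
    return recordCount == originalRecordCount;
  }
}
```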

 * Go with something similar to the current approach, i.e. modify ScanBatch so 
that it always produces SV2 once _applyRuntimeFilter_ returns true for the 
first time; before that it always produces SV NONE. This way the container type 
of ScanBatch won't change much, and all existing cases where RuntimeFilter is 
not present won't regress either. Once the optimization defined in the second 
option above is implemented, we can incorporate it here to improve things 
further. I can help with implementing the optimization for SV2.
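The one-way switch described in this option could be sketched as follows (a simplified illustration with assumed names, not the actual ScanBatch code):

```java
// Hypothetical sketch of the one-way container-type switch for ScanBatch;
// names are assumptions, not the actual Drill implementation.
public class ScanBatchModeSketch {
  public enum SelectionVectorMode { NONE, TWO_BYTE }

  private boolean runtimeFilterApplied = false;

  // Once applyRuntimeFilter() has returned true for the first time, the
  // batch permanently emits SV2 (TWO_BYTE); before that it emits SV NONE.
  public SelectionVectorMode nextMode(boolean applyRuntimeFilterResult) {
    if (applyRuntimeFilterResult) {
      runtimeFilterApplied = true;
    }
    return runtimeFilterApplied
        ? SelectionVectorMode.TWO_BYTE
        : SelectionVectorMode.NONE;
  }
}
```

The switch is sticky on purpose: the container type changes at most once, so downstream operators see at most one extra _OK_NEW_SCHEMA_ from this change.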

> Support JPPD (Join Predicate Push Down)
> ---------------------------------------
>
>                 Key: DRILL-6385
>                 URL: https://issues.apache.org/jira/browse/DRILL-6385
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components:  Server, Execution - Flow
>    Affects Versions: 1.14.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>
> This feature is to support JPPD (Join Predicate Push Down). It will benefit 
> HashJoin and Broadcast HashJoin performance by reducing the number of rows 
> sent across the network and the memory consumed. This feature is already 
> supported by Impala, which calls it RuntimeFilter 
> ([https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]).
>  The first PR will try to push down a bloom filter from the HashJoin node to 
> Parquet’s scan node. The proposed basic procedure is described as follows:
>  # The HashJoin build side accumulates the equal-join-condition rows to 
> construct a bloom filter. It then sends the bloom filter to the foreman node.
>  # The foreman node passively accepts the bloom filters from all the 
> fragments that have the HashJoin operator. It then aggregates the bloom 
> filters to form a global bloom filter.
>  # The foreman node broadcasts the global bloom filter to all the probe-side 
> scan nodes, which may already have sent out partial data to the hash join 
> nodes (currently the hash join node prefetches one batch from both sides).
>  # The scan node accepts a global bloom filter from the foreman node. It will 
> use the bloom filter to filter the remaining rows.
>  
> To implement the above execution flow, the main new notions are described below:
>       1. RuntimeFilter
> It’s a filter container which may contain a BloomFilter or a MinMaxFilter.
>       2. RuntimeFilterReporter
> It wraps the logic to send the hash join’s bloom filter to the foreman. The 
> serialized bloom filter will be sent out through the data tunnel. This object 
> will be instantiated by the FragmentExecutor and passed to the 
> FragmentContext, so the HashJoin operator can obtain it through the 
> FragmentContext.
>       3. RuntimeFilterRequestHandler
> It is responsible for accepting a SendRuntimeFilterRequest RPC and stripping 
> the actual BloomFilter from the network. It then passes this filter to the 
> WorkerBee’s new interface registerRuntimeFilter.
> Another RPC type is BroadcastRuntimeFilterRequest. It will register the 
> accepted global bloom filter with the WorkerBee by the registerRuntimeFilter 
> method and then propagate it to the FragmentContext, through which the 
> probe-side scan node can fetch the aggregated bloom filter.
>       4. RuntimeFilterManager
> The foreman will instantiate a RuntimeFilterManager. It will indirectly get 
> every RuntimeFilter via the WorkerBee. Once all the BloomFilters have been 
> accepted and aggregated, it will broadcast the aggregated bloom filter to 
> all the probe-side scan nodes through the data tunnel by a 
> BroadcastRuntimeFilterRequest RPC.
>       5. RuntimeFilterEnableOption
>  A global option will be added to decide whether to enable this new feature.
>  
> Suggestions and advice are welcome. The related PR will be presented as 
> soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
