[ 
https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritesh Maker updated DRILL-6498:
---------------------------------
    Labels: ready-to-commit  (was: )

> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
>                 Key: DRILL-6498
>                 URL: https://issues.apache.org/jira/browse/DRILL-6498
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> With Lateral and Unnest, if Sort is present in the sub-query it needs to 
> handle the EMIT outcome correctly. This means that when an EMIT is received, 
> Sort should perform the sort operation on the records buffered so far and 
> produce output from them. After EMIT, Sort should refresh its state and work 
> on the next batches of incoming records until another EMIT is seen.
> For the first cut, Sort will not support spilling in the subquery between 
> Lateral and Unnest, since spilling is very unlikely there. The worst case is 
> that Lateral gets a batch with only 1 row of data because the repeated-type 
> column data is too big. In that case Unnest will produce only 1 output batch, 
> and Sort (or any other blocking operator) needs enough memory to hold at 
> least 1 incoming batch anyway. So in the normal case spilling should not 
> happen. If there is an operator between Sort and Unnest that increases the 
> data size, Sort might need to spill, but that is not a common case for now.
>  
> *Description of Changes:*
> Currently the sort operator works as described below. This gives a general 
> high-level view of how Sort works and of how EMIT support was implemented. A 
> simplified sketch of this flow follows the list.
> 1) In the buildSchema phase Sort creates an empty container with SV NONE and 
> sends it downstream.
> 2) After the buildSchema phase it goes into the load phase and keeps calling 
> next() on its upstream until it sees NONE or a failure.
> 3) For each batch it receives, it applies an SV2 (if the batch doesn't 
> already have one), sorts it, and then buffers it after converting it into a 
> BatchGroup.InputBatch.
> 4) During buffering it checks for memory pressure and spills as needed.
> 5) Once all the batches are received and it gets NONE from upstream, it 
> starts the merge phase.
> 6) In the merge phase it checks whether the merge can happen in memory or 
> spilling is needed, and performs the merge accordingly.
> 7) Sort has a concept of SortResults, which represents the different kinds of 
> output container Sort can generate based on the input batches and memory 
> conditions. For example, for an in-memory merge the output container of Sort 
> is an SV4 container with a SortResults of type MergeSortWrapper. For a 
> spill-and-merge, the container is of SV_NONE type with a SortResults of type 
> BatchMerger. There are also SortResults types for empty and single batches 
> (not used anywhere).
> 8) SortResults is an abstraction that provides an output result on each 
> next() call, backed by the output container of the ExternalSortBatch along 
> with the correct recordCount and SV2/SV4 as needed. For example, in the case 
> of MergeSortWrapper all the inputs are in memory, so all the output is also 
> in memory, backed by an SV4. On each next() call the SV4 is updated with the 
> start index and length, which tells the caller the record boundary it should 
> consume. For BatchMerger, based on memory pressure and the number of records 
> Sort can output in each output container, it fills the output container with 
> that many records and sends it downstream.
> 9) Also, the SortResults abstraction is such that at the beginning of the 
> merge phase the output container held by SortResults is cleared and later 
> re-initialized after the merge is completed.
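> The flow above can be compressed into a small, self-contained sketch. This 
> is an illustrative model only: it uses plain int arrays in place of record 
> batches and selection vectors, the names MiniSortFlow and Upstream are 
> invented for the example, and the spilling and SortResults details from 
> points 4, 6 and 7 are omitted.
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> 
> public class MiniSortFlow {
>   enum Outcome { OK, OK_NEW_SCHEMA, EMIT, NONE }
> 
>   /** Stand-in for the upstream operator: one call per incoming batch. */
>   interface Upstream {
>     Outcome next();
>     int[] currentBatch();
>   }
> 
>   private final List<int[]> buffered = new ArrayList<>();
> 
>   /** Load phase: buffer and sort each batch until EMIT or NONE, then merge in memory. */
>   int[] sortUntilBoundary(Upstream upstream) {
>     Outcome out;
>     while ((out = upstream.next()) == Outcome.OK || out == Outcome.OK_NEW_SCHEMA) {
>       int[] copy = upstream.currentBatch().clone();
>       Arrays.sort(copy);    // stands in for applying an SV2 and sorting the batch
>       buffered.add(copy);   // stands in for buffering a BatchGroup.InputBatch
>     }
>     // EMIT or NONE seen: merge phase over everything buffered so far.
>     int[] merged = buffered.stream().flatMapToInt(Arrays::stream).sorted().toArray();
>     buffered.clear();       // reset state so the next EMIT boundary starts fresh
>     return merged;
>   }
> }
> {code}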
> Currently, since Sort is a blocking operator, it was clearing the output 
> container's ValueVectors after the buildSchema phase and during the load 
> phase, and later creating the final output container (with ValueVector 
> objects) after it had seen all the incoming data. The very first output batch 
> is always returned with OK_NEW_SCHEMA so that the downstream operator can set 
> up the correct SV mode and schema from that first output batch, since the 
> schema returned in the buildSchema phase was a dummy one. In this way the 
> vector references captured by the downstream operator in the buildSchema 
> phase are replaced with the vector references from the first output batch.
> With EMIT, however, Sort will go through the load phase multiple times, so we 
> cannot clear Sort's output container after each EMIT boundary. If we did, the 
> references the downstream operator holds to the ValueVectors of 
> ExternalSort's output container would become invalid, and Sort would also 
> have to send OK_NEW_SCHEMA with the first output batch of each EMIT boundary. 
> To avoid that, we need to make sure that for each EMIT boundary Sort goes 
> from the load phase to the merge phase to produce output, and that neither 
> phase clears the output container (see the sketch below).
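> The constraint can be illustrated with a tiny, self-contained sketch. 
> SimpleVector and transferIn are illustrative stand-ins, not Drill's 
> ValueVector/transfer-pair API; the only point is that the object the 
> downstream operator holds keeps its identity while fresh data is moved into 
> it at each EMIT boundary.
> {code:java}
> import java.util.Arrays;
> 
> public class StableOutputSketch {
>   /** Illustrative stand-in for a ValueVector: identity matters, data can be swapped. */
>   static class SimpleVector {
>     int[] data = new int[0];
> 
>     /** Take over another vector's buffer without changing this object's identity. */
>     void transferIn(SimpleVector from) {
>       this.data = from.data;
>       from.data = new int[0];
>     }
>   }
> 
>   public static void main(String[] args) {
>     SimpleVector sortOutputVector = new SimpleVector(); // lives in Sort's output container
>     SimpleVector downstreamRef = sortOutputVector;      // downstream captured this reference once
> 
>     // First EMIT boundary: results are built in a wrapper vector, then transferred in.
>     SimpleVector wrapper = new SimpleVector();
>     wrapper.data = new int[] {1, 2, 3};
>     sortOutputVector.transferIn(wrapper);
> 
>     // Next EMIT boundary: the same output vector object is reused, so the downstream
>     // reference stays valid and no OK_NEW_SCHEMA is required.
>     wrapper.data = new int[] {7, 8, 9};
>     sortOutputVector.transferIn(wrapper);
> 
>     System.out.println(downstreamRef == sortOutputVector);   // true
>     System.out.println(Arrays.toString(downstreamRef.data)); // [7, 8, 9]
>   }
> }
> {code}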
> *To achieve the above and keep the code changes to a minimum, the following 
> approach is used:*
> 1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4 
> (_outputSV4_) are created.
> 2) This outputWrapperContainer is provided to the SortResults instead of the 
> actual container that carries the output of ExternalSortBatch.
> 3) For each EMIT boundary the load and merge phases take place; the data for 
> an EMIT subquery is expected to be small, so the sort will mostly happen in 
> memory.
> 4) During each merge phase across an EMIT boundary the wrapper container is 
> created with a fresh set of vectors, as before: the container is cleared at 
> the start of the merge phase and re-created with a fresh set of vectors.
> 5) Once the merge phase is done, _prepareOutputContainer_ is called, which 
> in turn calls the new method _updateOutputContainer_ of _SortResults_.
> 6) Since we are not supporting spilling with EMIT, the outcome is checked in 
> updateOutputContainer of BatchMerger and an UnsupportedOperation exception is 
> thrown. In _MergeSortWrapper::updateOutputContainer_, for the very first 
> output the vectors in Sort's actual output container are created from the 
> vectors of SortResults' outputWrapperContainer. For the other output batches 
> returned across EMIT boundaries, the same output container of Sort is used 
> with the same ValueVectors; only the data is transferred in from the 
> outputWrapperContainer of SortResults.
> 7) There is a new flag called _firstBatchOfSchema_ which, when set to true, 
> indicates that this is the first output batch for this schema. So in 
> scenarios like a schema change this flag is set to true, and the first output 
> container is populated with a fresh set of ValueVectors and returned 
> downstream with OK_NEW_SCHEMA.
> 8) There are challenges around which IterOutcome to return with each output 
> batch. For example, for the very first EMIT boundary Sort has to return 
> OK_NEW_SCHEMA along with the EMIT outcome, and for a subsequent EMIT boundary 
> whose output spans multiple batches all the output batches should be returned 
> with OK except the last output batch for that EMIT boundary, which is 
> returned with EMIT. All these cases are handled in a single method, 
> _getFinalOutcome_, which returns the correct IterOutcome for each output 
> batch (see the sketch after this list).
> 9) Once EMIT is returned for the current input EMIT boundary, the Sort state 
> is reset, which involves clearing the data structures and setting the Sort 
> state so that a subsequent next() call is handled properly. _resetSortState_ 
> now also contains the logic to not clear the memory of the very last output 
> batch, which is sent with the NONE outcome, because of a dependency in 
> StreamAgg.
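> Point 8 boils down to a small decision rule. The sketch below is a 
> simplified model of that rule, not the actual _getFinalOutcome_: it does not 
> reproduce the special handling where the very first EMIT boundary has to 
> convey both OK_NEW_SCHEMA and EMIT, and the two boolean parameters are 
> illustrative inputs rather than the real operator state.
> {code:java}
> public class OutcomeRuleSketch {
>   /** Simplified set of outcomes, mirroring the names used in the text. */
>   enum IterOutcome { OK, OK_NEW_SCHEMA, EMIT, NONE }
> 
>   /**
>    * @param firstBatchOfSchema  this is the first output batch carrying the current schema
>    * @param lastBatchOfBoundary this output batch exhausts the current EMIT boundary
>    */
>   static IterOutcome getFinalOutcome(boolean firstBatchOfSchema, boolean lastBatchOfBoundary) {
>     if (firstBatchOfSchema) {
>       // The first output batch for a (new) schema always carries OK_NEW_SCHEMA.
>       return IterOutcome.OK_NEW_SCHEMA;
>     }
>     // Within an EMIT boundary: OK for intermediate batches, EMIT for the last one.
>     return lastBatchOfBoundary ? IterOutcome.EMIT : IterOutcome.OK;
>   }
> }
> {code}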
> *Future Tasks:*
> If we want to support spilling scenarios with the EMIT outcome, that can also 
> be achieved with the current design. _BatchMerger::updateOutputContainer_ 
> would get logic to populate the actual output container of Sort as an 
> SV4-type container instead of the SV_NONE container BatchMerger currently 
> returns. With EMIT boundaries where spilling happens for some boundaries and 
> the merge stays in memory for others, we cannot send 2 different types of 
> container from Sort (SV_NONE for spilling and SV4 for the in-memory merge); 
> if we did, OK_NEW_SCHEMA would need to be sent with each output batch, since 
> the vector references would have to change for downstream. Instead we can 
> always produce an output container of SV4 type, so that in case of spilling 
> across an EMIT boundary the same output container with the same ValueVector 
> references is populated with new data, and no OK_NEW_SCHEMA needs to be sent 
> downstream. The one disadvantage of this approach is that if Sort sends an 
> SV4 container instead of SV_NONE in the spilling case as well, then somewhere 
> downstream a RemovingRecordBatch will have to copy the data out of the SV4 to 
> create a regular SV_NONE container.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
