[ https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sorabh Hamirwasia updated DRILL-6498:
-------------------------------------
    Description: 
With Lateral and Unnest, if Sort is present in the subquery, then it needs to 
handle the EMIT outcome correctly. This means that when an EMIT is received, 
Sort should sort the records buffered so far and produce output with them. 
After the EMIT, Sort should refresh its state and again work on the next 
batches of incoming records until an EMIT is seen again.
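The EMIT handling described above can be pictured with a small model. This is 
a minimal Java sketch, not Drill's ExternalSortBatch: the class EmitAwareSort, 
the enum Outcome and the accept method are hypothetical, rows are plain 
integers and a batch is just a list. It buffers rows until a batch arrives with 
EMIT (or NONE), sorts what it has, returns that output and clears its buffer so 
the next boundary starts from scratch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class EmitSortDemo {
  public static void main(String[] args) {
    EmitAwareSort sort = new EmitAwareSort();
    System.out.println(sort.accept(List.of(3, 1), Outcome.OK));   // null: boundary still open
    System.out.println(sort.accept(List.of(2), Outcome.EMIT));    // [1, 2, 3]
    System.out.println(sort.accept(List.of(9, 4), Outcome.EMIT)); // [4, 9]
    System.out.println(sort.accept(List.of(), Outcome.NONE));     // []
  }
}

/** Simplified upstream outcomes; Drill's real IterOutcome has more values. */
enum Outcome { OK, EMIT, NONE }

/** Hypothetical model of a blocking sort that honours EMIT boundaries. */
final class EmitAwareSort {
  private final List<Integer> buffered = new ArrayList<>();

  /**
   * Consumes one upstream batch. Returns the sorted rows when the batch closes a
   * boundary (EMIT) or the stream (NONE); returns null while still buffering.
   */
  List<Integer> accept(List<Integer> rows, Outcome outcome) {
    buffered.addAll(rows);
    if (outcome == Outcome.OK) {
      return null; // keep loading: the current boundary is still open
    }
    List<Integer> sorted = new ArrayList<>(buffered);
    Collections.sort(sorted);
    buffered.clear(); // refresh state so the next boundary starts empty
    return sorted;
  }
}
```

Returning null for "still buffering" keeps the sketch short; a real operator 
would instead keep pulling from upstream inside its own next() call.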

For the first cut, Sort will not support spilling in the subquery between 
Lateral and Unnest, since spilling there is very unlikely. The worst case is 
that Lateral gets a batch with only 1 row of data because the repeated-type 
column data is too big. In that case Unnest produces only 1 output batch, and 
Sort (like any other blocking operator) needs enough memory to hold at least 1 
incoming batch anyway. So in normal cases spilling should not happen. However, 
if an operator between Sort and Unnest increases the data size, then Sort might 
need to spill, but that is not a common case for now.
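To illustrate the no-spill restriction, the sketch below assumes a hypothetical 
helper, MergePlanner.choose, that picks the in-memory merge (MergeSortWrapper) 
when the buffered data fits the memory limit and spill-and-merge (BatchMerger) 
otherwise, refusing to spill once EMIT has been seen. The byte-count comparison 
is a deliberate simplification of Drill's real memory accounting, and in the 
actual change the check lives in BatchMerger's updateOutputContainer rather 
than in a separate planner.

```java
class MergeChoiceDemo {
  public static void main(String[] args) {
    System.out.println(MergePlanner.choose(100, 200, true));  // IN_MEMORY
    System.out.println(MergePlanner.choose(300, 200, false)); // SPILL_AND_MERGE
    try {
      MergePlanner.choose(300, 200, true);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}

/** Which SortResults flavour the merge would use (names borrowed from the description). */
enum MergeKind { IN_MEMORY /* MergeSortWrapper */, SPILL_AND_MERGE /* BatchMerger */ }

final class MergePlanner {
  /**
   * Hypothetical, simplified memory check; real Drill sizing logic is more involved.
   * Throws if spilling would be needed while an EMIT boundary is being processed.
   */
  static MergeKind choose(long bufferedBytes, long memoryLimitBytes, boolean emitSeen) {
    if (bufferedBytes <= memoryLimitBytes) {
      return MergeKind.IN_MEMORY;
    }
    if (emitSeen) {
      throw new UnsupportedOperationException(
          "Spilling is not supported inside a Lateral/Unnest subquery (EMIT seen)");
    }
    return MergeKind.SPILL_AND_MERGE;
  }
}
```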

 

*Description of Changes:*
The sort operator currently works as described below. This gives a general, 
high-level view of how Sort works and of how EMIT support was implemented.


1) In the buildSchema phase, Sort creates an empty container with SV mode NONE 
and sends it downstream.

2) After the buildSchema phase, it enters the LOAD phase and keeps calling 
next() on upstream until it sees NONE or a failure occurs.

3) For each batch it receives, it applies an SV2 if the batch does not already 
have one, sorts the batch, and then buffers it after converting it into a 
BatchGroup.InputBatch.

4) While buffering, it checks for memory pressure and spills as needed.

5) Once all the batches are received and it gets NONE from upstream, it starts 
the merge phase.

6) In the merge phase, it checks whether the merge can happen in memory or 
whether spilling is needed, and performs the merge accordingly.

7) Sort has a concept of SortResults, which represents the different kinds of 
output container that Sort can generate based on the input batches and memory 
conditions. For example, for an in-memory merge the output container of Sort 
is an SV4 container with SortResults of type MergeSortWrapper. For spill and 
merge, the container is of SV_NONE type with SortResults of type BatchMerger. 
There are also SortResults types for empty and single batches (not used 
anywhere).

8) SortResults provides an abstraction that produces an output result with each 
next() call, backed by the output container of the ExternalSortRecordBatch, 
along with the correct record count and SV2/SV4 as needed. For example, with 
MergeSortWrapper all the inputs are in memory, so all the output is also in 
memory, backed by an SV4. On each next() call the SV4 is updated with the start 
index and length, which tell the caller which range of records to consume. For 
BatchMerger, based on memory pressure and the number of records Sort can output 
per output container, it fills the output container with that many records and 
sends it downstream.

9) The SortResults abstraction is also designed so that, at the beginning of 
the merge phase, the output container held by SortResults is cleared, and it is 
re-initialized after the merge completes.
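The in-memory path from items 7 and 8 can be sketched as follows. Assumptions: 
Sv4Window is a hypothetical class, not Drill's SelectionVector4; each SV4 entry 
packs a batch index in the upper 16 bits and a row offset in the lower 16 bits; 
rows are strings compared lexically. sortedEntries plays the role of the 
in-memory merge, and each next() call moves the start index and length over 
the sorted entries, so the caller knows which slice of records to consume.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class Sv4PagingDemo {
  public static void main(String[] args) {
    List<List<String>> batches = List.of(List.of("d", "a"), List.of("c", "b"));
    Sv4Window window = new Sv4Window(Sv4Window.sortedEntries(batches), 3);
    while (window.next()) {
      StringBuilder page = new StringBuilder();
      for (int i = window.start(); i < window.start() + window.length(); i++) {
        int entry = window.entry(i);
        page.append(batches.get(Sv4Window.batchOf(entry)).get(Sv4Window.rowOf(entry)));
      }
      System.out.println(page); // "abc", then "d"
    }
  }
}

final class Sv4Window {
  // Hypothetical packing: batch index in the upper 16 bits, row offset in the lower 16.
  static int encode(int batchIndex, int rowIndex) { return (batchIndex << 16) | (rowIndex & 0xFFFF); }
  static int batchOf(int entry) { return entry >>> 16; }
  static int rowOf(int entry) { return entry & 0xFFFF; }

  /** In-memory merge: one entry per buffered row, ordered by the row's value. */
  static int[] sortedEntries(List<List<String>> batches) {
    List<Integer> entries = new ArrayList<>();
    for (int b = 0; b < batches.size(); b++) {
      for (int r = 0; r < batches.get(b).size(); r++) {
        entries.add(encode(b, r));
      }
    }
    entries.sort(Comparator.comparing((Integer e) -> batches.get(batchOf(e)).get(rowOf(e))));
    return entries.stream().mapToInt(Integer::intValue).toArray();
  }

  private final int[] entries;
  private final int maxRowsPerBatch;
  private int start = 0;
  private int length = 0;

  Sv4Window(int[] entries, int maxRowsPerBatch) {
    this.entries = entries;
    this.maxRowsPerBatch = maxRowsPerBatch;
  }

  /** Advances the window; returns false once every entry has been handed out. */
  boolean next() {
    start += length;
    length = Math.min(maxRowsPerBatch, entries.length - start);
    return length > 0;
  }

  int start() { return start; }
  int length() { return length; }
  int entry(int i) { return entries[i]; }
}
```

No row data is copied while paging: the buffered batches stay where they are 
and only the window over the sorted entries moves.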

In the current implementation, since Sort is a blocking operator, it clears the 
output container ValueVectors after the buildSchema phase and during the load 
phase. Later, after it has seen all the incoming data, it creates the final 
output container (with new ValueVector objects). The very first output batch is 
always returned with OK_NEW_SCHEMA so that the downstream operator can set up 
the correct SV mode and schema from the first output batch, since the schema 
returned in the buildSchema phase was a dummy one. The vector references held 
by the downstream operator since the buildSchema phase are thereby replaced 
with the vector references of the first output batch.

With EMIT, however, Sort goes through the load phase multiple times, so we 
cannot clear the output container of Sort after each EMIT boundary. If we did, 
the references that the downstream operator holds to the ValueVectors of the 
ExternalSort output container would become invalid, and Sort would also have to 
send OK_NEW_SCHEMA with the first output batch of each EMIT boundary. To avoid 
that, we need to make sure that at each EMIT boundary Sort goes from the load 
phase to the merge phase to produce output, without clearing its output 
container in either phase.
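Why clearing the output container breaks downstream operators can be shown with 
two hypothetical classes, ColumnVector (standing in for a ValueVector) and 
OutputContainer. The downstream operator keeps a reference to the column it saw 
when the schema was set up. Replacing the vector object leaves that reference 
pointing at stale, empty data; transferring new data into the existing object 
keeps it valid. This is only a model of the idea, not Drill's vector API.

```java
import java.util.ArrayList;
import java.util.List;

class VectorReferenceDemo {
  public static void main(String[] args) {
    // Downstream captures a reference to the column when it first sees the schema.
    OutputContainer recreated = new OutputContainer();
    ColumnVector heldByDownstream = recreated.column;
    recreated.replaceColumn(List.of(1, 2));
    System.out.println(heldByDownstream.values()); // [] : stale reference, data is missed

    OutputContainer reused = new OutputContainer();
    ColumnVector stillValid = reused.column;
    ColumnVector wrapper = new ColumnVector();
    wrapper.set(List.of(3, 4));
    reused.transferFrom(wrapper);
    System.out.println(stillValid.values()); // [3, 4]
  }
}

/** Hypothetical stand-in for a ValueVector: a mutable holder of one column. */
final class ColumnVector {
  private List<Integer> values = new ArrayList<>();

  List<Integer> values() { return values; }

  void set(List<Integer> newValues) { values = new ArrayList<>(newValues); }

  /** Moves the data of {@code source} into this object and leaves the source empty. */
  void transferFrom(ColumnVector source) {
    values = source.values;
    source.values = new ArrayList<>();
  }
}

/** Hypothetical stand-in for Sort's outgoing container with a single column. */
final class OutputContainer {
  ColumnVector column = new ColumnVector();

  /** Pre-EMIT behaviour: discard the old vector object and build a new one. */
  void replaceColumn(List<Integer> rows) {
    column = new ColumnVector();
    column.set(rows);
  }

  /** EMIT-friendly behaviour: keep the vector object and swap in fresh data. */
  void transferFrom(ColumnVector wrapperColumn) {
    column.transferFrom(wrapperColumn);
  }
}
```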

To achieve the above while keeping the code changes to a minimum, the following 
approach is used:

1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4 
(_outputSV4_) are created.

2) This outputWrapperContainer is provided to SortResults instead of the actual 
container that carries the output of ExternalSortBatch.

3) For each EMIT boundary the load and merge phases happen. The data for an EMIT 
subquery is expected to be small, so in most cases the sort will happen in 
memory.

4) During each merge phase across EMIT boundaries, the wrapper container is 
created with a fresh set of vectors, as was done before: the container is 
cleared at the start of the merge phase and re-created with a fresh set of 
vectors.

5) Once the merge phase is done, _prepareOutputContainer_ is called, which in 
turn calls the new method _updateOutputContainer_ of _SortResult_.

6) Since spilling is not supported with EMIT, _updateOutputContainer_ of 
BatchMerger checks the outcome and throws an UnsupportedOperation exception. In 
contrast, _MergeSortWrapper::updateOutputContainer_ creates the vectors in 
Sort's actual output container for the very first output, using the vectors 
from the SortResults outputWrapperContainer. For the other output batches 
returned across EMIT boundaries, the same output container of Sort is used with 
the same ValueVectors; only the data is transferred in from the 
outputWrapperContainer of SortResults.

7) A new flag, _firstBatchOfSchema_, when set to true, indicates that this is 
the first output batch of the current schema. So in scenarios such as a schema 
change, this flag is set to true, and the first output container is populated 
with a fresh set of ValueVectors and returned downstream with OK_NEW_SCHEMA.

8) One challenge is deciding which IterOutcome to return with each output 
batch. For example, at the very first EMIT boundary Sort has to return 
OK_NEW_SCHEMA as well as the EMIT outcome. And for a subsequent EMIT boundary 
whose output spans multiple batches, all the output batches should be returned 
with OK except the last output batch for that boundary, which is returned with 
EMIT. All these cases are handled in a single method, _getFinalOutcome_, which 
returns the correct IterOutcome for each output batch.

9) Once EMIT is returned for the current input EMIT boundary, the Sort state is 
reset: the data structures are cleared and the Sort state is set so that the 
subsequent next() call is handled properly. _resetSortState_ now also contains 
the logic to not clear the memory of the very last output batch, which is sent 
with the NONE outcome, because StreamAgg depends on it.
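A possible shape for the outcome selection in item 8, as a hedged sketch: 
OutcomeTracker and SortOutcome are hypothetical names, and only getFinalOutcome 
and firstBatchOfSchema come from the description. Because one next() call can 
carry only a single IterOutcome, this sketch assumes that when the first batch 
of a schema is also the last batch of its boundary, Sort returns OK_NEW_SCHEMA 
with the data and then an extra empty batch with EMIT. The real patch may 
resolve that case differently.

```java
import java.util.ArrayList;
import java.util.List;

class FinalOutcomeDemo {
  public static void main(String[] args) {
    OutcomeTracker tracker = new OutcomeTracker();
    System.out.println(tracker.outcomesForBoundary(1)); // [OK_NEW_SCHEMA, EMIT]
    System.out.println(tracker.outcomesForBoundary(3)); // [OK, OK, EMIT]
    tracker.onSchemaChange();
    System.out.println(tracker.outcomesForBoundary(2)); // [OK_NEW_SCHEMA, EMIT]
  }
}

/** Subset of IterOutcome values needed for this sketch. */
enum SortOutcome { OK, OK_NEW_SCHEMA, EMIT }

final class OutcomeTracker {
  private boolean firstBatchOfSchema = true;
  private boolean emitPending = false;

  /** A schema change means the next output batch must announce the new schema. */
  void onSchemaChange() { firstBatchOfSchema = true; }

  /** Outcome for one output batch; lastOfBoundary marks the final batch of this boundary. */
  SortOutcome getFinalOutcome(boolean lastOfBoundary) {
    if (firstBatchOfSchema) {
      firstBatchOfSchema = false;
      // Only one outcome fits per call: announce the schema now and, if the
      // boundary also ends here, owe downstream an empty EMIT batch next.
      emitPending = lastOfBoundary;
      return SortOutcome.OK_NEW_SCHEMA;
    }
    return lastOfBoundary ? SortOutcome.EMIT : SortOutcome.OK;
  }

  /** Outcomes returned, in order, for a boundary that produced batchCount output batches. */
  List<SortOutcome> outcomesForBoundary(int batchCount) {
    // An empty boundary still sends one (empty) batch so that EMIT reaches downstream.
    int batches = Math.max(batchCount, 1);
    List<SortOutcome> outcomes = new ArrayList<>();
    for (int i = 0; i < batches; i++) {
      outcomes.add(getFinalOutcome(i == batches - 1));
    }
    if (emitPending) {
      outcomes.add(SortOutcome.EMIT); // extra empty batch that closes the boundary
      emitPending = false;
    }
    return outcomes;
  }
}
```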

  was:
With Lateral and Unnest, if Sort is present in the subquery, then it needs to 
handle the EMIT outcome correctly. This means that when an EMIT is received, 
Sort should sort the records buffered so far and produce output with them. 
After the EMIT, Sort should refresh its state and again work on the next 
batches of incoming records until an EMIT is seen again.

For the first cut, Sort will not support spilling in the subquery between 
Lateral and Unnest, since spilling there is very unlikely. The worst case is 
that Lateral gets a batch with only 1 row of data because the repeated-type 
column data is too big. In that case Unnest produces only 1 output batch, and 
Sort (like any other blocking operator) needs enough memory to hold at least 1 
incoming batch anyway. So in normal cases spilling should not happen. However, 
if an operator between Sort and Unnest increases the data size, then Sort might 
need to spill, but that is not a common case for now.


> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
>                 Key: DRILL-6498
>                 URL: https://issues.apache.org/jira/browse/DRILL-6498
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>             Fix For: 1.14.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
