[
https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pritesh Maker updated DRILL-6498:
---------------------------------
Labels: ready-to-commit (was: )
> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
> Key: DRILL-6498
> URL: https://issues.apache.org/jira/browse/DRILL-6498
> Project: Apache Drill
> Issue Type: Task
> Components: Execution - Relational Operators
> Reporter: Sorabh Hamirwasia
> Assignee: Sorabh Hamirwasia
> Priority: Major
> Labels: ready-to-commit
> Fix For: 1.14.0
>
>
> With Lateral and Unnest, if Sort is present in the sub-query then it needs to
> handle the EMIT outcome correctly. This means that when an EMIT is received,
> Sort should perform the sort operation on the records buffered so far and
> produce output from them. After EMIT, Sort should refresh its state and work
> on the next batches of incoming records until another EMIT is seen.
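>
> A minimal sketch of that contract, assuming a toy operator that just buffers
> integers (not Drill code; EmitAwareSorter and the Outcome enum below are
> hypothetical stand-ins for the real operator and IterOutcome):
> {code:java}
> // Hypothetical sketch of the buffer -> sort-on-EMIT -> reset contract.
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class EmitAwareSorter {
>
>   enum Outcome { OK, EMIT }
>
>   private final List<Integer> buffered = new ArrayList<>();
>
>   // Consume one upstream "batch"; on EMIT, sort what was buffered so far,
>   // hand it out, and reset the state for the next boundary.
>   List<Integer> consume(List<Integer> rows, Outcome outcome) {
>     buffered.addAll(rows);
>     if (outcome != Outcome.EMIT) {
>       return null;                       // keep buffering
>     }
>     List<Integer> sorted = new ArrayList<>(buffered);
>     Collections.sort(sorted);
>     buffered.clear();                    // refresh state for the next boundary
>     return sorted;
>   }
>
>   public static void main(String[] args) {
>     EmitAwareSorter sorter = new EmitAwareSorter();
>     System.out.println(sorter.consume(List.of(3, 1), Outcome.OK));    // null
>     System.out.println(sorter.consume(List.of(2), Outcome.EMIT));     // [1, 2, 3]
>     System.out.println(sorter.consume(List.of(9, 4), Outcome.EMIT));  // [4, 9]
>   }
> }
> {code}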
> For the first cut, Sort will not support spilling in the subquery between
> Lateral and Unnest, since spilling is very unlikely there. The worst case is
> that Lateral gets a batch with only 1 row of data because the repeated-type
> column's data size is too big. In that case Unnest will produce only 1 output
> batch, and Sort (or any other blocking operator) anyway needs enough memory
> to hold at least 1 incoming batch. So in the ideal case spilling should not
> happen. If there is an operator between Sort and Unnest which increases the
> data size, then Sort might be forced to spill, but that is not a common case
> for now.
>
> *Description of Changes:*
> The sort operator is currently implemented as described below. This gives a
> general, high-level view of how Sort works and how EMIT support was added.
> 1) In the buildSchema phase SORT creates an empty container with SV NONE and
> sends it downstream.
> 2) After the buildSchema phase it goes into a LOAD phase and keeps calling
> next() on the upstream until it sees NONE or there is a failure.
> 3) For each batch it receives, it applies an SV2 (if the batch doesn't
> already have one), sorts it, and then buffers it after converting it into a
> BatchGroup.InputBatch.
> 4) During buffering it looks for memory pressure and spills as needed.
> 5) Once all the batches are received and it gets NONE from the upstream, it
> starts the merge phase.
> 6) In the merge phase it checks whether the merge can happen in memory or
> spilling is needed, and performs the merge accordingly.
> 7) Sort has a concept of SortResults, which represents the different kinds of
> output container that Sort can generate based on the input batches and memory
> conditions. For example, for an in-memory merge the output container of Sort
> is an SV4 container with a SortResults of type MergeSortWrapper. For a
> spill-and-merge the container is of SV_NONE type with a SortResults of type
> BatchMerger. There are also SortResults types for empty and single batches
> (not used anywhere).
> 8) SortResults provides an abstraction such that each next() call on it
> produces an output result backed by the output container of the
> ExternalSortRecordBatch, along with the correct recordCount and SV2/SV4 as
> needed. For example, in the case of MergeSortWrapper all the inputs are in
> memory and hence all output is also in memory, backed by an SV4. On each
> next() call the SV4 is updated with a start index and length which inform
> the caller about the record boundary it should consume. For BatchMerger,
> based on memory pressure and the number of records Sort can output with each
> output container, it fills the output container with that many records and
> sends it downstream.
> 9) Also, the SortResults abstraction is such that at the beginning of the
> merge phase the output container held by SortResults is cleared, and it is
> re-initialized after the merge is completed.
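>
> To make the above flow concrete, below is a small, self-contained Java
> sketch of the load-then-merge structure. This is not the actual
> ExternalSortBatch code; types such as ToySortFlow, ToyBatch and
> ToySortResults are hypothetical stand-ins for the real Drill classes
> (BatchGroup.InputBatch, the SortResults implementations, etc.) and only
> illustrate the shape described in points 2-9 above.
> {code:java}
> // Hypothetical, simplified illustration of the load/merge flow described
> // above. ToyBatch and ToySortResults are stand-ins, not Drill APIs.
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class ToySortFlow {
>
>   // Stand-in for an incoming record batch (the real Sort works on
>   // ValueVectors plus an SV2).
>   static class ToyBatch {
>     final List<Integer> rows;
>     ToyBatch(List<Integer> rows) { this.rows = rows; }
>   }
>
>   // Stand-in for SortResults: hands out the merged output one "batch" at a
>   // time, like the SV4 start/length updates described in point 8.
>   static class ToySortResults {
>     private final List<Integer> merged;
>     private int offset;
>     ToySortResults(List<Integer> merged) { this.merged = merged; }
>     List<Integer> next(int maxRecords) {
>       if (offset >= merged.size()) { return null; }
>       int end = Math.min(offset + maxRecords, merged.size());
>       List<Integer> out = merged.subList(offset, end);
>       offset = end;
>       return out;
>     }
>   }
>
>   public static void main(String[] args) {
>     // Load phase: buffer (and individually sort) every incoming batch until
>     // the upstream returns NONE (points 2-4).
>     List<List<Integer>> buffered = new ArrayList<>();
>     List<ToyBatch> upstream = List.of(
>         new ToyBatch(List.of(5, 1, 9)),
>         new ToyBatch(List.of(7, 2)));
>     for (ToyBatch b : upstream) {
>       List<Integer> copy = new ArrayList<>(b.rows);
>       Collections.sort(copy);   // per-batch sort; the real code would also
>       buffered.add(copy);       // check memory pressure and spill here
>     }
>
>     // Merge phase: merge the buffered, individually sorted batches
>     // (points 5-6); a simple re-sort stands in for the real k-way merge.
>     List<Integer> merged = new ArrayList<>();
>     buffered.forEach(merged::addAll);
>     Collections.sort(merged);
>
>     // SortResults hands out output batches until exhausted (points 7-9).
>     ToySortResults results = new ToySortResults(merged);
>     List<Integer> out;
>     while ((out = results.next(2)) != null) {
>       System.out.println("output batch: " + out);
>     }
>   }
> }
> {code}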
> In the current code, since SORT is a blocking operator, it was clearing the
> output container ValueVectors after the buildSchema phase and in the load
> phase, and later it created the final output container (with ValueVector
> objects) after it had seen all the incoming data. The very first output batch
> is always returned with OK_NEW_SCHEMA so that the downstream operator can set
> up the correct SV mode and schema with the first output batch, since the
> schema returned in the buildSchema phase was a dummy one. The vector
> references maintained by the downstream operator in the buildSchema phase are
> thus updated with the vector references in the first output batch.
> With EMIT, however, SORT will go into the load phase multiple times, and
> hence we cannot clear the output container of Sort after each EMIT boundary.
> If we did that, the references to the ExternalSort output container
> ValueVectors held by the downstream operator would become invalid, and Sort
> would also have to send OK_NEW_SCHEMA with the first output batch of each
> EMIT boundary. To avoid that, we need to make sure that for each EMIT
> boundary Sort goes from the load phase to the merge phase to produce output
> without clearing the output container in either phase.
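>
> A minimal, hypothetical sketch of this constraint (ToyVector below is a
> stand-in, not Drill's ValueVector): if Sort allocates fresh output vectors
> for each EMIT boundary, the references the downstream operator captured at
> OK_NEW_SCHEMA time go stale, whereas transferring the new boundary's data
> into the same vector objects keeps those references valid.
> {code:java}
> // Hypothetical illustration of why the output ValueVectors must be reused
> // across EMIT boundaries. ToyVector is a stand-in, not Drill's ValueVector.
> import java.util.ArrayList;
> import java.util.List;
>
> public class ToyVectorReuse {
>
>   static class ToyVector {
>     private List<Integer> data = new ArrayList<>();
>     void setData(List<Integer> newData) { data = new ArrayList<>(newData); }
>     List<Integer> getData() { return data; }
>   }
>
>   public static void main(String[] args) {
>     // Sort's output container holds this vector; the downstream operator
>     // captured a reference to it when OK_NEW_SCHEMA was returned.
>     ToyVector sortOutputVector = new ToyVector();
>     ToyVector downstreamReference = sortOutputVector;
>
>     // Option A (clearing and re-creating the container per EMIT boundary):
>     // the downstream reference now points at a stale object, so
>     // OK_NEW_SCHEMA would have to be resent for every boundary.
>     sortOutputVector = new ToyVector();
>     sortOutputVector.setData(List.of(1, 2, 3));
>     System.out.println("Option A, downstream sees: "
>         + downstreamReference.getData());   // prints []
>
>     // Option B (the EMIT design): keep the same vector object and transfer
>     // the new boundary's data into it; the captured reference stays valid.
>     downstreamReference.setData(List.of(1, 2, 3));
>     System.out.println("Option B, downstream sees: "
>         + downstreamReference.getData());   // prints [1, 2, 3]
>   }
> }
> {code}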
> *To achieve the above and keep the code changes to a minimum, the following
> approach is used:*
> 1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4
> (_outputSV4_) are created.
> 2) This outputWrapperContainer is provided to the SortResults instead of the
> actual container which carries the output of ExternalSortBatch.
> 3) For each EMIT boundary the load and merge phases will happen. The data for
> an EMIT subquery is expected to be small, so the sort will mostly happen in
> memory.
> 4) During each merge phase across an EMIT boundary the wrapper container will
> be created with a fresh set of vectors, as it is today: the container is
> cleared at the start of the merge phase and re-created with fresh vectors.
> 5) Once the merge phase is done, _prepareOutputContainer_ is called, which in
> turn calls the new method _updateOutputContainer_ of _SortResults_.
> 6) Since we are not supporting spilling with EMIT, in
> _BatchMerger::updateOutputContainer_ the outcome is checked and an
> UnsupportedOperationException is thrown. In
> _MergeSortWrapper::updateOutputContainer_, for the very first output the
> vectors in Sort's actual output container are created from the vectors of the
> SortResults _outputWrapperContainer_. For the other output batches which are
> returned across EMIT boundaries, the same output container of Sort with the
> same ValueVectors is reused; only the data is transferred in from the
> _outputWrapperContainer_ of SortResults.
> 7) There is a new flag called _firstBatchOfSchema_ which, if set to true,
> indicates that this is the first output batch with this schema. In scenarios
> like a schema change this flag will be set to true, and the first output
> container will be populated with a fresh set of ValueVectors and returned
> downstream with the OK_NEW_SCHEMA outcome.
> 8) There are challenges around which IterOutcome to return with each output
> batch. For example, for the very first EMIT boundary Sort has to return
> OK_NEW_SCHEMA along with the EMIT outcome. For a subsequent EMIT boundary
> whose output spans multiple batches, all the output batches should be
> returned with OK except the last output batch of that EMIT boundary, which is
> returned with EMIT. All these cases are handled in a single method,
> _getFinalOutcome_, which returns the correct IterOutcome for each output
> batch (a simplified sketch follows this list).
> 9) Once EMIT is returned for the current input EMIT boundary, the Sort state
> is reset, which involves clearing the data structures and setting the Sort
> state correctly so that the subsequent next() call is handled properly.
> _resetSortState_ now also contains the logic to not clear the memory of the
> very last output batch, which is sent with the NONE outcome, because of a
> dependency in StreamAgg.
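>
> As a rough illustration of the outcome selection described in point 8, here
> is a hypothetical, simplified version of that decision. It is not the actual
> _getFinalOutcome_ implementation; in particular, how the real method combines
> OK_NEW_SCHEMA with EMIT for the very first boundary is simplified here to
> just preferring OK_NEW_SCHEMA.
> {code:java}
> // Hypothetical approximation of the outcome selection described in point 8;
> // not the actual ExternalSortBatch.getFinalOutcome() code.
> public class ToyOutcomeChooser {
>
>   enum IterOutcome { OK, OK_NEW_SCHEMA, EMIT }
>
>   private boolean firstBatchOfSchema = true;
>
>   // lastBatchOfBoundary: no more output batches remain for the current
>   // EMIT boundary.
>   IterOutcome getFinalOutcome(boolean lastBatchOfBoundary) {
>     IterOutcome outcome;
>     if (firstBatchOfSchema) {
>       // First output batch carrying this schema: downstream must
>       // (re)establish its vector references.
>       outcome = IterOutcome.OK_NEW_SCHEMA;
>       firstBatchOfSchema = false;
>     } else if (lastBatchOfBoundary) {
>       // Last batch of the current EMIT boundary.
>       outcome = IterOutcome.EMIT;
>     } else {
>       // Intermediate batch within an EMIT boundary.
>       outcome = IterOutcome.OK;
>     }
>     return outcome;
>   }
>
>   public static void main(String[] args) {
>     ToyOutcomeChooser chooser = new ToyOutcomeChooser();
>     // First EMIT boundary produces two output batches.
>     System.out.println(chooser.getFinalOutcome(false)); // OK_NEW_SCHEMA
>     System.out.println(chooser.getFinalOutcome(true));  // EMIT
>     // Next boundary, same schema, single output batch.
>     System.out.println(chooser.getFinalOutcome(true));  // EMIT
>   }
> }
> {code}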
> *Future Tasks:*
> If we want to support spilling scenarios with the EMIT outcome, that can also
> be achieved with the current design. _BatchMerger::updateOutputContainer_
> would need logic to populate the actual output container of Sort as an SV4
> type container instead of the SV_NONE container which BatchMerger currently
> returns. With EMIT boundaries, where spilling happens for some boundaries and
> the merge stays in memory for others, we cannot send 2 different types of
> container from Sort (SV_NONE for spilling and SV4 for the in-memory merge).
> If we did that, OK_NEW_SCHEMA would need to be sent with each output batch
> since the vector references have to be changed for the downstream operator.
> Instead we can always produce an output container of SV4 type, so that in the
> case of spilling across an EMIT boundary the same output container with the
> same ValueVector references is populated with new data, and hence no
> OK_NEW_SCHEMA needs to be sent downstream. One disadvantage of this approach
> is that if Sort sends an SV4 container instead of SV_NONE in the spilling
> case as well, then somewhere downstream a RemovingRecordBatch will have to
> copy the data out of the SV4 again to create a regular SV_NONE container.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)