[ https://issues.apache.org/jira/browse/DRILL-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pritesh Maker updated DRILL-6498:
---------------------------------
    Labels: ready-to-commit  (was: )

> Support for EMIT outcome in ExternalSortBatch
> ---------------------------------------------
>
>                 Key: DRILL-6498
>                 URL: https://issues.apache.org/jira/browse/DRILL-6498
>             Project: Apache Drill
>          Issue Type: Task
>          Components: Execution - Relational Operators
>            Reporter: Sorabh Hamirwasia
>            Assignee: Sorabh Hamirwasia
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.14.0
>
>
> With Lateral and Unnest, if Sort is present in the sub-query then it needs to
> handle the EMIT outcome correctly. This means that when an EMIT is received,
> Sort performs the sort operation on the records buffered so far and produces
> output from them. After an EMIT, Sort should refresh its state and work on the
> next batches of incoming records until an EMIT is seen again.
> For the first cut, Sort will not support spilling in the subquery between
> Lateral and Unnest, since spilling is very unlikely there. The worst case is
> that Lateral gets a batch with only 1 row of data because the repeated type
> column data is too big. In that case Unnest will produce only 1 output batch,
> and Sort or any other blocking operator needs enough memory to hold at least
> 1 incoming batch anyway. So in the ideal case spilling should not happen. If
> there is an operator between Sort and Unnest that increases the data size,
> Sort might have to spill, but that is not a common case for now.
>
> *Description of Changes:*
> The sort operator is currently implemented as described below. This gives a
> high-level view of how Sort works and of how EMIT support was implemented.
> 1) In the buildSchema phase, Sort creates an empty container with SV_NONE and
> sends it downstream.
> 2) After the buildSchema phase it goes into the LOAD phase and keeps calling
> next() on upstream until it sees NONE or there is a failure.
> 3) For each batch it receives, it applies an SV2 if the batch does not already
> have one, sorts the batch, and then buffers it after converting it into a
> BatchGroup.InputBatch.
> 4) During buffering it checks for memory pressure and spills as needed.
> 5) Once all the batches are received and it gets NONE from upstream, it
> starts the merge phase.
> 6) In the merge phase it checks whether the merge can happen in memory or
> whether spilling is needed, and performs the merge accordingly.
> 7) Sort has a concept of SortResults, which represents the different kinds of
> output container that Sort can generate based on input batches and memory
> conditions. For example, for an in-memory merge the output container of Sort
> is an SV4 container with SortResults of type MergeSortWrapper. For a spill
> and merge, the container is of SV_NONE type with SortResults of type
> BatchMerger. There are also SortResults types for empty and single batches
> (not used anywhere).
> 8) SortResults provides an abstraction that returns the output result with
> each next() call on it, backed by the output container of the
> ExternalSortRecordBatch along with the correct recordCount and SV2/SV4 as
> needed. For example, with MergeSortWrapper all the inputs are in memory and
> hence all the output is also in memory, backed by an SV4. On each next() call
> the SV4 is updated with the start index and length, which tell the caller
> the record boundary that it should consume. With BatchMerger, based on memory
> pressure and the number of records Sort can emit in each output container, it
> fills the output container with that many records and sends it downstream.
> 9) The SortResults abstraction is also such that at the beginning of the
> merge phase the output container held by SortResults is cleared, and it is
> re-initialized after the merge is completed.
> Because Sort is a blocking operator, it currently clears the output container
> ValueVectors after the buildSchema phase and in the load phase.
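The windowing behaviour described in step 8 for the in-memory case can be illustrated with a small sketch. This is not Drill's MergeSortWrapper or SelectionVector4 code: the class `Sv4WindowSketch`, its fields, and the `maxBatchSize` parameter are hypothetical names chosen for illustration, and a plain `int[]` stands in for the SV4. It only shows the idea that the sorted data stays in place while each next() call moves a (start, length) window over it.

```java
// Hypothetical illustration only; not Drill's actual SortResults implementation.
class Sv4WindowSketch {
  private final int[] sortedIndices; // stand-in for the SV4: row indices in sorted order
  private final int maxBatchSize;    // assumed cap on records exposed per output batch
  private int start = 0;             // first position of the current window
  private int length = 0;            // number of records in the current window

  Sv4WindowSketch(int[] sortedIndices, int maxBatchSize) {
    if (maxBatchSize < 1) {
      throw new IllegalArgumentException("maxBatchSize must be at least 1");
    }
    this.sortedIndices = sortedIndices.clone();
    this.maxBatchSize = maxBatchSize;
  }

  // Advances the window past the previous batch; returns false once all records are consumed.
  boolean next() {
    start += length;
    length = Math.max(0, Math.min(maxBatchSize, sortedIndices.length - start));
    return length > 0;
  }

  int start() { return start; }

  int length() { return length; }

  // Row index of the i-th record in the current window.
  int indexAt(int i) {
    if (i < 0 || i >= length) {
      throw new IndexOutOfBoundsException("position outside current window: " + i);
    }
    return sortedIndices[start + i];
  }
}
```

With five sorted indices and a batch size of 2, a caller would see windows of length 2, 2 and 1 before next() returns false; no data is copied between calls.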
> Sort later creates the final output container (with ValueVector objects)
> after it has seen all the incoming data. The very first output batch is
> always returned with OK_NEW_SCHEMA so that the downstream operator can set up
> the correct SV mode and schema with the first output batch, since the schema
> returned in the buildSchema phase was a dummy one. So the vector references
> maintained by the downstream operator from the buildSchema phase are updated
> with the vector references in the first output batch.
> With EMIT, however, Sort will go into the load phase multiple times, and hence
> we cannot clear the output container of Sort after each EMIT boundary. If we
> did, the references that the downstream operator holds to the ExternalSort
> output container ValueVectors would become invalid, and Sort would also have
> to send OK_NEW_SCHEMA with the first output batch of each EMIT boundary. To
> avoid that, we need to make sure that with each EMIT boundary Sort goes from
> the load phase to the merge phase to produce output, and that neither phase
> clears the output container.
> *To achieve the above and keep the code changes to a minimum, the following
> approach is used:*
> 1) A wrapper output container (_outputWrapperContainer_) and a wrapper SV4
> (_outputSV4_) are created.
> 2) This outputWrapperContainer is provided to SortResults instead of the
> actual container that carries the output of ExternalSortBatch.
> 3) For each EMIT boundary the load and merge phases will happen. The data for
> an EMIT subquery is expected to be small, so the sort will mostly happen in
> memory.
> 4) During each merge phase across EMIT boundaries, the wrapper container is
> created with a fresh set of vectors, as was done earlier: the container is
> cleared at the start of the merge phase and re-created with a fresh set of
> vectors.
> 5) Once the merge phase is done, _prepareOutputContainer_ is called, which
> calls the new method _updateOutputContainer_ of _SortResults_.
> 6) Since we do not support spilling with EMIT, in updateOutputContainer of
> BatchMerger the outcome is checked and an UnsupportedOperation exception is
> thrown. _MergeSortWrapper::updateOutputContainer_, by contrast, creates the
> vectors in Sort's actual output container for the very first output from the
> vectors in the SortResults outputWrapperContainer. For other output batches
> that are returned across EMIT boundaries, the same Sort output container is
> used with the same ValueVectors; only the data is transferred in from the
> outputWrapperContainer of SortResults.
> 7) There is a new flag called _firstBatchOfSchema_ which, if set to true,
> indicates that this is the first output batch of this schema. So in scenarios
> like a schema change this flag will be set to true, and the first output
> container will be populated with a fresh set of ValueVectors and returned
> downstream with the OK_NEW_SCHEMA outcome.
> 8) There are challenges such as which IterOutcome to return with each output
> batch. For example, with the very first EMIT boundary Sort has to return
> OK_NEW_SCHEMA along with the EMIT outcome. For a subsequent EMIT boundary, if
> the output spans multiple batches, then all the output batches should be
> returned with OK except the last output batch for this EMIT boundary, which
> will be returned with EMIT. All these cases are handled in a single method,
> _getFinalOutcome_, which returns the correct IterOutcome for each output
> batch.
> 9) Once EMIT is returned for the current input EMIT boundary, the Sort state
> is reset, which involves clearing the data structures and setting the Sort
> state so that the subsequent next() call is handled properly.
> The _resetSortState_ method, which performs this reset, also contains the
> logic to not clear the memory of the very last output batch, which is sent
> with the NONE outcome, because of a dependency in StreamAgg.
> *Future Tasks:*
> If we want to support spilling scenarios with the EMIT outcome, that can also
> be achieved with the current design. _BatchMerger::updateOutputContainer_
> would have logic to populate the actual output container of Sort as an SV4
> type container instead of the SV_NONE container that BatchMerger currently
> returns. With spilling happening for some EMIT boundaries and in-memory merges
> for others, we cannot send 2 different types of container from Sort, that is,
> SV_NONE for spilling and SV4 for in-memory merge. If we did, OK_NEW_SCHEMA
> would need to be sent with each output batch, since the vector references
> would have to change for downstream. Instead we can always produce an output
> container of SV4 type, so that in case of spilling across an EMIT boundary the
> same output container with the same ValueVector references is populated with
> new data. Hence no OK_NEW_SCHEMA needs to be sent downstream. One disadvantage
> of this approach is that if Sort sends an SV4 container instead of SV_NONE in
> the spilling case as well, then somewhere downstream RemovingRecordBatch will
> again have to copy the data from the SV4 to create a regular SV_NONE
> container.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
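As a note on step 8 of the approach (which IterOutcome accompanies each output batch), the rules stated there can be sketched roughly as below. This is not the actual _getFinalOutcome_ implementation: `EmitOutcomeSketch`, `outcomesForBoundary` and the reduced `Outcome` enum are hypothetical names for illustration. The description does not spell out how EMIT is delivered when the only output batch of a boundary is already used for OK_NEW_SCHEMA, so the sketch assumes a trailing EMIT is sent separately in that case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not Drill's getFinalOutcome.
class EmitOutcomeSketch {

  // Only the outcomes named in the description of step 8.
  enum Outcome { OK_NEW_SCHEMA, OK, EMIT }

  // Returns the sequence of outcomes for the output batches of one EMIT boundary.
  static List<Outcome> outcomesForBoundary(int batchCount, boolean firstBatchOfSchema) {
    if (batchCount < 1) {
      throw new IllegalArgumentException("batchCount must be at least 1");
    }
    List<Outcome> outcomes = new ArrayList<>();
    for (int i = 0; i < batchCount; i++) {
      boolean lastBatch = (i == batchCount - 1);
      if (i == 0 && firstBatchOfSchema) {
        // First output batch of a schema lets downstream set up vectors and SV mode.
        outcomes.add(Outcome.OK_NEW_SCHEMA);
      } else if (lastBatch) {
        // Last output batch of the boundary carries EMIT.
        outcomes.add(Outcome.EMIT);
      } else {
        outcomes.add(Outcome.OK);
      }
    }
    // Assumption of this sketch: if the single batch was consumed by OK_NEW_SCHEMA,
    // the boundary is still closed with a separate EMIT.
    if (outcomes.get(outcomes.size() - 1) != Outcome.EMIT) {
      outcomes.add(Outcome.EMIT);
    }
    return outcomes;
  }
}
```

For a later boundary whose output spans three batches, `outcomesForBoundary(3, false)` gives OK, OK, EMIT; for the first boundary with the same size, `outcomesForBoundary(3, true)` gives OK_NEW_SCHEMA, OK, EMIT.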