Padma Penumarthy created DRILL-6238:
---------------------------------------

             Summary: Batch sizing for operators
                 Key: DRILL-6238
                 URL: https://issues.apache.org/jira/browse/DRILL-6238
             Project: Apache Drill
          Issue Type: New Feature
            Reporter: Padma Penumarthy
            Assignee: Padma Penumarthy


*Batch Sizing For Operators*

This document describes the approach we are taking to limit batch sizes for 
operators other than scan.

*Motivation*

The main goals are
 # Improve concurrency
 # Reduce query failures caused by out-of-memory errors

To accomplish these goals, we need to make queries execute within a specified 
memory budget. To enforce a per-query memory limit, we need to be able to 
enforce per-fragment and per-operator memory limits. Controlling the batch 
sizes of individual operators is the first step towards all of this.

*Background*

In Drill, different operators have different limits with respect to outgoing 
batches. Some use hard-coded row counts, some use hard-coded memory limits, and 
some have no limits at all. Because no limits are imposed, the memory used by 
an outgoing batch can vary widely depending on the input data size and what the 
operator is doing. Queries fail because we are not able to allocate the memory 
needed. Some operators produce very large batches, causing blocking operators 
like sort and hash aggregate, which have to work under tight memory 
constraints, to fail. The size of a batch should be a function of available 
memory rather than of input data size and/or what the operator does. Please 
refer to the table at the end of this document for details on what each 
operator does today.

*Design*

The goal is to have all operators behave the same way, i.e. produce batches 
whose size is less than or equal to the configured outgoing batch size, with a 
minimum of 1 row and a maximum of 64K rows per batch. A new system option 
'drill.exec.memory.operator.output_batch_size' is added, with a default value 
of 16 MB.

The basic idea is to limit the size of the outgoing batch by deciding how many 
rows it can hold, based on the average entry size of each outgoing column, 
taking into account the actual data size plus the metadata vector overhead we 
add on top for tracking variable length, mode (repeated, optional, required), 
etc. This calculation will be different for each operator, based on what the 
operator is doing, the incoming data, and what is being projected out. A 
sketch of the calculation follows.
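To make the arithmetic concrete, here is a minimal sketch of the row-count 
calculation. The class and method names are hypothetical and the overhead 
accounting is simplified; this is not Drill's actual implementation.

{code:java}
// Hypothetical sketch, not Drill's actual code: given the configured batch
// size budget and the average per-row size of each outgoing column (data plus
// metadata vector overhead), compute how many rows the outgoing batch may hold.
public class OutputBatchSizer {
  private static final int MAX_ROWS = 64 * 1024;   // 64K row ceiling per batch

  public static int computeOutgoingRowCount(long outputBatchSizeBytes,
                                            int[] avgColumnEntrySizes) {
    long avgRowWidth = 0;
    for (int colSize : avgColumnEntrySizes) {
      avgRowWidth += colSize;
    }
    // Guarantee at least 1 row per batch, even if one row exceeds the budget.
    long rows = Math.max(1, outputBatchSizeBytes / Math.max(1, avgRowWidth));
    return (int) Math.min(rows, MAX_ROWS);
  }
}
{code}

For example, with the default 16 MB budget and an average row width of 300 
bytes, this yields min(64K, 16777216 / 300), i.e. roughly 55.9K rows.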

With this adaptive approach based on actual data sizes, operators that 
previously limited batch size to something less than 64K rows can potentially 
fit many more rows (up to 64K) in a batch, as long as memory stays within the 
budget. This should help improve performance.

Also, to improve performance and utilize memory more efficiently, we will do 
the following (see the sketch after this list):
 # Allocate memory for value vectors upfront. Since we know the number of rows 
and the sizing information for each column in the outgoing batch, we will use 
that information to allocate memory for value vectors upfront. This will help 
improve performance by eliminating the memory copies, and the zeroing of the 
newly allocated half, that we incur every time we double a vector.
 # Make the number of rows in the outgoing batch a power of two. Since memory 
is allocated in powers of two, this will help us pack the value vectors 
densely, thereby reducing the amount of memory wasted by the doubling effect.

To summarize, the benefits we will get are improved memory utilization, better 
performance, higher concurrency, and fewer queries dying because of 
out-of-memory errors.

So, what are the cons?
 * Since this is based on averages, strict enforcement is not possible. There 
could be pathological cases where, because of uneven data distribution, we 
exceed the configured output batch size, potentially causing OOM errors and 
problems in downstream operators.

Other issues that will be addressed:
 * We are adding extra processing for each batch in each operator to figure out 
the sizing information. This overhead can be reduced by passing the sizing 
information along with the batch between operators (see the sketch after this 
list).
 * For some operators, it will be complex to figure out the average size of the 
outgoing columns, especially if we have to evaluate complex expression trees 
and UDFs to figure out the transformation applied to incoming batches. We will 
use approximations as appropriate.
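One possible shape for sizing information that travels with a batch, purely 
illustrative and not an existing Drill interface:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Purely illustrative: per-column sizing stats that an upstream operator could
// compute once and hand downstream with the batch, so each operator does not
// have to re-scan the vectors to derive the same numbers.
public class BatchSizingStats {
  private final Map<String, Integer> avgEntrySizeBytes = new HashMap<>();
  private int rowCount;

  public void setRowCount(int rowCount) { this.rowCount = rowCount; }
  public int getRowCount() { return rowCount; }

  public void recordColumn(String column, int avgBytes) {
    avgEntrySizeBytes.put(column, avgBytes);
  }

  public int avgEntrySize(String column) {
    return avgEntrySizeBytes.getOrDefault(column, 0);
  }
}
{code}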

The following table summarizes the limits we have today for each operator.

Flatten, merge join, and external sort have already been changed to adhere to 
the batch size limits described in this document, as of Drill release 1.13.

 
|*Operator*|*Limit (Rows, Memory)*|*Notes*|
|Flatten|4K, 512MB|Flatten can produce very large batches based on the average cardinality of the flatten column.|
|Merge Receiver|32K|No memory limit.|
|Hash Aggregate|64K|No memory limit.|
|Streaming Aggregate|32K|No memory limit.|
|Broadcast Sender|None|No limits.|
|Filter, Limit|None|No limits.|
|Hash Join|4K|No memory limit.|
|Merge Join|4K|No memory limit.|
|Nested Loop Join|4K|No memory limit.|
|Partition Sender|1K| |
|Project|64K|No memory limit.|
|Selection Vector Remover|None|No limits.|
|TopN|4K|No memory limit.|
|Union|None|No limit.|
|Windows|None|No limit.|
|External Sort|64K, 16MB| |
|Unordered Receiver|None|No limit.|


