[ 
https://issues.apache.org/jira/browse/DRILL-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Padma Penumarthy updated DRILL-6238:
------------------------------------
    Description: 
*Batch Sizing For Operators*

This document describes the approach we are taking for limiting batch sizes for 
operators other than scan.

*Motivation*

The main goals are:
 # Improve concurrency: if each query uses less memory, i.e. stays within its budget, more queries can run concurrently.
 # Reduce query failures caused by out-of-memory errors.

To accomplish these goals, queries need to execute within a specified memory budget. To enforce a per-query memory limit, we need to be able to enforce per-fragment and per-operator memory limits. Controlling the batch sizes of individual operators is the first step towards this.

*Background*

In Drill, different operators have different limits with respect to outgoing batches. Some use hard-coded row counts, some use hard-coded memory limits and some have no limits at all. Because no limits are imposed, the memory used by an outgoing batch can vary widely depending on the input data size and what the operator is doing. Queries fail because we are not able to allocate the memory needed. Some operators produce very large batches, causing blocking operators such as sort and hash aggregate, which have to work under tight memory constraints, to fail. The size of batches should be a function of available memory rather than of the input data size and/or what the operator does. Please refer to the table at the end of this document for details on what each operator does today.

*Design*

The goal is to have all operators behave the same way, i.e. produce batches whose size is less than or equal to the configured outgoing batch size, with a minimum of 1 row and a maximum of 64K rows per batch. A new system option, ‘drill.exec.memory.operator.output_batch_size’, is added with a default value of 16MB.
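The row-count rule above can be sketched as a small Java helper. This is a minimal illustration, not Drill's code: the class and method names are hypothetical, and it assumes that 16MB means 16 × 2^20 bytes and that an estimated row width is already available.

```java
// Hypothetical helper (not Drill's API): derives the outgoing row count from the
// configured output batch size and an estimated average row width.
public final class OutputRowCount {
    static final int MIN_ROWS = 1;
    static final int MAX_ROWS = 64 * 1024; // 65,536 rows

    /** Rows that fit in outputBatchSizeBytes, clamped to [MIN_ROWS, MAX_ROWS]. */
    static int rowsForBudget(long outputBatchSizeBytes, long rowWidthBytes) {
        if (rowWidthBytes <= 0) {
            return MAX_ROWS; // no width information: fall back to the row cap
        }
        long rows = outputBatchSizeBytes / rowWidthBytes;
        return (int) Math.max(MIN_ROWS, Math.min(MAX_ROWS, rows));
    }

    public static void main(String[] args) {
        long budget = 16L * 1024 * 1024; // 16MB default, assumed to be 16 * 2^20 bytes
        System.out.println(rowsForBudget(budget, 100));       // narrow rows hit the 64K cap: 65536
        System.out.println(rowsForBudget(budget, 1024));      // 16384
        System.out.println(rowsForBudget(budget, 32L << 20)); // a row wider than the budget: 1
    }
}
```

Clamping at both ends means a single row wider than the whole budget still yields a one-row batch, which is the case the 1-row minimum exists for.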

The basic idea is to limit the size of the outgoing batch by deciding how many rows it can hold based on the average entry size of each outgoing column. This accounts for both the actual data size and the overhead of the metadata vectors we add on top to track variable-length offsets and the data mode (required, optional, repeated). The calculation of the number of rows differs for each operator and is based on:
 # What the operator is doing
 # The incoming batch size, which includes the type and average size of each column
 # What is being projected out
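A minimal Java sketch of the per-column estimate follows. ColumnStats, the method names and the overhead constants are assumptions for illustration; the constants reflect our reading of Drill's value vector layout (4-byte offsets for variable-width and repeated data, a 1-byte null flag per nullable value) rather than a confirmed specification.

```java
import java.util.List;

// Sketch of per-column width estimation. Names are hypothetical; the overhead
// constants are assumptions about the vector layout, not a specification.
public final class ColumnWidthEstimator {
    enum Mode { REQUIRED, OPTIONAL, REPEATED }

    static final int OFFSET_BYTES = 4; // offset entry per value (variable width) or per row (repeated)
    static final int BITS_BYTES = 1;   // null-tracking entry per value (optional)

    /** Statistics measured on the incoming batch for one projected column. */
    record ColumnStats(String name, Mode mode, boolean variableWidth,
                       double avgDataBytesPerValue, double avgValuesPerRow) {}

    /** Average bytes one row of this column occupies: data plus metadata vectors. */
    static double avgEntryWidth(ColumnStats c) {
        double perValue = c.avgDataBytesPerValue() + (c.variableWidth() ? OFFSET_BYTES : 0);
        switch (c.mode()) {
            case REQUIRED:
                return perValue;
            case OPTIONAL:
                return perValue + BITS_BYTES;
            case REPEATED:
                // one list offset per row, then avgValuesPerRow inner values
                return OFFSET_BYTES + c.avgValuesPerRow() * perValue;
            default:
                throw new IllegalArgumentException("unknown mode " + c.mode());
        }
    }

    /** Estimated width of one outgoing row: the sum over projected columns. */
    static double avgRowWidth(List<ColumnStats> projected) {
        return projected.stream().mapToDouble(ColumnWidthEstimator::avgEntryWidth).sum();
    }

    public static void main(String[] args) {
        List<ColumnStats> projected = List.of(
            new ColumnStats("id", Mode.REQUIRED, false, 4, 1),      // INT: 4 bytes
            new ColumnStats("name", Mode.OPTIONAL, true, 20, 1),    // VARCHAR: 20 + 4 + 1
            new ColumnStats("scores", Mode.REPEATED, false, 8, 3)); // BIGINT list: 4 + 3 * 8
        System.out.println(avgRowWidth(projected)); // 57.0
    }
}
```

Only the projected columns enter the sum, which is how the third input in the list above affects the row count.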

With this adaptive approach based on actual average data sizes, operators that previously limited batches to fewer than 64K rows can potentially produce many more rows (up to 64K) per batch, as long as memory stays within the budget. For example, flatten and joins use a batch size of 4K rows, probably chosen to be conservative with respect to memory usage. Letting these operators go up to 64K rows while staying within the memory budget should improve performance.
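A worked example of the adaptive limit, using hypothetical average row widths w and the 16MB default budget B (taken here as 16 × 2^20 bytes), shows that the rule works in both directions: narrow rows can go well past the old 4K cap, while very wide rows get fewer than 4K rows.

```latex
\begin{aligned}
\text{rows} &= \min\!\left(65536,\ \left\lfloor B / w \right\rfloor\right), \qquad B = 16 \times 2^{20} = 16{,}777{,}216\ \text{bytes} \\
w = 200\ \text{B}:&\quad \lfloor 16{,}777{,}216 / 200 \rfloor = 83{,}886 \;\Rightarrow\; \text{rows} = 65{,}536 \quad (\text{vs. } 4{,}096 \text{ rows} = 819{,}200\ \text{B at the fixed 4K limit}) \\
w = 2{,}048\ \text{B}:&\quad \lfloor 16{,}777{,}216 / 2{,}048 \rfloor = 8{,}192 \;\Rightarrow\; \text{rows} = 8{,}192 \\
w = 8{,}192\ \text{B}:&\quad \lfloor 16{,}777{,}216 / 8{,}192 \rfloor = 2{,}048 \;\Rightarrow\; \text{rows} = 2{,}048
\end{aligned}
```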

Also, to improve performance and use memory more efficiently, we will:
 # Allocate memory for value vectors upfront. Since we know the number of rows and the sizing information for each column in the outgoing batch, we can use that information to allocate memory for the value vectors upfront. Currently, we either allocate for 4K values initially and double every time we need more, or allocate the maximum needed upfront. Preallocating memory based on the sizing calculation improves performance by avoiding the memory copies (and the zeroing of the new half) that happen on every doubling, and saves memory in cases where we previously over-allocated.
 # Round the number of rows in the outgoing batch down to a power of two. Since memory is allocated in powers of two, this helps pack the value vectors densely, reducing the memory wasted because of the doubling effect.
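The two allocation points above can be illustrated with a small Java sketch. It assumes, as the text states, that buffers are allocated in powers of two; the class and method names are hypothetical, and the cost of zeroing the new half is not modelled.

```java
// Sketch: grow-by-doubling vs. upfront allocation, and why rounding the row
// count down to a power of two packs vectors densely. Assumes the allocator
// rounds every buffer request up to a power of two.
public final class AllocationSketch {

    /** Bytes copied when a buffer starts at initialValues and doubles until it holds neededValues. */
    static long bytesCopiedByDoubling(int initialValues, int neededValues, int bytesPerValue) {
        long copied = 0;
        long capacity = initialValues;
        while (capacity < neededValues) {
            copied += capacity * bytesPerValue; // existing contents move to the new buffer
            capacity *= 2;
        }
        return copied;
    }

    /** Size handed back for a request, assuming power-of-two rounding. */
    static long allocatedBytes(long requestedBytes) {
        long highest = Long.highestOneBit(requestedBytes);
        return highest == requestedBytes ? requestedBytes : highest << 1;
    }

    /** Largest power of two <= rows (rows must be >= 1). */
    static int roundDownToPowerOfTwo(int rows) {
        return Integer.highestOneBit(rows);
    }

    public static void main(String[] args) {
        // 4-byte column grown from 4K to 64K values: copies 4K + 8K + 16K + 32K values.
        System.out.println(bytesCopiedByDoubling(4096, 65536, 4)); // 245760
        // An upfront allocation for a known row count copies nothing.

        int rows = 40_000; // row count from the sizing calculation
        int bytesPerValue = 4;
        System.out.println(allocatedBytes((long) rows * bytesPerValue));    // 262144 allocated for 160000 used
        int rounded = roundDownToPowerOfTwo(rows);                           // 32768
        System.out.println(allocatedBytes((long) rounded * bytesPerValue)); // 131072, no slack
    }
}
```

In this example, growing one 4-byte column from 4K to 64K values copies 245,760 bytes that an upfront allocation avoids, and a 40,000-row batch leaves 102,144 of 262,144 allocated bytes unused, slack that rounding down to 32,768 rows removes.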

To summarize, the expected benefits are improved memory utilization, better performance, higher concurrency and fewer queries failing with out-of-memory errors.

Note: since these sizing calculations are based on averages, strict enforcement of memory usage is not possible. In pathological cases with uneven data distribution, we might exceed the configured output batch size, potentially causing OOM errors and problems in downstream operators.

Other issues that will be addressed:
 * Each operator now does extra processing on every batch to compute the sizing information. This overhead can be reduced by passing the sizing information along with the batch between operators.
 * For some operators, it is complex to determine the average size of outgoing columns, especially when we have to evaluate complex expression trees and UDFs to determine how incoming batches are transformed. We will use approximations as appropriate.
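The first point above, passing sizing information along with the batch, could look roughly like the following Java sketch. None of these types exist in Drill; BatchSizing, SizedBatch and outputRows are hypothetical names used only to show the idea that the producer measures once and the consumer reuses the result.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Drill types): the producing operator measures its
// outgoing batch once and attaches the result, so the consuming operator can
// size its own output without re-scanning the incoming vectors.
public final class SizingHandoff {
    static final int MAX_ROWS = 65536;

    /** Average width in bytes per row of each column, measured once by the producer. */
    record BatchSizing(int rowCount, Map<String, Double> avgColumnWidths) {
        double avgRowWidth(List<String> projectedColumns) {
            double width = 0;
            for (String column : projectedColumns) {
                Double w = avgColumnWidths.get(column);
                if (w == null) {
                    throw new IllegalArgumentException("no sizing for column " + column);
                }
                width += w;
            }
            return width;
        }
    }

    /** A batch travelling between operators together with its sizing information. */
    record SizedBatch(Object vectors, BatchSizing sizing) {}

    /** Consumer side: output row count derived from the attached sizing alone. */
    static int outputRows(SizedBatch incoming, List<String> projected, long budgetBytes) {
        double rowWidth = incoming.sizing().avgRowWidth(projected);
        if (rowWidth <= 0) {
            return MAX_ROWS;
        }
        long rows = (long) (budgetBytes / rowWidth);
        return (int) Math.max(1, Math.min(MAX_ROWS, rows));
    }

    public static void main(String[] args) {
        BatchSizing sizing = new BatchSizing(4096, Map.of("a", 8.0, "b", 24.0));
        SizedBatch batch = new SizedBatch(null, sizing); // vectors omitted in this sketch
        System.out.println(outputRows(batch, List.of("b"), 1L << 20)); // 43690
    }
}
```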

The following table summarizes the limits each operator has today.

As of Drill release 1.13, flatten, merge join and external sort adhere to the batch size limits described in this document.

|*Operator*|*Limit (Rows, Memory)*|*Notes*|
|Flatten|4K, 512MB|Flatten can produce very large batches depending on the average cardinality of the flatten column. Batch size limit added in 1.13.|
|Merge Receiver|32K|No memory limit.|
|Hash Aggregate|64K|No memory limit.|
|Streaming Aggregate|32K|No memory limit. Streaming Aggregate typically does not produce large batches.|
|Broadcast Sender|None|No limits.|
|Filter, Limit|None|No limits. These operators just add a selection vector (max 256KB) on top of the incoming batch. If the incoming batch is within the batch size limit, adjusting the outgoing batch size for these operators is not a priority.|
|Hash Join|4K|No memory limit. Joins produce large batches. The 4K limit might have been put in place to be conservative with respect to memory usage.|
|Merge Join|4K|Batch size limit added in 1.13.|
|Nested Loop Join|4K|No memory limit.|
|Partition Sender|1K| |
|Project|64K|No memory limit. Project can produce large batches if we project a lot of columns, i.e. more than the incoming batch has. Also, some functions like concat can increase the average row width of columns, using up a lot more memory. It is also complex to determine the average row width of the columns in the outgoing batch because of complex expressions and UDFs.|
|Selection Vector Remover|None|No limits. Output batches are smaller than input batches. Limiting the output batch size for this operator is not a priority.|
|TopN|4K|No memory limit. TopN typically does not produce large batches.|
|Union|None|No limit.|
|Windows|None|No limit.|
|External Sort|64K, 16MB|External sort already limits the size of output batches to 16MB.|
|Unordered Receiver|None|No limit.|
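The Flatten row in the table notes that output size depends on the average cardinality of the flatten column. A hedged Java sketch of how that cardinality could drive the number of incoming rows flattened per outgoing batch follows; the method name and formula are illustrative assumptions, not Drill's implementation.

```java
// Hypothetical sketch of flatten sizing driven by the average cardinality of
// the flatten column; names and formula are illustrative assumptions.
public final class FlattenSizing {
    static final int MAX_ROWS = 65536;

    /**
     * Incoming rows to flatten per outgoing batch.
     *
     * @param otherColumnsWidth avg bytes per row of the columns copied through unchanged
     * @param elementWidth      avg bytes of one element of the flattened list
     * @param avgCardinality    avg number of list elements per incoming row
     * @param budgetBytes       configured output batch size
     */
    static int inputRowsPerOutputBatch(double otherColumnsWidth, double elementWidth,
                                       double avgCardinality, long budgetBytes) {
        // each list element becomes one output row that repeats the other columns
        double outRowWidth = otherColumnsWidth + elementWidth;
        long outRows = Math.max(1, Math.min(MAX_ROWS, (long) (budgetBytes / outRowWidth)));
        // cardinality below 1 (many empty lists) is treated as 1 to stay conservative
        long inRows = (long) (outRows / Math.max(1.0, avgCardinality));
        return (int) Math.max(1, inRows);
    }

    public static void main(String[] args) {
        // 40-byte pass-through columns, 24-byte elements, 100 elements per row on average:
        // 64-byte output rows -> 16384 output rows in 1MB -> 163 input rows
        System.out.println(inputRowsPerOutputBatch(40, 24, 100, 1L << 20)); // 163
    }
}
```

With the 16MB default the same inputs give 262,144 candidate output rows, which the 64K cap reduces to 65,536, i.e. 655 incoming rows per batch.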


> Batch sizing for operators
> --------------------------
>
>                 Key: DRILL-6238
>                 URL: https://issues.apache.org/jira/browse/DRILL-6238
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
