[ 
https://issues.apache.org/jira/browse/IMPALA-4268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Armstrong updated IMPALA-4268:
----------------------------------
    Description: 
{{PlanRootSink}} runs the producer (the coordinator fragment execution thread) 
in a separate thread from the consumer (i.e. the thread handling the fetch 
RPC), which calls {{GetNext()}} to retrieve the rows. The implementation was 
simplified by handing off a single batch at a time from the producer to the 
consumer.

This decision causes some problems:
* Many context switches for the sender. Adding buffering would allow the sender 
to append to the buffer and continue making progress without a context switch 
(see the sketch after this list).
* Query execution can't release resources until the client has fetched the 
final batch, because the coordinator fragment thread is still running and 
potentially producing backpressure all the way down the plan tree.
* The consumer can't fulfil fetch requests greater than Impala's internal 
BATCH_SIZE, because it is only given one batch at a time.
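
For illustration, here is a minimal sketch of the kind of bounded multi-batch 
buffer that such buffering implies; the {{BatchQueue}} and {{RowBatch}} names, 
the capacity parameter and the C++ details are placeholders for this sketch, 
not Impala's actual classes or a committed design:

{code:cpp}
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>

// Placeholder for Impala's row batch type; only the handoff shape matters here.
struct RowBatch {};

// Hypothetical bounded buffer between the coordinator fragment thread (producer)
// and the fetch-RPC thread (consumer). The sender blocks only when the buffer is
// full, rather than after every single batch.
class BatchQueue {
 public:
  explicit BatchQueue(std::size_t capacity) : capacity_(capacity) {}

  // Called by the sender from Send(): blocks only while 'capacity_' batches are queued.
  void AddBatch(std::unique_ptr<RowBatch> batch) {
    std::unique_lock<std::mutex> l(lock_);
    producer_cv_.wait(l, [this] { return batches_.size() < capacity_; });
    batches_.push_back(std::move(batch));
    consumer_cv_.notify_one();
  }

  // Called by the consumer from GetNext(): returns nullptr once the producer is
  // done and the buffer is drained.
  std::unique_ptr<RowBatch> GetBatch() {
    std::unique_lock<std::mutex> l(lock_);
    consumer_cv_.wait(l, [this] { return !batches_.empty() || done_; });
    if (batches_.empty()) return nullptr;
    std::unique_ptr<RowBatch> batch = std::move(batches_.front());
    batches_.pop_front();
    producer_cv_.notify_one();
    return batch;
  }

  // Called by the sender after the final batch, so the fragment thread can exit
  // and release query resources while the client keeps fetching buffered rows.
  void MarkDone() {
    std::lock_guard<std::mutex> l(lock_);
    done_ = true;
    consumer_cv_.notify_all();
  }

 private:
  const std::size_t capacity_;
  std::mutex lock_;
  std::condition_variable producer_cv_;
  std::condition_variable consumer_cv_;
  std::deque<std::unique_ptr<RowBatch>> batches_;
  bool done_ = false;
};
{code}

With a buffer like this the sender blocks only when the buffer is full, and 
marking it done lets the fragment thread finish (and release query resources) 
while the client is still fetching the buffered rows.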

The tricky part is managing the mismatch between the size of the row batches 
processed in {{Send()}} and the size of the fetch result asked for by the 
client without impacting performance too badly. The sender materializes output 
rows in a {{QueryResultSet}} that is owned by the coordinator. That is not, 
currently, a splittable object - instead it contains the actual RPC response 
struct that will hit the wire when the RPC completes. The asynchronous sender 
does not know the batch size, because it can in theory change on every fetch 
call (although most reasonable clients will not randomly change the fetch size).
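
To sketch how that mismatch could be handled (assuming buffered batches can be 
consumed partially), the consumer could append rows from buffered batches into 
the result set until the requested fetch size is reached, remembering its 
offset into a partially consumed batch for the next call. {{ResultBuffer}}, the 
simplified {{RowBatch}} / {{QueryResultSet}} types and {{AddRows()}} below are 
stand-ins for this sketch, not the real interfaces:

{code:cpp}
#include <algorithm>
#include <deque>
#include <memory>

// Simplified stand-ins: the real QueryResultSet appends materialized rows into
// the RPC response struct owned by the coordinator.
struct RowBatch { int num_rows = 0; };
struct QueryResultSet {
  void AddRows(const RowBatch& batch, int start_row, int count) {
    (void)batch; (void)start_row;
    total_rows += count;  // A real result set would materialize the rows here.
  }
  int total_rows = 0;
};

// Fulfils a fetch of arbitrary size from whatever batches have been buffered,
// splitting a batch across fetches when the sizes do not line up.
class ResultBuffer {
 public:
  void AddBatch(std::unique_ptr<RowBatch> batch) { batches_.push_back(std::move(batch)); }

  // Appends up to 'num_requested' rows to 'results'; returns how many were appended.
  int GetNext(QueryResultSet* results, int num_requested) {
    int num_appended = 0;
    while (num_appended < num_requested && !batches_.empty()) {
      RowBatch* batch = batches_.front().get();
      int available = batch->num_rows - next_row_;
      int to_copy = std::min(available, num_requested - num_appended);
      results->AddRows(*batch, next_row_, to_copy);
      next_row_ += to_copy;
      num_appended += to_copy;
      if (next_row_ == batch->num_rows) {
        // Batch fully consumed: drop it and start the next batch from row 0.
        batches_.pop_front();
        next_row_ = 0;
      }
    }
    return num_appended;
  }

 private:
  std::deque<std::unique_ptr<RowBatch>> batches_;
  int next_row_ = 0;  // First unconsumed row of batches_.front().
};
{code}

As noted above, the {{QueryResultSet}} is not currently splittable; making it 
so, or stitching result sets together without extra copies, is the part this 
sketch glosses over by copying rows through an intermediate buffer.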

  was:
In IMPALA-2905, we are introducing a {{PlanRootSink}} that handles the 
production of output rows at the root of a plan.

The implementation in IMPALA-2905 has the plan execute in a separate thread 
from the consumer, which calls {{GetNext()}} to retrieve the rows. However, the 
sender thread will block until {{GetNext()}} is called, so that there are no 
complications about memory usage and ownership due to having several batches in 
flight at one time.

However, this also leads to many context switches, as each {{GetNext()}} call 
yields to the sender to produce the rows. If the sender was to fill a buffer 
asynchronously, the consumer could pull out of that buffer without taking a 
context switch in many cases (and the extra buffering might smooth out any 
performance spikes due to client delays, which currently directly affect plan 
execution).

The tricky part is managing the mismatch between the size of the row batches 
processed in {{Send()}} and the size of the fetch result asked for by the 
client. The sender materializes output rows in a {{QueryResultSet}} that is 
owned by the coordinator. That is not, currently, a splittable object - instead 
it contains the actual RPC response struct that will hit the wire when the RPC 
completes. The asynchronous sender cannot know the batch size, which may change 
on every fetch call, so the {{GetNext()}} implementation would need to be able 
to split out the {{QueryResultSet}} to match the requested fetch size, and 
handle stitching together other {{QueryResultSet}}s - without doing extra 
copies.


> Rework coordinator buffering to buffer more data
> ------------------------------------------------
>
>                 Key: IMPALA-4268
>                 URL: https://issues.apache.org/jira/browse/IMPALA-4268
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Backend
>    Affects Versions: Impala 2.8.0
>            Reporter: Henry Robinson
>            Assignee: Pooja Nilangekar
>            Priority: Major
>              Labels: query-lifecycle, resource-management
>         Attachments: rows-produced-histogram.png
>


