[
https://issues.apache.org/jira/browse/IMPALA-4268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720490#comment-16720490
]
Tim Armstrong commented on IMPALA-4268:
---------------------------------------
Here's a rough approach that I think would work reasonably well in practice.
The goal is to avoid multiple copies of the results in cases where it is likely
to be a performance bottleneck, i.e. when the result set is large and when the
client is rapidly fetching the results. We should assume that the common case
is clients repeatedly fetching the same number of rows.
* The producer, at every point in time, has a target batch size which is a
guess about what the next Fetch() size is likely to be.
** Initially this would be some hardcoded value, based on common client
behaviour
** It would be updated to the last fetch request size every time the client
fetches.
* The ClientRequestState or PlanRootSink (TBD which) holds some number of buffered
QueryResultSet objects. We want this buffering to be bounded by a threshold in bytes.
* The producer tries to fill the last queued QueryResultSet object up to the
target fetch size, moving on to a new one once it is full.
* The consumer tries to fulfil the requested fetch size by either returning a
queued QueryResultSet directly (if it is close enough to the right size) or by
copying rows from the queued QueryResultSets into its own output
QueryResultSet (see the sketch after this list). There are some subtle issues to think about:
** Should the consumer block waiting for more rows if its target number of rows
is not yet available? Returning too eagerly results in many tiny batches being
returned, but blocking indefinitely delays return of results to the client. One
option is, initially, to block until there are enough rows to fill the target
request size, the producer has hit eos, or some timeout has elapsed.
** We need some way to get the batches realigned if they get misaligned
earlier in the process. A possible heuristic: when reaching a batch boundary,
only continue consuming that batch if it will fit entirely in the output.
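Below is a minimal sketch of how that producer/consumer interaction could look. It assumes a stand-in ResultSet of string rows in place of the real QueryResultSet; the class and member names (BufferedResultQueue, target_fetch_size_, etc.) are invented for illustration, and memory accounting, error propagation and the client result cache are omitted:
{code:cpp}
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iterator>
#include <mutex>
#include <string>
#include <vector>

// Stand-in for QueryResultSet: a batch of rows whose byte size is
// approximated by the summed string lengths.
struct ResultSet {
  std::vector<std::string> rows;
  int64_t ByteSize() const {
    int64_t bytes = 0;
    for (const auto& r : rows) bytes += r.size();
    return bytes;
  }
};

class BufferedResultQueue {
 public:
  explicit BufferedResultQueue(int64_t max_buffered_bytes)
      : max_buffered_bytes_(max_buffered_bytes) {}

  // Producer: append one materialized row. Fills the last queued batch up to
  // the current target fetch size, then starts a new one. Blocks while the
  // buffered bytes exceed the threshold.
  void AddRow(std::string row) {
    std::unique_lock<std::mutex> l(lock_);
    producer_cv_.wait(l, [&] { return buffered_bytes_ < max_buffered_bytes_; });
    if (batches_.empty() ||
        static_cast<int64_t>(batches_.back().rows.size()) >= target_fetch_size_) {
      batches_.emplace_back();
    }
    buffered_bytes_ += row.size();
    ++buffered_rows_;
    batches_.back().rows.push_back(std::move(row));
    consumer_cv_.notify_one();
  }

  void SetEos() {
    std::lock_guard<std::mutex> l(lock_);
    eos_ = true;
    consumer_cv_.notify_all();
  }

  // Consumer: fetch up to 'num_rows' rows. The request size becomes the new
  // target batch size. Wait (bounded by 'timeout') until enough rows are
  // buffered or the producer hits eos, then either hand back a whole queued
  // batch or copy rows out of the front batch to align with the request.
  ResultSet Fetch(int64_t num_rows, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(lock_);
    target_fetch_size_ = num_rows;  // Guess: the next fetch will look the same.
    consumer_cv_.wait_for(l, timeout,
        [&] { return eos_ || buffered_rows_ >= num_rows; });

    ResultSet out;
    if (!batches_.empty()) {
      if (static_cast<int64_t>(batches_.front().rows.size()) <= num_rows) {
        // Fast path: the front batch fits entirely in the output, so return
        // it as-is without copying individual rows.
        out = std::move(batches_.front());
        batches_.pop_front();
      } else {
        // Misaligned: move just enough rows out of the front batch.
        auto& front = batches_.front().rows;
        out.rows.assign(std::make_move_iterator(front.begin()),
                        std::make_move_iterator(front.begin() + num_rows));
        front.erase(front.begin(), front.begin() + num_rows);
      }
      buffered_rows_ -= static_cast<int64_t>(out.rows.size());
      buffered_bytes_ -= out.ByteSize();
      producer_cv_.notify_one();
    }
    return out;
  }

 private:
  const int64_t max_buffered_bytes_;
  int64_t target_fetch_size_ = 1024;  // Hardcoded initial guess.
  int64_t buffered_bytes_ = 0;
  int64_t buffered_rows_ = 0;
  bool eos_ = false;
  std::deque<ResultSet> batches_;
  std::mutex lock_;
  std::condition_variable producer_cv_;
  std::condition_variable consumer_cv_;
};
{code}
The fast path in Fetch() hands back a whole queued batch without per-row copies, which is also the alignment heuristic above: a batch is only consumed whole if it fits entirely in the output.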
One thing to keep in mind is that, ideally, the client result cache and the
buffer should be the same thing. The difference is that the client result cache
retains all results and discards the cache if it overflows, whereas the
PlanRootSink buffering is a bounded queue where the producer blocks when it
gets full. We could combine those behaviours if we have a single buffer with a
read cursor pointing at the next row to be returned. Returning rows would
advance the read cursor rather than discarding the rows that have been read.
Then, when the buffer is full, the producer first discards the already-read
cached rows to free up space, and blocks only if the buffer is still full.
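As a rough sketch of that combined cache/buffer, assuming the cache's purpose is to let the client restart fetching from row zero, with invented names (CachingRowBuffer, TryAddRow) and plain strings standing in for result rows:
{code:cpp}
#include <cstdint>
#include <deque>
#include <string>

// Single buffer that serves both as the client result cache and as the
// producer/consumer queue. Rows before 'read_cursor_' have already been
// returned to the client but are kept for fetch restarts until the
// producer needs the space back.
class CachingRowBuffer {
 public:
  explicit CachingRowBuffer(int64_t max_bytes) : max_bytes_(max_bytes) {}

  // Producer: returns true if the row was buffered, false if the buffer is
  // still full even after evicting already-read rows (caller would block).
  bool TryAddRow(std::string row) {
    int64_t row_bytes = row.size();
    // First try to make space by discarding rows the client has already read.
    while (bytes_ + row_bytes > max_bytes_ && read_cursor_ > 0) {
      bytes_ -= rows_.front().size();
      rows_.pop_front();
      --read_cursor_;
      cache_valid_ = false;  // Restarting the fetch from row 0 is no longer possible.
    }
    if (bytes_ + row_bytes > max_bytes_) return false;  // Unread rows fill the buffer.
    bytes_ += row_bytes;
    rows_.push_back(std::move(row));
    return true;
  }

  // Consumer: return the next unread row, advancing the cursor instead of
  // discarding the row, so it stays cached behind the cursor.
  const std::string* NextRow() {
    if (read_cursor_ >= static_cast<int64_t>(rows_.size())) return nullptr;
    return &rows_[read_cursor_++];
  }

  // Rewinding is only possible while no cached rows have been evicted.
  bool Restart() {
    if (!cache_valid_) return false;
    read_cursor_ = 0;
    return true;
  }

 private:
  const int64_t max_bytes_;
  int64_t bytes_ = 0;
  int64_t read_cursor_ = 0;  // Index of the next row to return.
  bool cache_valid_ = true;
  std::deque<std::string> rows_;
};
{code}
The producer only sacrifices rows behind the read cursor, so the bounded-queue behaviour is preserved for unread rows while the cache behaviour degrades gracefully when it overflows.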
We should look at staging the work - maybe we don't need to merge the buffer
and the client result cache as part of the first piece of work.
> buffer more than a batch of rows at coordinator
> -----------------------------------------------
>
> Key: IMPALA-4268
> URL: https://issues.apache.org/jira/browse/IMPALA-4268
> Project: IMPALA
> Issue Type: Improvement
> Components: Backend
> Affects Versions: Impala 2.8.0
> Reporter: Henry Robinson
> Assignee: Bikramjeet Vig
> Priority: Major
> Labels: query-lifecycle, resource-management
> Attachments: rows-produced-histogram.png
>
>
> In IMPALA-2905, we are introducing a {{PlanRootSink}} that handles the
> production of output rows at the root of a plan.
> The implementation in IMPALA-2905 has the plan execute in a separate thread
> to the consumer, which calls {{GetNext()}} to retrieve the rows. However, the
> sender thread will block until {{GetNext()}} is called, so that there are no
> complications about memory usage and ownership due to having several batches
> in flight at one time.
> However, this also leads to many context switches, as each {{GetNext()}} call
> yields to the sender to produce the rows. If the sender was to fill a buffer
> asynchronously, the consumer could pull out of that buffer without taking a
> context switch in many cases (and the extra buffering might smooth out any
> performance spikes due to client delays, which currently directly affect plan
> execution).
> The tricky part is managing the mismatch between the size of the row batches
> processed in {{Send()}} and the size of the fetch result asked for by the
> client. The sender materializes output rows in a {{QueryResultSet}} that is
> owned by the coordinator. That is not, currently, a splittable object -
> instead it contains the actual RPC response struct that will hit the wire
> when the RPC completes. An asynchronous sender cannot know the batch size,
> which may change on every fetch call, so the {{GetNext()}} implementation
> would need to be able to split out the {{QueryResultSet}} to match the
> correct fetch size, and handle stitching together other {{QueryResultSets}} -
> without doing extra copies.