Paul Rogers created DRILL-5656:
----------------------------------
Summary: Streaming Agg Batch forces sort to retain in-memory
batches past NONE
Key: DRILL-5656
URL: https://issues.apache.org/jira/browse/DRILL-5656
Project: Apache Drill
Issue Type: Bug
Affects Versions: 1.10.0
Reporter: Paul Rogers
Drill provides the {{StreamingAggBatch}} operator. The operator performs quite
a bit of processing in the template used to generate code. Consider
{{StreamingAggTemplate.doWork()}} with an input operator that returns one
batch (with {{OK_NEW_SCHEMA}}) and then returns {{NONE}}.
The top part of {{doWork()}} iterates over the incoming rows of the first batch
(indexed by an SV4 in this case). Then comes a loop to process the remaining
batches:
{code}
InternalBatch previous = new InternalBatch(incoming, context);
while (true) {
  IterOutcome out = outgoing.next(0, incoming);
{code}
For an SV4, the above code does not clone or transfer the incoming data; it
simply retains a reference to it. This follows from the way an SV4 works: it
iterates over a fixed hyper-batch, that is, a set of batches. Consider the
sort. The first call to the sort's {{next()}} returns a virtual batch with
references to, say, five underlying batches. The next {{next()}} returns a
different set of references to the same five batches. Each virtual batch is a
set of references that read out records in sorted order. Complex, yes, but it
avoids unnecessary data copies.
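The idea of a virtual batch made only of references can be sketched in a few
lines. This is a simplified model, not Drill code: {{Sv4Sketch}} and its
methods are hypothetical names, the batches are plain int arrays, and the
packing of a batch index into the upper 16 bits and a row index into the lower
16 bits is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a hyper-batch read through a 4-byte
// selection vector. None of these names are Drill classes.
final class Sv4Sketch {

  // Pack a (batch, row) pair into one int: batch in the upper 16 bits,
  // row in the lower 16 bits (layout assumed for this sketch).
  static int entry(int batch, int row) {
    return (batch << 16) | (row & 0xFFFF);
  }

  static int batchOf(int entry) { return entry >>> 16; }

  static int rowOf(int entry) { return entry & 0xFFFF; }

  // Read values in the order given by the selection vector. The underlying
  // batches are never reordered or copied; only the references differ.
  static List<Integer> read(int[][] hyperBatch, int[] sv4) {
    List<Integer> out = new ArrayList<>();
    for (int e : sv4) {
      out.add(hyperBatch[batchOf(e)][rowOf(e)]);
    }
    return out;
  }

  public static void main(String[] args) {
    // Two underlying batches held in memory by the "sort".
    int[][] hyperBatch = { {30, 10}, {20, 40} };
    // Two virtual batches: different references into the same data.
    int[] first = { entry(0, 1), entry(1, 0) };
    int[] second = { entry(0, 0), entry(1, 1) };
    System.out.println(read(hyperBatch, first));  // [10, 20]
    System.out.println(read(hyperBatch, second)); // [30, 40]
  }
}
```

Both virtual batches stay valid only as long as every underlying batch stays
in memory, which is the property the rest of this report depends on.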
Consider what happens when the sort reaches the end of the virtual batches. The
sort has a large amount of data sitting in memory (potentially many GB). The
sort is done; there is nothing left to deliver downstream. A tidy
implementation would free up the batches so that the memory can be better used
elsewhere.
But, doing that causes the following streaming agg code to fail with an
{{IndexOutOfBounds}} exception:
{code}
switch (out) {
case NONE:
  ...
  if (addedRecordCount > 0) {
    outputToBatchPrev(previous, previousIndex, outputCount);
{code}
That is, on {{NONE}}, the streaming agg batch wants to reference the data from
the prior batch, by holding a reference to the hyper-vectors from (in this
case) the sort.
The result is that the sort *cannot* release memory when returning {{NONE}}; it
must retain all the in-memory batches (since the last virtual batch might
reference any of them).
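The failure mode can be reproduced with a toy model. Everything here is
hypothetical ({{ToySort}}, {{lastValueAfterNone}}) and only imitates the shape
of the interaction: an upstream operator that frees its batches when it
reports end of data, and a downstream operator that holds a reference rather
than a copy and reads the previous batch afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the interaction; these are not Drill classes.
final class RetainedReferenceSketch {

  // Upstream "sort" that owns its batches and optionally frees them
  // when it reports end of data.
  static final class ToySort {
    final List<int[]> batches = new ArrayList<>();
    private final boolean freeOnNone;
    private boolean delivered;

    ToySort(boolean freeOnNone) {
      this.freeOnNone = freeOnNone;
      batches.add(new int[] {1, 2, 3});
    }

    // Returns true for the single batch; false stands in for NONE.
    boolean next() {
      if (!delivered) {
        delivered = true;
        return true;
      }
      if (freeOnNone) {
        batches.clear(); // the "tidy" behavior: release memory on NONE
      }
      return false;
    }
  }

  // Downstream "aggregate": keeps only a reference to the upstream data and
  // reads the last row of the previous batch after seeing NONE.
  static int lastValueAfterNone(ToySort sort) {
    List<int[]> previous = sort.batches; // a reference, not a copy
    int previousIndex = -1;
    while (sort.next()) {
      previousIndex = previous.get(0).length - 1;
    }
    return previous.get(0)[previousIndex]; // throws if upstream freed
  }

  public static void main(String[] args) {
    System.out.println(lastValueAfterNone(new ToySort(false))); // 3
    try {
      lastValueAfterNone(new ToySort(true));
    } catch (IndexOutOfBoundsException e) {
      System.out.println("freed too early: " + e.getClass().getSimpleName());
    }
  }
}
```

In this sketch the only way to keep the downstream read safe is for the
upstream operator to hold its batches past end of data, which mirrors the
constraint described above.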
But, the next opportunity for the sort to release memory is on {{close()}}. The
problem is, the operators above streaming agg might include other
memory-intensive operations. Since sort cannot release its memory, those other
operations suffer. The sort memory is retained through, say, another sort or
hash join, all the way to query completion.
The reason for this JIRA (rather than just fixing the bug) is that the present
iterator interface has no way to work around this issue cleanly. Four "hacks"
are possible:
1. The streaming agg calls {{close()}} on the incoming after finishing with the
previous incoming batch. (The problem is, all possible incoming operators must
be able to handle multiple calls to close: one from streaming agg, another from
the segment executor.)
2. The streaming agg calls {{next()}} on the incoming batch a second time after
receiving {{NONE}}. The sort can track this case and release its memory. The
problem is that the iterator validator (and probably other operators) forbid
this pattern.
3. Add a new method to the iterator protocol, {{cleanup}}, that acts as a
partial close: it tells the operator to release all resources after it has
returned {{NONE}}.
4. Special-case the sort: if the incoming operator is the sort, cast to that
operator and call an operator-specific method.
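As an illustration of option 3 only, here is a minimal sketch of what a
partial close could look like. The interface and class names ({{ToyOperator}},
{{ToySort}}, {{drain}}) are hypothetical and do not correspond to Drill's
actual iterator interface. The sketch assumes {{cleanup()}} is idempotent, so
that a later {{close()}} from the executor remains safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a "partial close" in an iterator protocol.
final class CleanupSketch {

  // Not Drill's interface: a stand-in with one extra method.
  interface ToyOperator extends AutoCloseable {
    boolean next();   // false stands in for NONE
    void cleanup();   // downstream is done with all data from this operator
    @Override
    void close();     // full shutdown, still called later by the executor
  }

  static final class ToySort implements ToyOperator {
    private final List<int[]> batches = new ArrayList<>();
    private boolean delivered;

    ToySort() {
      batches.add(new int[1024]);
    }

    @Override
    public boolean next() {
      if (delivered) {
        return false;
      }
      delivered = true;
      return true;
    }

    @Override
    public void cleanup() {
      batches.clear(); // idempotent: clearing an empty list is a no-op
    }

    @Override
    public void close() {
      cleanup();
    }

    int retainedBatches() {
      return batches.size();
    }
  }

  // Downstream consumer: reads everything, finishes its last use of the
  // previous batch, then tells the upstream operator it may release memory.
  static int drain(ToyOperator incoming) {
    int count = 0;
    while (incoming.next()) {
      count++;
    }
    // ... last reference to the previous batch would be used here ...
    incoming.cleanup();
    return count;
  }

  public static void main(String[] args) {
    ToySort sort = new ToySort();
    System.out.println(drain(sort));            // 1
    System.out.println(sort.retainedBatches()); // 0
    sort.close(); // the later close() is harmless
  }
}
```

The point of the separate method is that the consumer, not the producer,
decides when the retained data is no longer needed, without overloading the
meaning of {{next()}} or {{close()}}.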