This is an automated email from the ASF dual-hosted git repository. parthc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7655ec4f54976def63101daabf34e51697978c57 Author: Sorabh Hamirwasia <shamirwa...@maprtech.com> AuthorDate: Sun Jul 1 00:02:55 2018 -0600 DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together --- .../impl/xsort/managed/ExternalSortBatch.java | 54 +++++++++++----------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index ea7f51f..7db4d3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -315,7 +315,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { case START: return load(); case LOAD: - resetSortState(); + if (!this.retainInMemoryBatchesOnNone) { + resetSortState(); + } return (sortState == SortState.DONE) ? NONE : load(); case DELIVER: return nextOutputBatch(); @@ -578,36 +580,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } if (incoming instanceof ExternalSortBatch) { ExternalSortBatch esb = (ExternalSortBatch) incoming; - esb.releaseResources(); + esb.resetSortState(); } } private void releaseResources() { - // This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT - // then release the resources - if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) || - (sortState == SortState.LOAD)) { - - // Close the iterator here to release any remaining resources such - // as spill files. This is important when a query has a join: the - // first branch sort may complete before the second branch starts; - // it may be quite a while after returning the last batch before the - // fragment executor calls this operator's close method. - // - // Note however, that the StreamingAgg operator REQUIRES that the sort - // retain the batches behind an SV4 when doing an in-memory sort because - // the StreamingAgg retains a reference to that data that it will use - // after receiving a NONE result code. See DRILL-5656. - //zeroResources(); - if (resultsIterator != null) { - resultsIterator.close(); - } - // We only zero vectors for actual output container - outputWrapperContainer.clear(); - outputSV4.clear(); - container.zeroVectors(); + if (resultsIterator != null) { + resultsIterator.close(); } + // We only zero vectors for actual output container + outputWrapperContainer.clear(); + outputSV4.clear(); + container.zeroVectors(); + // Close sortImpl for this boundary if (sortImpl != null) { sortImpl.close(); @@ -620,6 +606,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { */ private void resetSortState() { sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE; + // This means if it has received NONE/EMIT outcome and flag to retain is false which will be the case in presence of + // StreamingAggBatch only since it will explicitly call releaseBacthes on ExternalSort when its done consuming + // all the data buffer. + + // Close the iterator here to release any remaining resources such + // as spill files. This is important when a query has a join: the + // first branch sort may complete before the second branch starts; + // it may be quite a while after returning the last batch before the + // fragment executor calls this operator's close method. + // + // Note however, that the StreamingAgg operator REQUIRES that the sort + // retain the batches behind an SV4 when doing an in-memory sort because + // the StreamingAgg retains a reference to that data that it will use + // after receiving a NONE result code. See DRILL-5656. releaseResources(); if (lastKnownOutcome == EMIT) { @@ -674,7 +674,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { sortState = SortState.DELIVER; } else if (getRecordCount() == 0) { // There is no record to send downstream outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE; - resetSortState(); + if (!this.retainInMemoryBatchesOnNone) { + resetSortState(); + } } else if (lastKnownOutcome == EMIT) { final boolean hasMoreRecords = outputSV4.hasNext(); sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;