Repository: incubator-drill Updated Branches: refs/heads/master 4e817d115 -> 84d23350c
DRILL-443: Fix potential memory leaks inside HashAggregation and HashTable. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/84d23350 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/84d23350 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/84d23350 Branch: refs/heads/master Commit: 84d23350ca278d84dcff48a47537db63707abad5 Parents: 4e817d1 Author: Aman Sinha <[email protected]> Authored: Fri Mar 28 16:12:28 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Mar 29 15:15:40 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/aggregate/HashAggBatch.java | 8 +++- .../impl/aggregate/HashAggTemplate.java | 47 ++++++++++++++------ .../physical/impl/common/HashTableTemplate.java | 13 +++--- .../exec/physical/impl/agg/TestHashAggr.java | 8 ++-- 4 files changed, 50 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index d720390..9add544 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -121,17 +121,21 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } } + if (aggregator.allFlushed()) { return IterOutcome.NONE; } + logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + while(true){ AggOutcome out = aggregator.doWork(); logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); switch(out){ case CLEANUP_AND_RETURN: - container.zeroVectors(); - aggregator.cleanup(); + container.clear(); + aggregator.cleanup(); + incoming.cleanup(); done = true; return aggregator.getOutcome(); case RETURN_OUTCOME: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 3109124..21c0c7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; +import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -58,7 +59,8 @@ import com.google.common.collect.Lists; public abstract class HashAggTemplate implements HashAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); - private static final boolean EXTRA_DEBUG = false; + private static final boolean EXTRA_DEBUG_1 = false; + private static final boolean EXTRA_DEBUG_2 = false; private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field."; private boolean first = true; private boolean newSchema = false; @@ -120,7 +122,7 @@ public abstract class HashAggTemplate implements HashAggregator { private boolean outputValues() { for (int i = 0; i <= maxOccupiedIdx; i++) { if (outputRecordValues(i, outputCount) ) { - if (EXTRA_DEBUG) logger.debug("Outputting values to {}", outputCount) ; + if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ; outputCount++; } else { return false; @@ -129,6 +131,10 @@ public abstract class HashAggTemplate implements HashAggregator { return true; } + private void clear() { + aggrValuesContainer.clear(); + } + // Code-generated methods (implemented in HashAggBatch) @RuntimeOverridden @@ -199,25 +205,28 @@ public abstract class HashAggTemplate implements HashAggregator { outside: while(true) { // loop through existing records, aggregating the values as necessary. + if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()..."); for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); checkGroupAndAggrValues(currentIndex); } + if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex); + try{ while(true){ IterOutcome out = incoming.next(); - if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out); + if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out); switch(out){ case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; case OK_NEW_SCHEMA: - if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); - newSchema = true; - + if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + newSchema = true; + this.cleanup(); // TODO: new schema case needs to be handled appropriately return AggOutcome.UPDATE_AGGREGATOR; @@ -229,14 +238,20 @@ public abstract class HashAggTemplate implements HashAggregator { checkGroupAndAggrValues(currentIndex); incIndex(); - if(EXTRA_DEBUG) logger.debug("Continuing outside loop"); + if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop"); continue outside; } case NONE: outcome = out; outputKeysAndValues() ; - + + // cleanup my internal state since there is nothing more to return + this.cleanup(); + // cleanup incoming batch since output of aggregation does not need + // any references to the incoming + + incoming.cleanup(); return setOkAndReturn(); case STOP: @@ -257,12 +272,12 @@ public abstract class HashAggTemplate implements HashAggregator { private void allocateOutgoing() { for (VectorAllocator a : keyAllocators) { - if(EXTRA_DEBUG) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); + if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); a.alloc(numGroupedRecords); } for (VectorAllocator a : valueAllocators) { - if(EXTRA_DEBUG) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); + if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); a.alloc(numGroupedRecords); } } @@ -283,6 +298,10 @@ public abstract class HashAggTemplate implements HashAggregator { htable = null; htIdxHolder = null; materializedValueFields = null; + + for (BatchHolder bh : batchHolders) { + bh.clear(); + } batchHolders.clear(); batchHolders = null; } @@ -323,7 +342,7 @@ public abstract class HashAggTemplate implements HashAggregator { BatchHolder bh = new BatchHolder(); batchHolders.add(bh); - if (EXTRA_DEBUG) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); int batchIdx = batchHolders.size() - 1; bh.setup(batchIdx); @@ -382,7 +401,7 @@ public abstract class HashAggTemplate implements HashAggregator { int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; if (putStatus == HashTable.PutStatus.KEY_PRESENT) { - if (EXTRA_DEBUG) logger.debug("Group-by key already present in hash table, updating the aggregate values"); + if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values"); // debugging //if (holder.value == 100018 || holder.value == 100021) { @@ -391,7 +410,7 @@ public abstract class HashAggTemplate implements HashAggregator { } else if (putStatus == HashTable.PutStatus.KEY_ADDED) { - if (EXTRA_DEBUG) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ; + if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ; // debugging // if (holder.value == 100018 || holder.value == 100021) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index a4400f0..5a7a6fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -239,6 +239,9 @@ public abstract class HashTableTemplate implements HashTable { } + links.clear(); + hashValues.clear(); + links = newLinks; hashValues = newHashValues; } @@ -287,11 +290,9 @@ public abstract class HashTableTemplate implements HashTable { } private void clear() { - htContainer = null; + htContainer.clear();; links.clear(); - links = null; hashValues.clear(); - hashValues = null; } // These methods will be code-generated @@ -377,8 +378,7 @@ public abstract class HashTableTemplate implements HashTable { batchHolders.clear(); batchHolders = null; startIndices.clear(); - startIndices = null; - currentIdxHolder = null; + currentIdxHolder = null; numEntries = 0; } @@ -572,7 +572,8 @@ public abstract class HashTableTemplate implements HashTable { int batchStartIdx = i * BATCH_SIZE; bh.rehash(tableSize, newStartIndices, batchStartIdx); } - + + startIndices.clear(); startIndices = newStartIndices; if (EXTRA_DEBUG) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/84d23350/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java index 8401d7e..1ab7248 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggr.java @@ -22,10 +22,9 @@ import org.apache.drill.BaseTestQuery; import org.junit.Ignore; import org.junit.Test; -@Ignore // DRILL-443 + public class TestHashAggr extends BaseTestQuery{ - @Test public void testQ6() throws Exception{ testPhysicalFromFile("agg/hashagg/q6.json"); @@ -41,15 +40,16 @@ public class TestHashAggr extends BaseTestQuery{ testPhysicalFromFile("agg/hashagg/q7_2.json"); } - + @Ignore // ignore temporarily since this shows memory leak in ParquetRecordReader (DRILL-443) @Test public void testQ8_1() throws Exception{ testPhysicalFromFile("agg/hashagg/q8_1.json"); } + @Ignore // ignore temporarily since this shows memory leak in ParquetRecordReader (DRILL-443) @Test public void test8() throws Exception{ testPhysicalFromFile("agg/hashagg/q8.json"); } - + }
