Fix allocation errors and a bug in external sort
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/da988222 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/da988222 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/da988222 Branch: refs/heads/master Commit: da988222c6f133347eccdefb815df4657f2fad8b Parents: a0d3906 Author: Steven Phillips <sphill...@maprtech.com> Authored: Sat Aug 30 19:12:13 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Sun Aug 31 10:27:26 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/physical/impl/ScanBatch.java | 1 + .../exec/physical/impl/sort/SortRecordBatchBuilder.java | 10 +++++++++- .../exec/physical/impl/xsort/ExternalSortBatch.java | 11 +++++++---- .../java/parquet/hadoop/ColumnChunkIncReadStore.java | 1 + 4 files changed, 18 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/da988222/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index e56d883..79a25dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -349,6 +349,7 @@ public class ScanBatch implements RecordBatch { v.clear(); } fieldVectorMap.clear(); + currentReader.cleanup(); oContext.close(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/da988222/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 9626a97..80b4ef6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; @@ -105,7 +106,14 @@ public class SortRecordBatchBuilder { } - if (rbd.getRecordCount() == 0 && batches.size() > 0) return true; + if (rbd.getRecordCount() == 0 && batches.size() > 0) { + rbd.getContainer().zeroVectors(); + SelectionVector2 sv2 = rbd.getSv2(); + if (sv2 != null) { + sv2.clear(); + } + return true; + } runningBytes += batchBytes; batches.put(rbd.getContainer().getSchema(), rbd); recordCount += rbd.getRecordCount(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/da988222/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index aa65272..505f567 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -236,7 +236,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - first = false; if(!incoming.getSchema().equals(schema)){ if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); this.schema = incoming.getSchema(); @@ -244,11 +243,15 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } // fall through. case OK: + if (!first && incoming.getRecordCount() == 0) { + for (VectorWrapper w : incoming) { + w.clear(); + } + break; + } + if (first) first = false; totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; -// if (incoming.getRecordCount() == 0) { -// break outer; -// } if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { sv2 = incoming.getSelectionVector2(); if (sv2.getBuffer(false).isRootBuffer()) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/da988222/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java index 516be0e..50107ac 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java @@ -170,6 +170,7 @@ public class ColumnChunkIncReadStore implements PageReadStore { void close() { if (lastPage != null) { lastPage.release(); + lastPage = null; } } }