Updated Branches: refs/heads/master dc2e17308 -> 0e830960f
Fix over memory pre-allocation within ParquetRecordReader. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0e830960 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0e830960 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0e830960 Branch: refs/heads/master Commit: 0e830960f55fbf3bca51c405ae67a02fdc0521b7 Parents: dc2e173 Author: Jacques Nadeau <[email protected]> Authored: Fri Nov 15 16:13:32 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Fri Nov 15 16:13:32 2013 -0800 ---------------------------------------------------------------------- .../exec/store/parquet/ParquetRecordReader.java | 91 +++++++++++--------- 1 file changed, 48 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0e830960/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index fc3ba8c..491779a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -75,15 +75,16 @@ public class ParquetRecordReader implements RecordReader { private int recordsPerBatch; private ByteBuf bufferWithAllData; private final FieldReference ref; - long totalRecords; - long rowGroupOffset; + private long totalRecords; + private long rowGroupOffset; private List<ColumnReader> columnStatuses; - FileSystem fileSystem; + private FileSystem fileSystem; private BufferAllocator allocator; private long batchSize; - Path hadoopPath; - private final VarLenBinaryReader varLengthReader; + private Path hadoopPath; + private VarLenBinaryReader varLengthReader; + private ParquetMetadata footer; public CodecFactoryExposer getCodecFactoryExposer() { return codecFactoryExposer; @@ -111,6 +112,46 @@ public class ParquetRecordReader implements RecordReader { this.codecFactoryExposer = codecFactoryExposer; this.rowGroupIndex = rowGroupIndex; this.batchSize = batchSize; + this.footer = footer; + } + + public ByteBuf getBufferWithAllData() { + return bufferWithAllData; + } + + public int getRowGroupIndex() { + return rowGroupIndex; + } + + public int getBitWidthAllFixedFields() { + return bitWidthAllFixedFields; + } + + public long getBatchSize() { + return batchSize; + } + + /** + * @param type a fixed length type from the parquet library enum + * @return the length in pageDataByteArray of the type + */ + public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) { + switch (type) { + case INT64: return 64; + case INT32: return 32; + case BOOLEAN: return 1; + case FLOAT: return 32; + case DOUBLE: return 64; + case INT96: return 96; + // binary and fixed length byte array + default: + throw new IllegalStateException("Length cannot be determined for type " + type); + } + } + + @Override + public void setup(OutputMutator output) throws ExecutionSetupException { + columnStatuses = new ArrayList<>(); @@ -172,44 +213,8 @@ public class ParquetRecordReader implements RecordReader { } catch (SchemaChangeException e) { throw new ExecutionSetupException(e); } - } - - public ByteBuf getBufferWithAllData() { - return bufferWithAllData; - } - - public int getRowGroupIndex() { - return rowGroupIndex; - } - - public int getBitWidthAllFixedFields() { - return bitWidthAllFixedFields; - } - - public long getBatchSize() { - return batchSize; - } - - /** - * @param type a fixed length type from the parquet library enum - * @return the length in pageDataByteArray of the type - */ - public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) { - switch (type) { - case INT64: return 64; - case INT32: return 32; - case BOOLEAN: return 1; - case FLOAT: return 32; - case DOUBLE: return 64; - case INT96: return 96; - // binary and fixed length byte array - default: - throw new IllegalStateException("Length cannot be determined for type " + type); - } - } - - @Override - public void setup(OutputMutator output) throws ExecutionSetupException { + + output.removeAllFields(); try {
