http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java index eb9fdd9..86edd79 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java @@ -36,9 +36,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; @@ -92,17 +94,28 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /* dictionary encoded values */ protected IntList encodedValues = new IntList(); + /** indicates if this is the first page being processed */ + protected boolean firstPage = true; + + protected ByteBufferAllocator allocator; + /* Track the list of writers used so they can be appropriately closed when necessary + (currently used for off-heap memory which is not garbage collected) */ + private List<RunLengthBitPackingHybridEncoder> encoders = new ArrayList<RunLengthBitPackingHybridEncoder>(); + /** * @param maxDictionaryByteSize */ - protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { + protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding 
encodingForDictionaryPage, ByteBufferAllocator allocator) { + this.allocator = allocator; this.maxDictionaryByteSize = maxDictionaryByteSize; this.encodingForDataPage = encodingForDataPage; this.encodingForDictionaryPage = encodingForDictionaryPage; } - protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) { - return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage); + protected DictionaryPage dictPage(ValuesWriter dictPageWriter) { + DictionaryPage ret = new DictionaryPage(dictPageWriter.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage); + dictPageWriter.close(); + return ret; } @Override @@ -147,12 +160,12 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req int maxDicId = getDictionarySize() - 1; if (DEBUG) LOG.debug("max dic id " + maxDicId); int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId); - int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10); RunLengthBitPackingHybridEncoder encoder = - new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize); + new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize, this.allocator); + encoders.add(encoder); IntIterator iterator = encodedValues.iterator(); try { while (iterator.hasNext()) { @@ -179,10 +192,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req @Override public void reset() { + close(); encodedValues = new IntList(); } @Override + public void close() { + encodedValues = null; + for (RunLengthBitPackingHybridEncoder encoder : encoders) { + encoder.close(); + } + encoders.clear(); + } + + @Override public void resetDictionary() { lastUsedDictionaryByteSize = 0; lastUsedDictionarySize = 0; @@ -225,8 +248,8 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize */ - public 
PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); binaryDictionaryContent.defaultReturnValue(-1); } @@ -243,10 +266,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -294,10 +317,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize - * @param initialSize */ - public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); this.length = length; } @@ -313,10 +335,10 @@ public abstract class 
DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize); + FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -339,10 +361,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize - * @param initialSize */ - public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); longDictionaryContent.defaultReturnValue(-1); } @@ -358,10 +379,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); LongIterator longIterator 
= longDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -411,10 +432,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize - * @param initialSize */ - public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); doubleDictionaryContent.defaultReturnValue(-1); } @@ -430,10 +450,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -483,10 +503,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize - * @param initialSize */ - public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, 
Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); intDictionaryContent.defaultReturnValue(-1); } @@ -502,10 +521,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) { @@ -555,10 +574,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req /** * @param maxDictionaryByteSize - * @param initialSize */ - public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) { - super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage); + public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) { + super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator); floatDictionaryContent.defaultReturnValue(-1); } @@ -574,10 +592,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req } @Override - public DictionaryPage createDictionaryPage() { + public DictionaryPage toDictPageAndClose() { if (lastUsedDictionarySize > 0) { // return a dictionary only if we actually used it - PlainValuesWriter dictionaryEncoder = new 
PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize); + PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator); FloatIterator floatIterator = floatDictionaryContent.keySet().iterator(); // write only the part of the dict that we used for (int i = 0; i < lastUsedDictionarySize; i++) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java index e671310..0fa6cc6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java @@ -23,6 +23,7 @@ import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; import static org.apache.parquet.column.Encoding.PLAIN; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.Preconditions; import org.apache.parquet.column.Dictionary; @@ -86,9 +87,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryBytes = dictionaryPage.getBytes().toByteBuffer(); binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()]; - int offset = 0; + // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes + int offset = dictionaryBytes.position(); if (length == null) { // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes for (int i = 0; i < binaryDictionaryContent.length; i++) { @@ -96,7 +98,7 @@ public abstract class PlainValuesDictionary extends Dictionary { // read the length offset += 4; // wrap the content in a binary - binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len); + binaryDictionaryContent[i] = 
Binary.fromConstantByteBuffer(dictionaryBytes, offset, len); // increment to the next value offset += len; } @@ -106,7 +108,7 @@ public abstract class PlainValuesDictionary extends Dictionary { "Invalid byte array length: " + length); for (int i = 0; i < binaryDictionaryContent.length; i++) { // wrap the content in a Binary - binaryDictionaryContent[i] = Binary.fromConstantByteArray( + binaryDictionaryContent[i] = Binary.fromConstantByteBuffer( dictionaryBytes, offset, length); // increment to the next value offset += length; @@ -148,10 +150,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); longDictionaryContent = new long[dictionaryPage.getDictionarySize()]; LongPlainValuesReader longReader = new LongPlainValuesReader(); - longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < longDictionaryContent.length; i++) { longDictionaryContent[i] = longReader.readLong(); } @@ -191,10 +193,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()]; DoublePlainValuesReader doubleReader = new DoublePlainValuesReader(); - doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i 
= 0; i
< doubleDictionaryContent.length; i++) { doubleDictionaryContent[i] = doubleReader.readDouble(); } @@ -234,10 +236,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); intDictionaryContent = new int[dictionaryPage.getDictionarySize()]; IntegerPlainValuesReader intReader = new IntegerPlainValuesReader(); - intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < intDictionaryContent.length; i++) { intDictionaryContent[i] = intReader.readInteger(); } @@ -277,10 +279,10 @@ public abstract class PlainValuesDictionary extends Dictionary { */ public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException { super(dictionaryPage); - final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray(); + final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer(); floatDictionaryContent = new float[dictionaryPage.getDictionarySize()]; FloatPlainValuesReader floatReader = new FloatPlainValuesReader(); - floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0); + floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position()); for (int i = 0; i < floatDictionaryContent.length; i++) { floatDictionaryContent[i] = floatReader.readFloat(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java index f66c7c9..19fed7d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java @@ -97,11 +97,17 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e currentWriter.reset(); } - public DictionaryPage createDictionaryPage() { + @Override + public void close() { + initialWriter.close(); + fallBackWriter.close(); + } + + public DictionaryPage toDictPageAndClose() { if (initialUsedAndHadDictionary) { - return initialWriter.createDictionaryPage(); + return initialWriter.toDictPageAndClose(); } else { - return currentWriter.createDictionaryPage(); + return currentWriter.toDictPageAndClose(); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java index 4346e02..26f5e29 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain; import static org.apache.parquet.Log.DEBUG; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesUtils; @@ -30,7 +31,7 @@ import org.apache.parquet.io.api.Binary; public class BinaryPlainValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class); - 
private byte[] in; + private ByteBuffer in; private int offset; @Override @@ -39,7 +40,7 @@ public class BinaryPlainValuesReader extends ValuesReader { int length = BytesUtils.readIntLittleEndian(in, offset); int start = offset + 4; offset = start + length; - return Binary.fromConstantByteArray(in, start, length); + return Binary.fromConstantByteBuffer(in, start, length); } catch (IOException e) { throw new ParquetDecodingException("could not read bytes at offset " + offset, e); } catch (RuntimeException e) { @@ -60,11 +61,10 @@ public class BinaryPlainValuesReader extends ValuesReader { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in = in; this.offset = offset; } - } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java index 31e711f..c330490 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java @@ -22,6 +22,7 @@ import static org.apache.parquet.Log.DEBUG; import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.Log; import org.apache.parquet.column.values.ValuesReader; @@ -62,8 +63,8 @@ public 
class BooleanPlainValuesReader extends ValuesReader { * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int) */ @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in.initFromPage(valueCount, in, offset); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java index 78920f0..c3e88ea 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.column.values.plain; + import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN; import org.apache.parquet.bytes.BytesInput; @@ -61,6 +62,11 @@ public class BooleanPlainValuesWriter extends ValuesWriter { } @Override + public void close() { + bitPackingWriter.close(); + } + + @Override public long getAllocatedSize() { return bitPackingWriter.getAllocatedSize(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java ---------------------------------------------------------------------- diff 
--git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java index 098a486..8496e7e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.Log; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -33,7 +34,7 @@ import static org.apache.parquet.Log.DEBUG; */ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class); - private byte[] in; + private ByteBuffer in; private int offset; private int length; @@ -46,7 +47,7 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { try { int start = offset; offset = start + length; - return Binary.fromConstantByteArray(in, start, length); + return Binary.fromConstantByteBuffer(in, start, length); } catch (RuntimeException e) { throw new ParquetDecodingException("could not read bytes at offset " + offset, e); } @@ -58,9 +59,9 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader { } @Override - public void initFromPage(int valueCount, byte[] in, int offset) + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); this.in = in; this.offset = offset; } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java index 986ae0b..6ab2dea 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; @@ -40,10 +41,13 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; private int length; + private ByteBufferAllocator allocator; + - public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) { + public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize, ByteBufferAllocator allocator) { this.length = length; - this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); + this.allocator = allocator; + this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator); this.out = new LittleEndianDataOutputStream(arrayOut); } @@ -82,6 +86,11 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter { } @Override + public void close() { + arrayOut.close(); + } + + @Override public long getAllocatedSize() { return arrayOut.getCapacity(); } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java index bd938ee..c8fb303 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java @@ -20,9 +20,10 @@ package org.apache.parquet.column.values.plain; import static org.apache.parquet.Log.DEBUG; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.Log; import org.apache.parquet.bytes.LittleEndianDataInputStream; import org.apache.parquet.column.values.ValuesReader; @@ -41,12 +42,16 @@ abstract public class PlainValuesReader extends ValuesReader { /** * {@inheritDoc} - * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int) + * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int) */ @Override - public void initFromPage(int valueCount, byte[] in, int offset) throws IOException { - if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset)); - this.in = new LittleEndianDataInputStream(new ByteArrayInputStream(in, offset, in.length - offset)); + public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { + if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset)); + this.in = new LittleEndianDataInputStream(toInputStream(in, offset)); + } + + private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) { + return new 
ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset); } public static class DoublePlainValuesReader extends PlainValuesReader { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java index f33bd81..add5495 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain; import java.io.IOException; import java.nio.charset.Charset; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; @@ -44,8 +45,8 @@ public class PlainValuesWriter extends ValuesWriter { private CapacityByteArrayOutputStream arrayOut; private LittleEndianDataOutputStream out; - public PlainValuesWriter(int initialSize, int pageSize) { - arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize); + public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) { + arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator); out = new LittleEndianDataOutputStream(arrayOut); } @@ -126,6 +127,12 @@ public class PlainValuesWriter extends ValuesWriter { } @Override + public void close() { + arrayOut.close(); + out.close(); + } + + @Override public long getAllocatedSize() { return arrayOut.getCapacity(); } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index 38eb354..1280e8d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -20,10 +20,12 @@ package org.apache.parquet.column.values.rle; import static org.apache.parquet.Log.DEBUG; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.Log; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; @@ -43,14 +45,14 @@ public class RunLengthBitPackingHybridDecoder { private final int bitWidth; private final BytePacker packer; - private final ByteArrayInputStream in; + private final InputStream in; private MODE mode; private int currentCount; private int currentValue; private int[] currentBuffer; - public RunLengthBitPackingHybridDecoder(int bitWidth, ByteArrayInputStream in) { + public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth); Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java ---------------------------------------------------------------------- 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index 9d37574..001d3f6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesInput; @@ -116,7 +117,7 @@ public class RunLengthBitPackingHybridEncoder { private boolean toBytesCalled; - public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize) { + public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { if (DEBUG) { LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with " + "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity)); @@ -125,7 +126,7 @@ public class RunLengthBitPackingHybridEncoder { Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; - this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize); + this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); this.packBuffer = new byte[bitWidth]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); @@ -281,6 +282,11 @@ public class RunLengthBitPackingHybridEncoder { reset(true); } + public void close() { + reset(false); + baos.close(); + } + public long getBufferedSize() { return baos.size(); } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index bd4e11d..4ccf2b8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -18,9 +18,10 @@ */ package org.apache.parquet.column.values.rle; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -41,8 +42,8 @@ public class RunLengthBitPackingHybridValuesReader extends ValuesReader { } @Override - public void initFromPage(int valueCountL, byte[] page, int offset) throws IOException { - ByteArrayInputStream in = new ByteArrayInputStream(page, offset, page.length - offset); + public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException { + ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset); int length = BytesUtils.readIntLittleEndian(in); decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java index bccfd34..14ef161 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.Ints; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Encoding; @@ -32,8 +33,8 @@ import org.apache.parquet.io.ParquetEncodingException; public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter { private final RunLengthBitPackingHybridEncoder encoder; - public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize) { - this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize); + public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator); } @Override @@ -82,6 +83,11 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter { } @Override + public void close() { + encoder.close(); + } + + @Override public String memUsageString(String prefix) { return String.format("%s RunLengthBitPackingHybrid %d bytes", prefix, getAllocatedSize()); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java 
b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java index f88d740..ff833ec 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java @@ -62,12 +62,16 @@ abstract public class Binary implements Comparable<Binary>, Serializable { abstract boolean equals(byte[] bytes, int offset, int length); + abstract boolean equals(ByteBuffer bytes, int offset, int length); + abstract boolean equals(Binary other); abstract public int compareTo(Binary other); abstract int compareTo(byte[] bytes, int offset, int length); + abstract int compareTo(ByteBuffer bytes, int offset, int length); + abstract public ByteBuffer toByteBuffer(); @Override @@ -174,6 +178,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } @Override + boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) { + return Binary.equals(value, offset, length, bytes, otherOffset, otherLength); + } + + @Override public int compareTo(Binary other) { return other.compareTo(value, offset, length); } @@ -184,6 +193,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } @Override + int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) { + return Binary.compareByteArrayToByteBuffer(value, offset, length, bytes, otherOffset, otherLength); + } + + @Override public ByteBuffer toByteBuffer() { return ByteBuffer.wrap(value, offset, length); } @@ -292,6 +306,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } @Override + boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) { + return Binary.equals(value, 0, value.length, bytes, otherOffset, otherLength); + } + + @Override public int compareTo(Binary other) { return other.compareTo(value, 0, value.length); } @@ -302,6 +321,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } @Override + int compareTo(ByteBuffer bytes, int 
otherOffset, int otherLength) { + return Binary.compareByteArrayToByteBuffer(value, 0, value.length, bytes, otherOffset, otherLength); + } + + @Override public ByteBuffer toByteBuffer() { return ByteBuffer.wrap(value); } @@ -330,36 +354,58 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } private static class ByteBufferBackedBinary extends Binary { - private transient ByteBuffer value; - private transient byte[] cachedBytes; + private ByteBuffer value; + private byte[] cachedBytes; + private final int offset; + private final int length; - public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) { + public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) { this.value = value; + this.offset = offset; + this.length = length; this.isBackingBytesReused = isBackingBytesReused; } @Override public String toStringUsingUTF8() { - return UTF8.decode(value).toString(); + int limit = value.limit(); + value.limit(offset+length); + int position = value.position(); + value.position(offset); + // no corresponding interface to read a subset of a buffer, would have to slice it + // which creates another ByteBuffer object or do what is done here to adjust the + // limit/offset and set them back after + String ret = UTF8.decode(value).toString(); + value.limit(limit); + value.position(position); + return ret; } @Override public int length() { - return value.remaining(); + return length; } @Override public void writeTo(OutputStream out) throws IOException { - // TODO: should not have to materialize those bytes - out.write(getBytesUnsafe()); + if (value.hasArray()) { + out.write(value.array(), value.arrayOffset() + offset, length); + } else { + out.write(getBytesUnsafe(), 0, length); + } } @Override public byte[] getBytes() { - byte[] bytes = new byte[value.remaining()]; + byte[] bytes = new byte[length]; - value.mark(); - value.get(bytes).reset(); + int limit = value.limit(); + 
value.limit(offset + length); + int position = value.position(); + value.position(offset); + value.get(bytes); + value.limit(limit); + value.position(position); if (!isBackingBytesReused) { // backing buffer might change cachedBytes = bytes; } @@ -375,60 +421,68 @@ abstract public class Binary implements Comparable<Binary>, Serializable { public Binary slice(int start, int length) { return Binary.fromConstantByteArray(getBytesUnsafe(), start, length); } - @Override public int hashCode() { if (value.hasArray()) { - return Binary.hashCode(value.array(), value.arrayOffset() + value.position(), - value.arrayOffset() + value.remaining()); + return Binary.hashCode(value.array(), value.arrayOffset() + offset, length); + } else { + return Binary.hashCode(value, offset, length); } - byte[] bytes = getBytesUnsafe(); - return Binary.hashCode(bytes, 0, bytes.length); } @Override boolean equals(Binary other) { if (value.hasArray()) { - return other.equals(value.array(), value.arrayOffset() + value.position(), - value.arrayOffset() + value.remaining()); + return other.equals(value.array(), value.arrayOffset() + offset, length); + } else { + return other.equals(value, offset, length); } - byte[] bytes = getBytesUnsafe(); - return other.equals(bytes, 0, bytes.length); } @Override boolean equals(byte[] other, int otherOffset, int otherLength) { if (value.hasArray()) { - return Binary.equals(value.array(), value.arrayOffset() + value.position(), - value.arrayOffset() + value.remaining(), other, otherOffset, otherLength); + return Binary.equals(value.array(), value.arrayOffset() + offset, length, other, otherOffset, otherLength); + } else { + return Binary.equals(other, otherOffset, otherLength, value, offset, length); } - byte[] bytes = getBytesUnsafe(); - return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength); + } + + @Override + boolean equals(ByteBuffer otherBytes, int otherOffset, int otherLength) { + return Binary.equals(value, 0, length, otherBytes, 
otherOffset, otherLength); } @Override public int compareTo(Binary other) { if (value.hasArray()) { - return other.compareTo(value.array(), value.arrayOffset() + value.position(), - value.arrayOffset() + value.remaining()); + return other.compareTo(value.array(), value.arrayOffset() + offset, length); + } else { + return other.compareTo(value, offset, length); } - byte[] bytes = getBytesUnsafe(); - return other.compareTo(bytes, 0, bytes.length); } @Override int compareTo(byte[] other, int otherOffset, int otherLength) { if (value.hasArray()) { - return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(), - value.arrayOffset() + value.remaining(), other, otherOffset, otherLength); + return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + offset, length, + other, otherOffset, otherLength); + } { + return Binary.compareByteBufferToByteArray(value, offset, length, other, otherOffset, otherLength); } - byte[] bytes = getBytesUnsafe(); - return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength); + } + + @Override + int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) { + return Binary.compareTwoByteBuffers(value, offset, length, bytes, otherOffset, otherLength); } @Override public ByteBuffer toByteBuffer() { - return value; + ByteBuffer ret = value.slice(); + ret.position(offset); + ret.limit(offset + length); + return ret; } @Override @@ -456,12 +510,20 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } + public static Binary fromReusedByteBuffer(final ByteBuffer value, int offset, int length) { + return new ByteBufferBackedBinary(value, offset, length, true); + } + + public static Binary fromConstantByteBuffer(final ByteBuffer value, int offset, int length) { + return new ByteBufferBackedBinary(value, offset, length, false); + } + public static Binary fromReusedByteBuffer(final ByteBuffer value) { - return new ByteBufferBackedBinary(value, true); + 
return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true); } public static Binary fromConstantByteBuffer(final ByteBuffer value) { - return new ByteBufferBackedBinary(value, false); + return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false); } @Deprecated @@ -492,6 +554,39 @@ abstract public class Binary implements Comparable<Binary>, Serializable { return result; } + private static final int hashCode(ByteBuffer buf, int offset, int length) { + int result = 1; + for (int i = offset; i < offset + length; i++) { + byte b = buf.get(i); + result = 31 * result + b; + } + return result; + } + + private static final boolean equals(ByteBuffer buf1, int offset1, int length1, ByteBuffer buf2, int offset2, int length2) { + if (buf1 == null && buf2 == null) return true; + if (buf1 == null || buf2 == null) return false; + if (length1 != length2) return false; + for (int i = 0; i < length1; i++) { + if (buf1.get(i + offset1) != buf2.get(i + offset2)) { + return false; + } + } + return true; + } + + private static final boolean equals(byte[] array1, int offset1, int length1, ByteBuffer buf, int offset2, int length2) { + if (array1 == null && buf == null) return true; + if (array1 == null || buf == null) return false; + if (length1 != length2) return false; + for (int i = 0; i < length1; i++) { + if (array1[i + offset1] != buf.get(i + offset2)) { + return false; + } + } + return true; + } + /** * @see {@link Arrays#equals(byte[], byte[])} * @param array1 @@ -515,6 +610,47 @@ abstract public class Binary implements Comparable<Binary>, Serializable { return true; } + private static final int compareByteBufferToByteArray(ByteBuffer buf, int offset1, int length1, + byte[] array, int offset2, int length2) { + return -1 * Binary.compareByteArrayToByteBuffer(array, offset1, length1, buf, offset2, length2); + } + + private static final int compareByteArrayToByteBuffer(byte[] array1, int offset1, int length1, + ByteBuffer buf, int 
offset2, int length2) { + if (array1 == null && buf == null) return 0; + int min_length = (length1 < length2) ? length1 : length2; + for (int i = 0; i < min_length; i++) { + if (array1[i + offset1] < buf.get(i + offset2)) { + return 1; + } + if (array1[i + offset1] > buf.get(i + offset2)) { + return -1; + } + } + // check remainder + if (length1 == length2) { return 0; } + else if (length1 < length2) { return 1;} + else { return -1; } + } + + private static final int compareTwoByteBuffers(ByteBuffer buf1, int offset1, int length1, + ByteBuffer buf2, int offset2, int length2) { + if (buf1 == null && buf2 == null) return 0; + int min_length = (length1 < length2) ? length1 : length2; + for (int i = 0; i < min_length; i++) { + if (buf1.get(i + offset1) < buf2.get(i + offset2)) { + return 1; + } + if (buf1.get(i + offset1) > buf2.get(i + offset2)) { + return -1; + } + } + // check remainder + if (length1 == length2) { return 0; } + else if (length1 < length2) { return 1;} + else { return -1; } + } + private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1, byte[] array2, int offset2, int length2) { if (array1 == null && array2 == null) return 0; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 7988f4a..5c6e460 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -333,10 +333,9 @@ public final class PrimitiveType extends Type { * @param decimalMeta (optional) metadata about the decimal type * @param id the id of the field */ - PrimitiveType( - Repetition repetition, PrimitiveTypeName primitive, - int 
length, String name, OriginalType originalType, - DecimalMetadata decimalMeta, ID id) { + public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive, + int length, String name, OriginalType originalType, + DecimalMetadata decimalMeta, ID id) { super(name, repetition, originalType, id); this.primitive = primitive; this.length = length; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java index a1820e6..6792361 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.parquet.Version; import org.apache.parquet.VersionParser; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.junit.Test; import org.apache.parquet.column.ColumnDescriptor; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java index 0327948..9bb2759 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java @@ -36,8 +36,10 @@ import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.PrimitiveType; import 
org.junit.Assert; import org.junit.Test; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -74,16 +76,20 @@ public class TestCorruptDeltaByteArrays { assertFalse(CorruptDeltaByteArrays.requiresSequentialReads(fixed, Encoding.DELTA_BYTE_ARRAY)); } + private DeltaByteArrayWriter getDeltaByteArrayWriter() { + return new DeltaByteArrayWriter(10, 100, new HeapByteBufferAllocator()); + } + @Test public void testReassemblyWithCorruptPage() throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100); + DeltaByteArrayWriter writer = getDeltaByteArrayWriter(); String lastValue = null; for (int i = 0; i < 10; i += 1) { lastValue = str(i); writer.writeBytes(Binary.fromString(lastValue)); } - byte[] firstPageBytes = writer.getBytes().toByteArray(); + ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer(); writer.reset(); // sets previous to new byte[0] corruptWriter(writer, lastValue); @@ -91,7 +97,7 @@ public class TestCorruptDeltaByteArrays { for (int i = 10; i < 20; i += 1) { writer.writeBytes(Binary.fromString(str(i))); } - byte[] corruptPageBytes = writer.getBytes().toByteArray(); + ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); firstPageReader.initFromPage(10, firstPageBytes, 0); @@ -119,19 +125,19 @@ public class TestCorruptDeltaByteArrays { @Test public void testReassemblyWithoutCorruption() throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100); + DeltaByteArrayWriter writer = getDeltaByteArrayWriter(); for (int i = 0; i < 10; i += 1) { writer.writeBytes(Binary.fromString(str(i))); } - byte[] firstPageBytes = writer.getBytes().toByteArray(); + ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer(); writer.reset(); // sets previous to new byte[0] for (int i = 10; i < 20; i += 1) { 
writer.writeBytes(Binary.fromString(str(i))); } - byte[] secondPageBytes = writer.getBytes().toByteArray(); + ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); firstPageReader.initFromPage(10, firstPageBytes, 0); @@ -150,19 +156,19 @@ public class TestCorruptDeltaByteArrays { @Test public void testOldReassemblyWithoutCorruption() throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100); + DeltaByteArrayWriter writer = getDeltaByteArrayWriter(); for (int i = 0; i < 10; i += 1) { writer.writeBytes(Binary.fromString(str(i))); } - byte[] firstPageBytes = writer.getBytes().toByteArray(); + ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer(); writer.reset(); // sets previous to new byte[0] for (int i = 10; i < 20; i += 1) { writer.writeBytes(Binary.fromString(str(i))); } - byte[] secondPageBytes = writer.getBytes().toByteArray(); + ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer(); DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader(); firstPageReader.initFromPage(10, firstPageBytes, 0); @@ -185,15 +191,16 @@ public class TestCorruptDeltaByteArrays { MemPageStore pages = new MemPageStore(0); PageWriter memWriter = pages.getPageWriter(column); + ParquetProperties parquetProps = new ParquetProperties(0, ParquetProperties.WriterVersion.PARQUET_1_0, false, new HeapByteBufferAllocator()); + // get generic repetition and definition level bytes to use for pages - ValuesWriter rdValues = ParquetProperties - .getColumnDescriptorValuesWriter(0, 10, 100); + ValuesWriter rdValues = parquetProps.getColumnDescriptorValuesWriter(0, 10, 100); for (int i = 0; i < 10; i += 1) { rdValues.writeInteger(0); } // use a byte array backed BytesInput because it is reused BytesInput rd = BytesInput.from(rdValues.getBytes().toByteArray()); - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100); + DeltaByteArrayWriter writer = 
getDeltaByteArrayWriter(); String lastValue = null; List<String> values = new ArrayList<String>(); for (int i = 0; i < 10; i += 1) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java index 135123f..044fe2a 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.mem; import static org.junit.Assert.assertEquals; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.junit.Test; import org.apache.parquet.Log; @@ -160,6 +161,6 @@ public class TestMemColumn { } private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java index d5bfe22..ddab636 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java @@ -109,5 +109,4 @@ public class MemPageWriter implements PageWriter { return String.format("%s %,d bytes", 
prefix, memSize); } - } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java index c9a62b4..8caad2b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import org.apache.parquet.io.api.Binary; @@ -61,7 +62,7 @@ public class Utils { public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length) throws IOException { Binary[] bins = new Binary[length]; - reader.initFromPage(length, data, 0); + reader.initFromPage(length, ByteBuffer.wrap(data), 0); for(int i=0; i < length; i++) { bins[i] = reader.readBytes(); } @@ -76,7 +77,7 @@ public class Utils { public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length) throws IOException { int[] ints = new int[length]; - reader.initFromPage(length, data, offset); + reader.initFromPage(length, ByteBuffer.wrap(data), offset); for(int i=0; i < length; i++) { ints[i] = reader.readInteger(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java index e74e787..2733b72 100644 --- 
a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java @@ -20,6 +20,7 @@ package org.apache.parquet.column.values.bitpacking; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter; @@ -87,7 +88,7 @@ public class BitPackingPerfTest { System.out.print(" no gc <"); for (int k = 0; k < N; k++) { long t2 = System.nanoTime(); - r.initFromPage(result.length, bytes, 0); + r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0); for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java index 2f311ec..aef259c 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java @@ -23,9 +23,11 @@ import static org.junit.Assert.assertEquals; import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN; import java.io.IOException; +import java.nio.ByteBuffer; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.Log; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; @@ -172,7 +174,7 @@ public class TestBitPackingColumn { LOG.debug("bytes: 
" + TestBitPacking.toString(bytes)); assertEquals(type.toString(), expected, TestBitPacking.toString(bytes)); ValuesReader r = type.getReader(bound); - r.initFromPage(vals.length, bytes, 0); + r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0); int[] result = new int[vals.length]; for (int i = 0; i < result.length; i++) { result[i] = r.readInteger(); @@ -188,7 +190,7 @@ public class TestBitPackingColumn { return new BitPackingValuesReader(bound); } public ValuesWriter getWriter(final int bound) { - return new BitPackingValuesWriter(bound, 32*1024, 64*1024); + return new BitPackingValuesWriter(bound, 32*1024, 64*1024, new DirectByteBufferAllocator()); } } , http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java index ba979b7..d1e43d2 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java @@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Random; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; + public class TestBoundedColumns { private final Random r = new Random(42L); @@ -54,7 +57,7 @@ public class TestBoundedColumns { } private void compareOutput(int bound, int[] ints, String[] result) throws IOException { - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024); + BoundedIntValuesWriter bicw = new 
BoundedIntValuesWriter(bound, 64*1024, 64*1024, new DirectByteBufferAllocator()); for (int i : ints) { bicw.writeInteger(i); } @@ -63,7 +66,7 @@ public class TestBoundedColumns { byte[] byteArray = bicw.getBytes().toByteArray(); assertEquals(concat(result), toBinaryString(byteArray, 4)); BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound); - bicr.initFromPage(1, byteArray, 0); + bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0); String expected = ""; String got = ""; for (int i : ints) { @@ -123,7 +126,7 @@ public class TestBoundedColumns { ByteArrayOutputStream tmp = new ByteArrayOutputStream(); int[] stream = new int[totalValuesInStream]; - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024); + BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024, new DirectByteBufferAllocator()); int idx = 0; for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { int next = 0; @@ -155,7 +158,7 @@ public class TestBoundedColumns { idx = 0; int offset = 0; for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { - bicr.initFromPage(1, input, offset); + bicr.initFromPage(1, ByteBuffer.wrap(input), offset); offset = bicr.getNextOffset(); for (int i = 0; i < valuesPerStripe[stripeNum]; i++) { int number = stream[idx++]; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java index d428fbf..6308e47 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java +++ 
b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java @@ -22,11 +22,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import org.junit.Before; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.io.ParquetDecodingException; @@ -42,13 +44,13 @@ public class DeltaBinaryPackingValuesWriterTest { public void setUp() { blockSize = 128; miniBlockNum = 4; - writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200); + writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator()); random = new Random(); } @Test(expected = IllegalArgumentException.class) public void miniBlockSizeShouldBeMultipleOf8() { - new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100); + new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100, new DirectByteBufferAllocator()); } /* When data size is multiple of Block*/ @@ -154,7 +156,7 @@ public class DeltaBinaryPackingValuesWriterTest { System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length); //offset should be correct - reader.initFromPage(100, pageContent, contentOffsetInPage); + reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage); int offset= reader.getNextOffset(); assertEquals(valueContent.length + contentOffsetInPage, offset); @@ -187,7 +189,7 @@ public class DeltaBinaryPackingValuesWriterTest { } writeData(data); reader = new DeltaBinaryPackingValuesReader(); - reader.initFromPage(100, writer.getBytes().toByteArray(), 0); + reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0); for (int i = 0; i < data.length; i++) { if (i % 3 == 0) { reader.skip(); @@ -243,7 +245,7 @@ public class 
DeltaBinaryPackingValuesWriterTest { + blockFlushed * miniBlockNum //bitWidth of mini blocks + (5.0 * blockFlushed);//min delta for each block assertTrue(estimatedSize >= page.length); - reader.initFromPage(100, page, 0); + reader.initFromPage(100, ByteBuffer.wrap(page), 0); for (int i = 0; i < length; i++) { assertEquals(data[i], reader.readInteger()); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java index dc69fcc..40f6bfc 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.delta.benchmark; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import java.util.Random; @@ -77,8 +78,8 @@ public class BenchmarkIntegerOutputSize { } public void testRandomIntegers(IntFunc func,int bitWidth) { - DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000); - RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000); + DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator()); + RunLengthBitPackingHybridValuesWriter rle= new 
RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000, new DirectByteBufferAllocator()); for (int i = 0; i < dataSize; i++) { int v = func.getIntValue(); delta.writeInteger(v); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java index 24b007f..4ad5dad 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java @@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -33,6 +34,7 @@ import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReade import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; @AxisRange(min = 0, max = 1) @@ -54,8 +56,8 @@ public class BenchmarkReadingRandomIntegers { data[i] = random.nextInt(100) - 200; } - ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); - ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); + ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, 
miniBlockNum, 100, 20000, new DirectByteBufferAllocator()); + ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator()); for (int i = 0; i < data.length; i++) { delta.writeInteger(data[i]); @@ -86,7 +88,7 @@ public class BenchmarkReadingRandomIntegers { } private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException { - reader.initFromPage(data.length, deltaBytes, 0); + reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0); for (int i = 0; i < data.length; i++) { reader.readInteger(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java index 50c97cf..80e6533 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java @@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; @@ -50,21 +51,21 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{ @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeDeltaPackingTest(){ - DeltaBinaryPackingValuesWriter writer = new 
DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator()); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLETest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator()); runWriteTest(writer); } @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeDeltaPackingTest2(){ - DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000); + DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator()); runWriteTest(writer); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java index 3141fd7..0dc7cb0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java @@ -23,6 +23,7 @@ import com.carrotsearch.junitbenchmarks.annotation.AxisRange; import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import 
org.apache.parquet.column.values.ValuesWriter; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; import java.util.Random; @@ -42,7 +43,7 @@ public class SmallRangeWritingBenchmarkTest extends RandomWritingBenchmarkTest { @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2) @Test public void writeRLEWithSmallBitWidthTest(){ - ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000); + ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000, new DirectByteBufferAllocator()); runWriteTest(writer); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java index aaae064..d7ebee5 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.junit.Test; import org.junit.Assert; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.Utils; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -32,11 +33,15 @@ public class TestDeltaLengthByteArray { String[] values = { "parquet", "hadoop", "mapreduce"}; + private DeltaLengthByteArrayValuesWriter getDeltaLengthByteArrayValuesWriter() { + return new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); + } + @Test public void 
testSerialization () throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); + DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter(); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); - + Utils.writeData(writer, values); Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length); @@ -47,7 +52,7 @@ public class TestDeltaLengthByteArray { @Test public void testRandomStrings() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); + DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter(); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); String[] values = Utils.getRandomStringSamples(1000, 32); @@ -61,7 +66,7 @@ public class TestDeltaLengthByteArray { @Test public void testLengths() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); + DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter(); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java index f5f9d76..69c5e15 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java +++ 
b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.junit.Rule; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.Utils; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter; @@ -47,7 +48,7 @@ public class BenchmarkDeltaLengthByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); @@ -59,7 +60,7 @@ public class BenchmarkDeltaLengthByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024); + DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader(); Utils.writeData(writer, values);
