http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java index b62ef84..770f4dc 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.junit.Test; import org.junit.Assert; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.Utils; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; @@ -35,7 +36,7 @@ public class TestDeltaByteArray { @Test public void testSerialization () throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); DeltaByteArrayReader reader = new DeltaByteArrayReader(); assertReadWrite(writer, reader, values); @@ -43,14 +44,14 @@ public class TestDeltaByteArray { @Test public void testRandomStrings() throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); DeltaByteArrayReader reader = new DeltaByteArrayReader(); assertReadWrite(writer, reader, randvalues); } @Test public void testLengths() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new 
DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); ValuesReader reader = new DeltaBinaryPackingValuesReader(); Utils.writeData(writer, values); @@ -82,7 +83,7 @@ public class TestDeltaByteArray { @Test public void testWriterReset() throws Exception { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); assertReadWrite(writer, new DeltaByteArrayReader(), values);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java index c61ef30..eac4bd2 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java @@ -24,6 +24,7 @@ import java.util.Arrays; import org.junit.Rule; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.column.values.Utils; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; @@ -54,7 +55,7 @@ public class BenchmarkDeltaByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, values); @@ -66,7 +67,7 @@ public class BenchmarkDeltaByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); 
DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, values); @@ -78,7 +79,7 @@ public class BenchmarkDeltaByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException { - PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024); + PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); BinaryPlainValuesReader reader = new BinaryPlainValuesReader(); Utils.writeData(writer, sortedVals); @@ -90,7 +91,7 @@ public class BenchmarkDeltaByteArray { @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4) @Test public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException { - DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024); + DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator()); DeltaByteArrayReader reader = new DeltaByteArrayReader(); Utils.writeData(writer, sortedVals); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java index 020868e..ada1c93 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java @@ -28,10 +28,12 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import org.junit.Assert; import org.junit.Test; +import 
org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -54,27 +56,27 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; public class TestDictionary { private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) { - return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5)); + return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5, new DirectByteBufferAllocator())); } private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { - return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize); + return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize); } private FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> newPlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { - return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize); + return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize); } private FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> newPlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { - return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize); + return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, 
PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize); } private FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> newPlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { - return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize); + return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize); } private FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> newPlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) { - return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize); + return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize); } @Test @@ -116,7 +118,7 @@ public class TestDictionary { //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back ValuesReader reader = new BinaryPlainValuesReader(); - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (long i = 0; i < 100; i++) { assertEquals(Binary.fromString("str" + i), reader.readBytes()); @@ -202,13 +204,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (long i = 0; i < COUNT; i++) { long back = cr.readLong(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (long i = COUNT2; i > 0; i--) { long back = cr.readLong(); assertEquals(i % 50, back); @@ -226,7 +228,7 @@ public class TestDictionary { } } - 
reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (long i = 0; i < 100; i++) { assertEquals(i, reader.readLong()); @@ -272,13 +274,13 @@ public class TestDictionary { final DictionaryValuesReader cr = initDicReader(cw, DOUBLE); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (double i = 0; i < COUNT; i++) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (double i = COUNT2; i > 0; i--) { double back = cr.readDouble(); assertEquals(i % 50, back, 0.0); @@ -297,7 +299,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (double i = 0; i < 100; i++) { assertEquals(i, reader.readDouble(), 0.00001); @@ -343,13 +345,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, INT32); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { int back = cr.readInteger(); assertEquals(i % 50, back); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (int i = COUNT2; i > 0; i--) { int back = cr.readInteger(); assertEquals(i % 50, back); @@ -368,7 +370,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (int i = 0; i < 100; i++) { assertEquals(i, reader.readInteger()); @@ -414,13 +416,13 @@ public class TestDictionary { DictionaryValuesReader cr = initDicReader(cw, FLOAT); - cr.initFromPage(COUNT, bytes1.toByteArray(), 0); + cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0); for (float i = 0; i < COUNT; i++) { float back = cr.readFloat(); 
assertEquals(i % 50, back, 0.0f); } - cr.initFromPage(COUNT2, bytes2.toByteArray(), 0); + cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0); for (float i = COUNT2; i > 0; i--) { float back = cr.readFloat(); assertEquals(i % 50, back, 0.0f); @@ -439,7 +441,7 @@ public class TestDictionary { } } - reader.initFromPage(100, cw.getBytes().toByteArray(), 0); + reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0); for (float i = 0; i < 100; i++) { assertEquals(i, reader.readFloat(), 0.00001); @@ -473,14 +475,14 @@ public class TestDictionary { DictionaryValuesReader reader = initDicReader(cw, INT32); // pretend there are 100 nulls. what matters is offset = bytes.length. - byte[] bytes = {0x00, 0x01, 0x02, 0x03}; // data doesn't matter - int offset = bytes.length; + ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter + int offset = bytes.remaining(); reader.initFromPage(100, bytes, offset); } private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type) throws IOException { - final DictionaryPage dictionaryPage = cw.createDictionaryPage().copy(); + final DictionaryPage dictionaryPage = cw.toDictPageAndClose().copy(); final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0); final Dictionary dictionary = PLAIN.initDictionary(descriptor, dictionaryPage); final DictionaryValuesReader cr = new DictionaryValuesReader(dictionary); @@ -488,14 +490,14 @@ public class TestDictionary { } private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteArray(), 0); + cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8()); } } private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException { - cr.initFromPage(COUNT, bytes.toByteArray(), 0); + 
cr.initFromPage(COUNT, bytes.toByteBuffer(), 0); for (int i = 0; i < COUNT; i++) { Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8()); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 707a507..712fb27 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -18,9 +18,11 @@ */ package org.apache.parquet.column.values.rle; -import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import org.apache.parquet.bytes.ByteBufferInputStream; import org.junit.Test; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import static org.junit.Assert.assertEquals; @@ -39,7 +41,7 @@ public class RunLengthBitPackingHybridIntegrationTest { private void doIntegrationTest(int bitWidth) throws Exception { long modValue = 1L << bitWidth; - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000); + RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000, new DirectByteBufferAllocator()); int numValues = 0; for (int i = 0; i < 100; i++) { @@ -69,8 +71,8 @@ public class RunLengthBitPackingHybridIntegrationTest { } numValues += 1000; - byte[] encodedBytes = encoder.toBytes().toByteArray(); - ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes); + ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); + 
ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes); RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 06664de..5696d7b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -21,12 +21,15 @@ package org.apache.parquet.column.values.rle; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.junit.Test; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; import org.apache.parquet.column.values.bitpacking.Packer; @@ -36,9 +39,19 @@ import org.apache.parquet.column.values.bitpacking.Packer; */ public class TestRunLengthBitPackingHybridEncoder { + private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder() { + return getRunLengthBitPackingHybridEncoder(3, 5, 10); + } + + private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder( + int bitWidth, int initialCapacity, int pageSize) { + return new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, + pageSize, new DirectByteBufferAllocator()); + } + @Test 
public void testRLEOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(); for (int i = 0; i < 100; i++) { encoder.writeInt(4); } @@ -68,7 +81,7 @@ public class TestRunLengthBitPackingHybridEncoder { // make sure that repeated 0s at the beginning // of the stream don't trip up the repeat count - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -86,7 +99,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testBitWidthZero() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(0, 5, 10); for (int i = 0; i < 10; i++) { encoder.writeInt(0); } @@ -102,8 +115,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testBitPackingOnly() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); - + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(); for (int i = 0; i < 100; i++) { encoder.writeInt(i % 3); } @@ -125,7 +137,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testBitPackingOverflow() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(); for (int i = 0; i < 1000; i++) { encoder.writeInt(i % 3); @@ -157,7 +169,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testTransitionFromBitPackingToRle() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new 
RunLengthBitPackingHybridEncoder(3, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(); // 5 obviously bit-packed values encoder.writeInt(0); @@ -195,7 +207,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(5, 5, 10); for (int i = 0; i < 9; i++) { encoder.writeInt(i+1); } @@ -214,7 +226,7 @@ public class TestRunLengthBitPackingHybridEncoder { @Test public void testSwitchingModes() throws Exception { - RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000); + RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(9, 100, 1000); // rle first for (int i = 0; i < 25; i++) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java index 3abf804..aff3937 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java @@ -27,6 +27,7 @@ import static org.apache.parquet.example.Paper.schema3; import java.util.logging.Level; import org.apache.parquet.Log; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.column.impl.ColumnWriteStoreV1; import org.apache.parquet.column.page.mem.MemPageStore; @@ -77,7 +78,7 @@ public class PerfTest { private static void write(MemPageStore memPageStore) { - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 
50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); MessageColumnIO columnIO = newColumnFactory(schema); GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java index e7274cc..06f22b6 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java @@ -38,6 +38,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -526,7 +527,7 @@ public class TestColumnIO { } private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) { - return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0); + return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); } @Test @@ -599,6 +600,8 @@ public class TestColumnIO { groupWriter.write(r2); recordWriter.flush(); columns.validate(); + columns.flush(); + columns.close(); } } final class ValidatingColumnWriteStore implements ColumnWriteStore { @@ -610,6 +613,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore { } @Override + public void close() { + + } + + @Override public ColumnWriter getColumnWriter(final ColumnDescriptor path) { return new ColumnWriter() { private void validate(Object 
value, int repetitionLevel, @@ -630,6 +638,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore { } @Override + public void write(float value, int repetitionLevel, int definitionLevel) { + validate(value, repetitionLevel, definitionLevel); + } + + @Override public void write(boolean value, int repetitionLevel, int definitionLevel) { validate(value, repetitionLevel, definitionLevel); } @@ -645,8 +658,13 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore { } @Override - public void write(float value, int repetitionLevel, int definitionLevel) { - validate(value, repetitionLevel, definitionLevel); + public void close() { + + } + + @Override + public long getBufferedSizeInMemory() { + throw new UnsupportedOperationException(); } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java index 9fde4b1..25b629b 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java @@ -21,6 +21,7 @@ package org.apache.parquet.io; import java.util.ArrayList; import java.util.List; +import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.io.api.RecordConsumer; import org.junit.Test; @@ -258,7 +259,7 @@ public class TestFiltered { private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) { MemPageStore memPageStore = new MemPageStore(number * 2); - ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0); + ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator()); 
RecordConsumer recordWriter = columnIO.getRecordWriter(columns); GroupWriter groupWriter = new GroupWriter(recordWriter, schema); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java index bd8a69d..c8444dc 100644 --- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java +++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.io.api; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -143,6 +145,29 @@ public class TestBinary { } @Test + public void testEqualityMethods() throws Exception { + Binary bin1 = Binary.fromConstantByteArray("alice".getBytes(), 1, 3); + Binary bin2 = Binary.fromConstantByteBuffer(ByteBuffer.wrap("alice".getBytes(), 1, 3)); + assertEquals(bin1, bin2); + } + + @Test + public void testWriteAllTo() throws Exception { + byte[] orig = {10, 9 ,8, 7, 6, 5, 4, 3, 2, 1}; + testWriteAllToHelper(Binary.fromConstantByteBuffer(ByteBuffer.wrap(orig)), orig); + ByteBuffer buf = ByteBuffer.allocateDirect(orig.length); + buf.put(orig); + buf.flip(); + testWriteAllToHelper(Binary.fromConstantByteBuffer(buf), orig); + } + + private void testWriteAllToHelper(Binary binary, byte[] orig) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(orig.length); + binary.writeTo(out); + assertArrayEquals(orig, out.toByteArray()); + } + + @Test public void testFromStringBinary() throws Exception { testBinary(STRING_BF, false); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java 
---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java new file mode 100644 index 0000000..2ac8a2b --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Utilities for managing I/O resources. + */ +public class IOExceptionUtils { + + /** + * Call the #close() method on a {@link Closeable}, wrapping any IOException + * in a runtime exception. 
+ * + * @param closeable - resource to close + */ + public static void closeQuietly(Closeable closeable) { + try { + closeable.close(); + } catch(IOException e) { + throw new ParquetRuntimeException("Error closing I/O related resources.", e) {}; + } + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java new file mode 100644 index 0000000..5271000 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet; + +/** + * Runtime exception indicating that a stream failed to be closed properly. 
+ * + * Used to wrap up the checked IOException usually thrown from IO operations; + * these are generally not recoverable so it does not make sense to pollute the + * codebase declaring that they can be thrown whenever resources are being + * closed out. + */ +public class OutputStreamCloseException extends ParquetRuntimeException { + + private static final long serialVersionUID = 1L; + + public OutputStreamCloseException() { + } + + public OutputStreamCloseException(String message, Throwable cause) { + super(message, cause); + } + + public OutputStreamCloseException(String message) { + super(message); + } + + public OutputStreamCloseException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java index f67b15a..d0f13a8 100644 --- a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java +++ b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java @@ -18,6 +18,9 @@ */ package org.apache.parquet; +import java.io.Closeable; +import java.io.IOException; + /** * The parent class for all runtime exceptions * @@ -42,5 +45,4 @@ abstract public class ParquetRuntimeException extends RuntimeException { public ParquetRuntimeException(Throwable cause) { super(cause); } - } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java new file mode 100644 
/**
 * Source of the {@link ByteBuffer}s used by callers that want to control where
 * their buffers come from (heap or direct memory).
 */
public interface ByteBufferAllocator {

  /**
   * Obtains a buffer from this allocator.
   *
   * @param size the size requested for the buffer
   * @return the buffer produced for the request
   */
  ByteBuffer allocate(int size);

  /**
   * Hands a buffer back to the allocator.
   *
   * <p>For RefCounted implementations using direct memory, this method needs to
   * be called to free references to the allocated memory.
   *
   * @param b the buffer that is no longer needed
   */
  void release(ByteBuffer b);

  /**
   * Indicates if this allocator will produce ByteBuffers backed by direct memory.
   *
   * @return true if direct memory backed buffers will be created by this allocator, else false
   */
  boolean isDirect();
}
/**
 * An {@link InputStream} over a region of a {@link ByteBuffer}.
 *
 * <p>This stream does not consume the ByteBuffer being passed in: it reads from a
 * slice of a duplicate, so the source buffer's own position and limit are left
 * untouched while the bytes themselves are shared rather than copied.
 */
public class ByteBufferInputStream extends InputStream {

  /** Slice that is actually read; its index 0 is {@code initPos} in the source buffer. */
  protected ByteBuffer byteBuf;
  /** Index in the source buffer at which this stream starts. */
  protected int initPos;
  /** Number of bytes this stream was created to expose. */
  protected int count;

  /**
   * Streams the bytes between the buffer's current position and its limit.
   *
   * @param buffer the buffer to read from; its position is not advanced
   */
  public ByteBufferInputStream(ByteBuffer buffer) {
    this(buffer, buffer.position(), buffer.remaining());
  }

  /**
   * Streams {@code count} bytes of {@code buffer} starting at index {@code offset}.
   *
   * @param buffer the buffer to read from; its position is not advanced
   * @param offset index of the first byte to expose
   * @param count number of bytes to expose
   */
  public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
    ByteBuffer temp = buffer.duplicate();
    temp.position(offset);
    byteBuf = temp.slice();
    byteBuf.limit(count);
    this.initPos = offset;
    this.count = count;
  }

  /**
   * @return a slice covering the bytes that have not been read yet
   */
  public ByteBuffer toByteBuffer() {
    return byteBuf.slice();
  }

  @Override
  public int read() throws IOException {
    if (!byteBuf.hasRemaining()) {
      return -1;
    }
    // Mask so that bytes >= 0x80 come back as 128..255 rather than negative values.
    return byteBuf.get() & 0xFF;
  }

  @Override
  public int read(byte[] bytes, int offset, int length) throws IOException {
    // InputStream contract: a zero-length request returns 0, even at end of stream.
    if (length == 0) {
      return 0;
    }
    // Named differently from the 'count' field to avoid shadowing it.
    final int toRead = Math.min(byteBuf.remaining(), length);
    if (toRead == 0) {
      return -1;
    }
    byteBuf.get(bytes, offset, toRead);
    return toRead;
  }

  @Override
  public long skip(long n) {
    // A non-positive request skips nothing; previously a negative n rewound the buffer.
    if (n <= 0) {
      return 0;
    }
    final int toSkip = (int) Math.min(n, (long) byteBuf.remaining());
    byteBuf.position(byteBuf.position() + toSkip);
    return toSkip;
  }

  @Override
  public int available() {
    return byteBuf.remaining();
  }
}
org.apache.parquet.Log; @@ -53,6 +54,21 @@ public class BytesUtils { * @return * @throws IOException */ + public static int readIntLittleEndian(ByteBuffer in, int offset) throws IOException { + int ch4 = in.get(offset) & 0xff; + int ch3 = in.get(offset + 1) & 0xff; + int ch2 = in.get(offset + 2) & 0xff; + int ch1 = in.get(offset + 3) & 0xff; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + /** + * reads an int in little endian at the given position + * @param in + * @param offset + * @return + * @throws IOException + */ public static int readIntLittleEndian(byte[] in, int offset) throws IOException { int ch4 = in[offset] & 0xff; int ch3 = in[offset + 1] & 0xff; @@ -205,6 +221,14 @@ public class BytesUtils { out.write(value & 0x7F); } + public static void writeUnsignedVarInt(int value, ByteBuffer dest) throws IOException { + while ((value & 0xFFFFFF80) != 0L) { + dest.putInt((value & 0x7F) | 0x80); + value >>>= 7; + } + dest.putInt(value & 0x7F); + } + public static void writeZigZagVarInt(int intValue, OutputStream out) throws IOException{ writeUnsignedVarInt((intValue << 1) ^ (intValue >> 31), out); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java new file mode 100644 index 0000000..9fe4538 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.nio.ByteBuffer; + +public class DirectByteBufferAllocator implements ByteBufferAllocator{ + public static final DirectByteBufferAllocator getInstance(){return new DirectByteBufferAllocator();} + public DirectByteBufferAllocator() { + super(); + } + + public ByteBuffer allocate(final int size) { + return ByteBuffer.allocateDirect(size); + } + + @Override + public void release(ByteBuffer b) { + // The ByteBuffer.allocateDirect + return; + } + + @Override + public boolean isDirect() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java new file mode 100644 index 0000000..c5f475d --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.nio.ByteBuffer; + +public class HeapByteBufferAllocator implements ByteBufferAllocator{ + + public static final HeapByteBufferAllocator getInstance(){ return new HeapByteBufferAllocator();} + + public HeapByteBufferAllocator() { + super(); + } + + public ByteBuffer allocate(final int size) { + return ByteBuffer.allocate(size); + } + + public void release(ByteBuffer b) { + return; + } + + @Override + public boolean isDirect() { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java index ac334ae..40190ee 100644 --- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -25,6 +25,9 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.parquet.Log; @@ -71,6 +74,15 @@ abstract public class BytesInput { public static BytesInput 
from(InputStream in, int bytes) { return new StreamBytesInput(in, bytes); } + + /** + * @param buffer + * @param length number of bytes to read + * @return a BytesInput that will read the given bytes from the ByteBuffer + */ + public static BytesInput from(ByteBuffer buffer, int offset, int length) { + return new ByteBufferBytesInput(buffer, offset, length); + } /** * @@ -121,7 +133,7 @@ abstract public class BytesInput { } /** - * @param arrayOut + * @param baos - stream to wrap into a BytesInput * @return a BytesInput that will write the content of the buffer */ public static BytesInput from(ByteArrayOutputStream baos) { @@ -166,6 +178,24 @@ abstract public class BytesInput { /** * + * @return a new ByteBuffer materializing the contents of this input + * @throws IOException + */ + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.wrap(toByteArray()); + } + + /** + * + * @return a new InputStream materializing the contents of this input + * @throws IOException + */ + public InputStream toInputStream() throws IOException { + return new ByteBufferInputStream(toByteBuffer()); + } + + /** + * * @return the size in bytes that would be written */ abstract public long size(); @@ -258,6 +288,10 @@ abstract public class BytesInput { BytesUtils.writeIntLittleEndian(out, intValue); } + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.allocate(4).putInt(0, intValue); + } + @Override public long size() { return 4; @@ -278,6 +312,12 @@ abstract public class BytesInput { BytesUtils.writeUnsignedVarInt(intValue, out); } + public ByteBuffer toByteBuffer() throws IOException { + ByteBuffer ret = ByteBuffer.allocate((int) size()); + BytesUtils.writeUnsignedVarInt(intValue, ret); + return ret; + } + @Override public long size() { int s = 5 - ((Integer.numberOfLeadingZeros(intValue) + 3) / 7); @@ -296,6 +336,10 @@ abstract public class BytesInput { return 0; } + public ByteBuffer toByteBuffer() throws IOException { + return 
ByteBuffer.allocate(0); + } + } private static class CapacityBAOSBytesInput extends BytesInput { @@ -355,11 +399,49 @@ abstract public class BytesInput { out.write(in, offset, length); } + public ByteBuffer toByteBuffer() throws IOException { + return ByteBuffer.wrap(in, offset, length); + } + @Override public long size() { return length; } } + + private static class ByteBufferBytesInput extends BytesInput { + + private final ByteBuffer byteBuf; + private final int length; + private final int offset; + private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) { + this.byteBuf = byteBuf; + this.offset = offset; + this.length = length; + } + + @Override + public void writeAllTo(OutputStream out) throws IOException { + final WritableByteChannel outputChannel = Channels.newChannel(out); + byteBuf.position(offset); + ByteBuffer tempBuf = byteBuf.slice(); + tempBuf.limit(length); + outputChannel.write(tempBuf); + } + + @Override + public ByteBuffer toByteBuffer() throws IOException { + byteBuf.position(offset); + ByteBuffer buf = byteBuf.slice(); + buf.limit(length); + return buf; + } + + @Override + public long size() { + return length; + } + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java index 1670f9c..6155565 100644 --- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java @@ -21,16 +21,17 @@ package org.apache.parquet.bytes; import static java.lang.Math.max; import static java.lang.Math.pow; import static java.lang.String.format; -import static 
java.lang.System.arraycopy; import static org.apache.parquet.Preconditions.checkArgument; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.parquet.Log; +import org.apache.parquet.OutputStreamCloseException; /** * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying. @@ -54,16 +55,17 @@ import org.apache.parquet.Log; */ public class CapacityByteArrayOutputStream extends OutputStream { private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class); - private static final byte[] EMPTY_SLAB = new byte[0]; + private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]); private int initialSlabSize; private final int maxCapacityHint; - private final List<byte[]> slabs = new ArrayList<byte[]>(); + private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>(); - private byte[] currentSlab; + private ByteBuffer currentSlab; private int currentSlabIndex; private int bytesAllocated = 0; private int bytesUsed = 0; + private ByteBufferAllocator allocator; /** * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it @@ -86,38 +88,64 @@ public class CapacityByteArrayOutputStream extends OutputStream { return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs)))); } + public static CapacityByteArrayOutputStream withTargetNumSlabs( + int minSlabSize, int maxCapacityHint, int targetNumSlabs) { + return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator()); + } + /** * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint */ public static CapacityByteArrayOutputStream withTargetNumSlabs( - int minSlabSize, int maxCapacityHint, int 
targetNumSlabs) { + int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) { return new CapacityByteArrayOutputStream( initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs), - maxCapacityHint); + maxCapacityHint, allocator); } /** * Defaults maxCapacityHint to 1MB * @param initialSlabSize - * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)} + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} */ @Deprecated public CapacityByteArrayOutputStream(int initialSlabSize) { - this(initialSlabSize, 1024 * 1024); + this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator()); + } + + /** + * Defaults maxCapacityHint to 1MB + * @param initialSlabSize + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} + */ + @Deprecated + public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) { + this(initialSlabSize, 1024 * 1024, allocator); } /** * @param initialSlabSize the size to make the first slab * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream + * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)} */ + @Deprecated public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) { + this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator()); + } + + /** + * @param initialSlabSize the size to make the first slab + * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream + */ + public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) { checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0"); checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0"); checkArgument(maxCapacityHint 
>= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint)); this.initialSlabSize = initialSlabSize; this.maxCapacityHint = maxCapacityHint; + this.allocator = allocator; reset(); } @@ -145,7 +173,7 @@ public class CapacityByteArrayOutputStream extends OutputStream { if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize)); - this.currentSlab = new byte[nextSlabSize]; + this.currentSlab = allocator.allocate(nextSlabSize); this.slabs.add(currentSlab); this.bytesAllocated += nextSlabSize; this.currentSlabIndex = 0; @@ -153,11 +181,12 @@ public class CapacityByteArrayOutputStream extends OutputStream { @Override public void write(int b) { - if (currentSlabIndex == currentSlab.length) { + if (!currentSlab.hasRemaining()) { addSlab(1); } - currentSlab[currentSlabIndex] = (byte) b; + currentSlab.put(currentSlabIndex, (byte) b); currentSlabIndex += 1; + currentSlab.position(currentSlabIndex); bytesUsed += 1; } @@ -168,18 +197,34 @@ public class CapacityByteArrayOutputStream extends OutputStream { throw new IndexOutOfBoundsException( String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off)); } - if (currentSlabIndex + len >= currentSlab.length) { - final int length1 = currentSlab.length - currentSlabIndex; - arraycopy(b, off, currentSlab, currentSlabIndex, length1); + if (len >= currentSlab.remaining()) { + final int length1 = currentSlab.remaining(); + currentSlab.put(b, off, length1); + bytesUsed += length1; + currentSlabIndex += length1; final int length2 = len - length1; addSlab(length2); - arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2); + currentSlab.put(b, off + length1, length2); currentSlabIndex = length2; + bytesUsed += length2; } else { - arraycopy(b, off, currentSlab, currentSlabIndex, len); + currentSlab.put(b, off, len); currentSlabIndex += len; + bytesUsed += len; + } + } + 
+ private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException { + if (buf.hasArray()) { + out.write(buf.array(), buf.arrayOffset(), len); + } else { + // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer + // not backed by a byte array must be copied to fulfil this interface + byte[] copy = new byte[len]; + buf.flip(); + buf.get(copy); + out.write(copy); } - bytesUsed += len; } /** @@ -191,10 +236,9 @@ public class CapacityByteArrayOutputStream extends OutputStream { */ public void writeTo(OutputStream out) throws IOException { for (int i = 0; i < slabs.size() - 1; i++) { - final byte[] slab = slabs.get(i); - out.write(slab); + writeToOutput(out, slabs.get(i), slabs.get(i).position()); } - out.write(currentSlab, 0, currentSlabIndex); + writeToOutput(out, currentSlab, currentSlabIndex); } /** @@ -222,6 +266,9 @@ public class CapacityByteArrayOutputStream extends OutputStream { // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size this.initialSlabSize = max(bytesUsed / 7, initialSlabSize); if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize)); + for (ByteBuffer slab : slabs) { + allocator.release(slab); + } this.slabs.clear(); this.bytesAllocated = 0; this.bytesUsed = 0; @@ -249,13 +296,13 @@ public class CapacityByteArrayOutputStream extends OutputStream { long seen = 0; for (int i = 0; i < slabs.size(); i++) { - byte[] slab = slabs.get(i); - if (index < seen + slab.length) { + ByteBuffer slab = slabs.get(i); + if (index < seen + slab.limit()) { // ok found index - slab[(int)(index-seen)] = value; + slab.put((int)(index-seen), value); break; } - seen += slab.length; + seen += slab.limit(); } } @@ -273,4 +320,16 @@ public class CapacityByteArrayOutputStream extends OutputStream { int getSlabCount() { return slabs.size(); } + + @Override + public void close() { + for (ByteBuffer slab : slabs) { + allocator.release(slab); + } + try { + 
super.close(); + }catch(IOException e){ + throw new OutputStreamCloseException(e); + } + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java index da4e92f..9d4a8a9 100644 --- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java +++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java @@ -18,6 +18,9 @@ */ package org.apache.parquet.bytes; +import org.apache.parquet.IOExceptionUtils; +import org.apache.parquet.ParquetRuntimeException; + import java.io.IOException; import java.io.OutputStream; @@ -210,4 +213,8 @@ public class LittleEndianDataOutputStream extends OutputStream { writeLong(Double.doubleToLongBits(v)); } + public void close() { + IOExceptionUtils.closeQuietly(out); + } + } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java index b9a37ad..675576c 100644 --- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java +++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.column.values.bitpacking; +import java.nio.ByteBuffer; + /** * Packs and unpacks into bytes * @@ -71,7 +73,15 @@ public abstract class BytePacker { * @param 
output the output values * @param outPos where to write to in output */ - public abstract void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos); + public abstract void unpack8Values(final ByteBuffer input, final int inPos, final int[] output, final int outPos); + + /** + * Compatibility API + */ + @Deprecated + public void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos) { + unpack8Values(ByteBuffer.wrap(input), inPos, output, outPos); + } /** * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos. @@ -81,6 +91,13 @@ public abstract class BytePacker { * @param output the output values * @param outPos where to write to in output */ - public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos); + public abstract void unpack32Values(ByteBuffer input, int inPos, int[] output, int outPos); + /** + * Compatibility API + */ + @Deprecated + public void unpack32Values(byte[] input, int inPos, int[] output, int outPos) { + unpack32Values(ByteBuffer.wrap(input), inPos, output, outPos); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java new file mode 100644 index 0000000..1cb0304 --- /dev/null +++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import org.junit.Test; + +import java.io.ByteArrayInputStream; + +import static org.junit.Assert.assertEquals; + +public class TestBytesInput { + + @Test + public void testWriteInt() throws Throwable { + int[] testVals = { + Integer.MIN_VALUE, + Integer.MAX_VALUE, + 0, 100, 1000, 0xdaedbeef}; + for (Integer testVal : testVals) { + BytesInput varInt = BytesInput.fromUnsignedVarInt(testVal); + byte[] rno = varInt.toByteArray(); + int i = BytesUtils.readUnsignedVarInt(new ByteArrayInputStream(rno)); + assertEquals((int) testVal, i); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java index b80fe40..89db198 100644 --- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java +++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java @@ -63,7 +63,7 @@ public class TestCapacityByteArrayOutputStream { } protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) { - 
return new CapacityByteArrayOutputStream(10, 1000000); + return new CapacityByteArrayOutputStream(initialSize, 1000000, new HeapByteBufferAllocator()); } @Test @@ -129,12 +129,12 @@ public class TestCapacityByteArrayOutputStream { assertEquals(i % (v * 3), byteArray[i]); } // verifying we have not created 500 * 23 / 10 slabs - assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 20); + assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 20); capacityByteArrayOutputStream.reset(); writeArraysOf3(capacityByteArrayOutputStream, v); validate(capacityByteArrayOutputStream, v * 3); // verifying we use less slabs now - assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 2); + assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 2); } @Test http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java index a5ce37e..8df5f39 100644 --- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java +++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import org.junit.Assert; import org.junit.Test; @@ -50,7 +51,7 @@ public class 
TestByteBitPacking { byte[] packed = new byte[packer.getBitWidth() * 4]; packer.pack32Values(values, 0, packed, 0); LOG.debug("packed: " + TestBitPacking.toString(packed)); - packer.unpack32Values(packed, 0, unpacked, 0); + packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0); } private int[] generateValues(int bitWidth) { @@ -140,7 +141,7 @@ public class TestByteBitPacking { LOG.debug("Gener. out: " + TestBitPacking.toString(packedGenerated)); Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated)); - bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0); + bytePacker.unpack32Values(ByteBuffer.wrap(packedByLemireAsBytes), 0, unpacked, 0); LOG.debug("Output: " + TestBitPacking.toString(unpacked)); Assert.assertArrayEquals("width " + i, values, unpacked); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java ---------------------------------------------------------------------- diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java index e0c97e0..2c5fa58 100644 --- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java +++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java @@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import org.junit.Assert; import org.junit.Test; @@ -64,7 +65,7 @@ public class TestLemireBitPacking { private void packUnpack(BytePacker packer, int[] values, int[] unpacked) { byte[] packed = new byte[packer.getBitWidth() * 4]; 
packer.pack32Values(values, 0, packed, 0); - packer.unpack32Values(packed, 0, unpacked, 0); + packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0); } private int[] generateValues(int bitWidth) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java ---------------------------------------------------------------------- diff --git a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java index 9a7c562..3d182e2 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java @@ -50,6 +50,7 @@ public class ByteBasedBitPackingGenerator { } FileWriter fw = new FileWriter(file); fw.append("package org.apache.parquet.column.values.bitpacking;\n"); + fw.append("import java.nio.ByteBuffer;\n"); fw.append("\n"); fw.append("/**\n"); if (msbFirst) { @@ -97,8 +98,10 @@ public class ByteBasedBitPackingGenerator { generatePack(fw, bitWidth, 4, msbFirst); // Unpacking - generateUnpack(fw, bitWidth, 1, msbFirst); - generateUnpack(fw, bitWidth, 4, msbFirst); + generateUnpack(fw, bitWidth, 1, msbFirst, true); + generateUnpack(fw, bitWidth, 1, msbFirst, false); + generateUnpack(fw, bitWidth, 4, msbFirst, true); + generateUnpack(fw, bitWidth, 4, msbFirst, false); fw.append(" }\n"); } @@ -203,9 +206,15 @@ public class ByteBasedBitPackingGenerator { fw.append(" }\n"); } - private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst) + private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst, boolean useByteArray) throws IOException { - fw.append(" public final void unpack" + (batch * 8) 
+ "Values(final byte[] in, final int inPos, final int[] out, final int outPos) {\n"); + final String bufferDataType; + if (useByteArray) { + bufferDataType = "byte[]"; + } else { + bufferDataType = "ByteBuffer"; + } + fw.append(" public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, final int inPos, final int[] out, final int outPos) {\n"); if (bitWidth > 0) { int mask = genMask(bitWidth); for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) { @@ -228,7 +237,14 @@ public class ByteBasedBitPackingGenerator { } else if (shift > 0){ shiftString = "<< " + shift; } - fw.append(" (((((int)in[" + align(byteIndex, 2) + " + inPos]) & 255) " + shiftString + ") & " + mask + ")"); + final String byteAccess; + if (useByteArray) { + byteAccess = "in[" + align(byteIndex, 2) + " + inPos]"; + } else { + // use ByteBuffer#get(index) method + byteAccess = "in.get(" + align(byteIndex, 2) + " + inPos)"; + } + fw.append(" (((((int)" + byteAccess + ") & 255) " + shiftString + ") & " + mask + ")"); } fw.append(";\n"); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index a7f9d2c..2f2e932 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -92,6 +92,11 @@ <version>1.9.5</version> <scope>test</scope> </dependency> + <dependency> + <groupId>commons-pool</groupId> + <artifactId>commons-pool</artifactId> + <version>1.5.4</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index fdeb2ba..6821bbf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -36,6 +36,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.Log; import org.apache.parquet.hadoop.metadata.ColumnPath; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 6840950..8bf882f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,14 +18,14 @@ */ package org.apache.parquet.hadoop; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -34,18 +34,64 @@ import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.hadoop.metadata.CompressionCodecName; -class CodecFactory { +public class CodecFactory { + + protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections + .synchronizedMap(new HashMap<String, CompressionCodec>()); + + private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); + private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); - public class BytesDecompressor { + protected final Configuration configuration; + protected final int pageSize; + + /** + * Create a new codec factory. + * + * @param configuration used to pass compression codec configuration information + * @param pageSize the expected page size, does not set a hard limit, currently just + * used to set the initial size of the output stream used when + * compressing a buffer. 
If this factory is only used to construct + * decompressors this parameter has no impact on the function of the factory + */ + public CodecFactory(Configuration configuration, int pageSize) { + this.configuration = configuration; + this.pageSize = pageSize; + } + + /** + * Create a codec factory that will provide compressors and decompressors + * that will work natively with ByteBuffers backed by direct memory. + * + * @param config configuration options for different compression codecs + * @param allocator an allocator for creating result buffers during compression + * and decompression, must provide buffers backed by Direct + * memory and return true for the isDirect() method + * on the ByteBufferAllocator interface + * @param pageSize the default page size. This does not set a hard limit on the + * size of buffers that can be compressed, but performance may + * be improved by setting it close to the expected size of buffers + * (in the case of parquet, pages) that will be compressed. This + * setting is unused in the case of decompressing data, as parquet + * always records the uncompressed size of a buffer. If this + * CodecFactory is only going to be used for decompressors, this + * parameter will not impact the function of the factory. 
+ */ + public static CodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) { + return new DirectCodecFactory(config, allocator, pageSize); + } + + class HeapBytesDecompressor extends BytesDecompressor { private final CompressionCodec codec; private final Decompressor decompressor; - public BytesDecompressor(CompressionCodec codec) { - this.codec = codec; + HeapBytesDecompressor(CompressionCodecName codecName) { + this.codec = getCodec(codecName); if (codec != null) { decompressor = CodecPool.getDecompressor(codec); } else { @@ -53,11 +99,12 @@ class CodecFactory { } } + @Override public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { final BytesInput decompressed; if (codec != null) { decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); decompressed = BytesInput.from(is, uncompressedSize); } else { decompressed = bytes; @@ -65,7 +112,13 @@ class CodecFactory { return decompressed; } - private void release() { + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException { + ByteBuffer decompressed = decompress(BytesInput.from(input, 0, input.remaining()), uncompressedSize).toByteBuffer(); + output.put(decompressed); + } + + protected void release() { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); } @@ -78,16 +131,16 @@ class CodecFactory { * @author Julien Le Dem * */ - public static class BytesCompressor { + class HeapBytesCompressor extends BytesCompressor { private final CompressionCodec codec; private final Compressor compressor; private final ByteArrayOutputStream compressedOutBuffer; private final CompressionCodecName codecName; - public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { + 
HeapBytesCompressor(CompressionCodecName codecName) { this.codecName = codecName; - this.codec = codec; + this.codec = getCodec(codecName); if (codec != null) { this.compressor = CodecPool.getCompressor(codec); this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); @@ -97,6 +150,7 @@ class CodecFactory { } } + @Override public BytesInput compress(BytesInput bytes) throws IOException { final BytesInput compressedBytes; if (codec == null) { @@ -116,7 +170,8 @@ class CodecFactory { return compressedBytes; } - private void release() { + @Override + protected void release() { if (compressor != null) { CodecPool.returnCompressor(compressor); } @@ -128,60 +183,58 @@ class CodecFactory { } - private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); - private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); - private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>(); - private final Configuration configuration; + public BytesCompressor getCompressor(CompressionCodecName codecName) { + BytesCompressor comp = compressors.get(codecName); + if (comp == null) { + comp = createCompressor(codecName); + compressors.put(codecName, comp); + } + return comp; + } - public CodecFactory(Configuration configuration) { - this.configuration = configuration; + public BytesDecompressor getDecompressor(CompressionCodecName codecName) { + BytesDecompressor decomp = decompressors.get(codecName); + if (decomp == null) { + decomp = createDecompressor(codecName); + decompressors.put(codecName, decomp); + } + return decomp; + } + + protected BytesCompressor createCompressor(CompressionCodecName codecName) { + return new HeapBytesCompressor(codecName); + } + + protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { + return new HeapBytesDecompressor(codecName); } /** * - * @param codecName 
the requested codec + * @param codecName + * the requested codec * @return the corresponding hadoop codec. null if UNCOMPRESSED */ - private CompressionCodec getCodec(CompressionCodecName codecName) { + protected CompressionCodec getCodec(CompressionCodecName codecName) { String codecClassName = codecName.getHadoopCompressionCodecClassName(); if (codecClassName == null) { return null; } - CompressionCodec codec = codecByName.get(codecClassName); + CompressionCodec codec = CODEC_BY_NAME.get(codecClassName); if (codec != null) { return codec; } try { Class<?> codecClass = Class.forName(codecClassName); - codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration); + CODEC_BY_NAME.put(codecClassName, codec); return codec; } catch (ClassNotFoundException e) { throw new BadConfigurationException("Class " + codecClassName + " was not found", e); } } - public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { - BytesCompressor comp = compressors.get(codecName); - if (comp == null) { - CompressionCodec codec = getCodec(codecName); - comp = new BytesCompressor(codecName, codec, pageSize); - compressors.put(codecName, comp); - } - return comp; - } - - public BytesDecompressor getDecompressor(CompressionCodecName codecName) { - BytesDecompressor decomp = decompressors.get(codecName); - if (decomp == null) { - CompressionCodec codec = getCodec(codecName); - decomp = new BytesDecompressor(codec); - decompressors.put(codecName, decomp); - } - return decomp; - } - public void release() { for (BytesCompressor compressor : compressors.values()) { compressor.release(); @@ -192,4 +245,16 @@ class CodecFactory { } decompressors.clear(); } + + public static abstract class BytesCompressor { + public abstract BytesInput compress(BytesInput bytes) throws IOException; + public abstract CompressionCodecName 
getCodecName(); + protected abstract void release(); + } + + public static abstract class BytesDecompressor { + public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException; + public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException; + protected abstract void release(); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index b6934c2..af06747 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
