Repository: parquet-mr Updated Branches: refs/heads/master 7f8e952ab -> bd0b5af02
PARQUET-612: Add compression codec to FileEncodingsIT. Author: Ryan Blue <[email protected]> Closes #343 from rdblue/PARQUET-612-test-compression and squashes the following commits: a5b7dbb [Ryan Blue] PARQUET-612: Add compression codec to FileEncodingsIT. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/bd0b5af0 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/bd0b5af0 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/bd0b5af0 Branch: refs/heads/master Commit: bd0b5af025fab9cad8f94260138741c252f45fc8 Parents: 7f8e952 Author: Ryan Blue <[email protected]> Authored: Thu Jun 30 09:54:08 2016 -0700 Committer: Julien Le Dem <[email protected]> Committed: Thu Jun 30 09:54:08 2016 -0700 ---------------------------------------------------------------------- .travis.yml | 4 +- .../parquet/encodings/FileEncodingsIT.java | 112 +++++++++++++++---- 2 files changed, 95 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/bd0b5af0/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index aa95349..890a372 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,8 +24,8 @@ before_install: - cd .. env: - - HADOOP_PROFILE=default - - HADOOP_PROFILE=hadoop-2 + - HADOOP_PROFILE=default TEST_CODECS=uncompressed + - HADOOP_PROFILE=hadoop-2 TEST_CODECS=gzip,snappy install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false) script: mvn test -P $HADOOP_PROFILE http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/bd0b5af0/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java index 72d281f..4af9866 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java @@ -20,10 +20,13 @@ package org.apache.parquet.encodings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.column.impl.ColumnReaderImpl; import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; @@ -32,9 +35,11 @@ import org.apache.parquet.example.data.simple.SimpleGroupFactory; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; import org.apache.parquet.schema.*; @@ -78,23 +83,42 @@ public class FileEncodingsIT { // Parameters private PrimitiveTypeName paramTypeName; + private CompressionCodecName compression; @Parameterized.Parameters public static Collection<Object[]> getParameters() { - return Arrays.asList(new Object[][] { - { PrimitiveTypeName.BOOLEAN }, - { PrimitiveTypeName.INT32 }, - { PrimitiveTypeName.INT64 }, - { PrimitiveTypeName.INT96 }, - { PrimitiveTypeName.FLOAT }, - { PrimitiveTypeName.DOUBLE }, - { PrimitiveTypeName.BINARY }, - { PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY } - }); + List<PrimitiveTypeName> types = Arrays.asList( + PrimitiveTypeName.BOOLEAN, PrimitiveTypeName.INT32, PrimitiveTypeName.INT64, + PrimitiveTypeName.INT96, PrimitiveTypeName.FLOAT, PrimitiveTypeName.DOUBLE, + PrimitiveTypeName.BINARY, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); + + List<CompressionCodecName> codecs; + String codecList = System.getenv("TEST_CODECS"); + if (codecList != null) { + codecs = new ArrayList<CompressionCodecName>(); + for (String codec : codecList.split(",")) { + codecs.add(CompressionCodecName.valueOf(codec.toUpperCase(Locale.ENGLISH))); + } + } else { + // otherwise test just UNCOMPRESSED + codecs = Arrays.asList(CompressionCodecName.UNCOMPRESSED); + } + + System.err.println("Testing codecs: " + codecs); + + List<Object[]> parameters = new ArrayList<Object[]>(); + for (PrimitiveTypeName type : types) { + for (CompressionCodecName codec : codecs) { + parameters.add(new Object[] {type, codec}); + } + } + + return parameters; } - public FileEncodingsIT(PrimitiveTypeName typeName) { + public FileEncodingsIT(PrimitiveTypeName typeName, CompressionCodecName compression) { this.paramTypeName = typeName; + this.compression = compression; } @BeforeClass @@ -118,8 +142,8 @@ public class FileEncodingsIT { * This loop will make sure to test future writer versions added to WriterVersion enum. */ for (WriterVersion writerVersion : WriterVersion.values()) { - System.out.println(String.format("Testing %s/%s encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", - writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); + System.out.println(String.format("Testing %s/%s/%s encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", + writerVersion, this.paramTypeName, this.compression, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); Path parquetFile = createTempFile(); writeValuesToFile(parquetFile, this.paramTypeName, randomValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, DISABLE_DICTIONARY, writerVersion); @@ -136,8 +160,8 @@ public class FileEncodingsIT { * This loop will make sure to test future writer versions added to WriterVersion enum. */ for (WriterVersion writerVersion : WriterVersion.values()) { - System.out.println(String.format("Testing %s/%s + DICTIONARY encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", - writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); + System.out.println(String.format("Testing %s/%s/%s + DICTIONARY encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", + writerVersion, this.paramTypeName, this.compression, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); Path parquetFile = createTempFile(); writeValuesToFile(parquetFile, this.paramTypeName, dictionaryValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, ENABLE_DICTIONARY, writerVersion); @@ -169,8 +193,15 @@ public class FileEncodingsIT { SimpleGroupFactory message = new SimpleGroupFactory(schema); GroupWriteSupport.setSchema(schema, configuration); - ParquetWriter<Group> writer = new ParquetWriter<Group>(file, new GroupWriteSupport(), - CompressionCodecName.UNCOMPRESSED, rowGroupSize, pageSize, TEST_DICT_PAGE_SIZE, enableDictionary, false, version, configuration); + ParquetWriter<Group> writer = ExampleParquetWriter.builder(file) + .withCompressionCodec(compression) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withDictionaryPageSize(TEST_DICT_PAGE_SIZE) + .withDictionaryEncoding(enableDictionary) + .withWriterVersion(version) + .withConf(configuration) + .build(); for (Object o: values) { switch (type) { @@ -303,7 +334,7 @@ public class FileEncodingsIT { for (PageReadStore pageReadStore : blockReaders) { for (ColumnDescriptor columnsDesc : fileSchema.getColumns()) { List<DataPage> pageGroup = getPageGroupForColumn(pageReadStore, columnsDesc); - DictionaryPage dictPage = getDictionaryPageForColumn(pageReadStore, columnsDesc); + DictionaryPage dictPage = reusableCopy(getDictionaryPageForColumn(pageReadStore, columnsDesc)); List<?> expectedRowGroupValues = expectedValues.subList(rowsRead, (int)(rowsRead + pageReadStore.getRowCount())); validateFirstToLast(rowGroupID, dictPage, pageGroup, columnsDesc, expectedRowGroupValues); @@ -315,6 +346,49 @@ public class FileEncodingsIT { } } + private static DictionaryPage reusableCopy(DictionaryPage dict) { + if (dict == null) { + return null; + } + try { + return new DictionaryPage( + BytesInput.from(dict.getBytes().toByteArray()), + dict.getDictionarySize(), dict.getEncoding()); + } catch (IOException e) { + throw new ParquetDecodingException("Cannot read dictionary", e); + } + } + + private static DataPage reusableCopy(DataPage page) { + return page.accept(new DataPage.Visitor<DataPage>() { + @Override + public DataPage visit(DataPageV1 data) { + try { + return new DataPageV1(BytesInput.from(data.getBytes().toByteArray()), + data.getValueCount(), data.getUncompressedSize(), data.getStatistics(), + data.getRlEncoding(), data.getDlEncoding(), data.getValueEncoding()); + } catch (IOException e) { + throw new ParquetDecodingException("Cannot read data", e); + } + } + + @Override + public DataPage visit(DataPageV2 data) { + try { + return new DataPageV2(data.getRowCount(), data.getNullCount(), data.getValueCount(), + BytesInput.from(data.getRepetitionLevels().toByteArray()), + BytesInput.from(data.getDefinitionLevels().toByteArray()), + data.getDataEncoding(), + BytesInput.from(data.getData().toByteArray()), + data.getUncompressedSize(), data.getStatistics(), + data.isCompressed()); + } catch (IOException e) { + throw new ParquetDecodingException("Cannot read data", e); + } + } + }); + } + private static void validateFirstToLast(int rowGroupID, DictionaryPage dictPage, List<DataPage> pageGroup, ColumnDescriptor desc, List<?> expectedValues) { int rowsRead = 0, pageID = 0; for (DataPage page : pageGroup) { @@ -347,7 +421,7 @@ public class FileEncodingsIT { DataPage page; while ((page = pageReader.readPage()) != null) { - pageGroup.add(page); + pageGroup.add(reusableCopy(page)); } return pageGroup;
