This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
     new a533490dc PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048)
a533490dc is described below

commit a533490dc8b86a1c01221c9e6a786a39de625c72
Author:     Richard Kerr <kerr.rich...@gmail.com>
AuthorDate: Fri Apr 14 03:04:52 2023 +0100

    PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048)

    Fix for failure when rewriting ColumnChunks that do not have a ColumnIndex populated
---
 .../apache/parquet/hadoop/ParquetFileWriter.java  |  2 +-
 .../hadoop/rewrite/ParquetRewriterTest.java       | 85 ++++++++++++++++++++--
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3e5c718ba..9cd7f1381 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1127,7 +1127,7 @@ public class ParquetFileWriter {
     long length = chunk.getTotalSize();
     long newChunkStart = out.getPos();

-    if (newChunkStart != start) {
+    if (offsetIndex != null && newChunkStart != start) {
       offsetIndex = OffsetIndexBuilder.getBuilder()
         .fromOffsetIndex(offsetIndex)
         .build(newChunkStart - start);
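For context on the one-line change above: column and offset indexes are optional in the Parquet format, so a chunk copied from a file written without them arrives with a null offsetIndex, and the old code passed that null into OffsetIndexBuilder and failed. A minimal sketch of the guarded pattern, using the same builder calls as the patch (the helper name is hypothetical):

    import org.apache.parquet.internal.column.columnindex.OffsetIndex;
    import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;

    class OffsetIndexShift {
      // Hypothetical helper: shift a chunk's offset index when the chunk moves
      // to a new position during a rewrite. A null index (no column/offset
      // indexes in the source file) is passed through untouched instead of
      // crashing inside the builder.
      static OffsetIndex shiftIfPresent(OffsetIndex offsetIndex, long start, long newChunkStart) {
        if (offsetIndex != null && newChunkStart != start) {
          // Rebuild the index with every page location moved by the delta.
          return OffsetIndexBuilder.getBuilder()
              .fromOffsetIndex(offsetIndex)
              .build(newChunkStart - start);
        }
        return offsetIndex;
      }
    }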
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 043261f77..bc8d45199 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -19,10 +19,12 @@ package org.apache.parquet.hadoop.rewrite;

 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Version;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.FileEncryptionProperties;
@@ -49,12 +51,14 @@ import org.apache.parquet.hadoop.util.TestFileBuilder;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
+import org.junit.Before;
 import org.junit.Test;

 import java.io.IOException;
@@ -66,6 +70,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;

 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -132,6 +137,11 @@ public class ParquetRewriterTest {
     validateCreatedBy();
   }

+  @Before
+  public void setUp() {
+    outputFile = TestFileBuilder.createTempFile("test");
+  }
+
   @Test
   public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
     testSingleInputFileSetup("GZIP");
@@ -296,6 +306,70 @@ public class ParquetRewriterTest {
     testPruneEncryptTranslateCodec(inputPaths);
   }

+  @Test
+  public void testRewriteWithoutColumnIndexes() throws Exception {
+    List<Path> inputPaths = new ArrayList<Path>() {{
+        add(new Path(ParquetRewriterTest.class.getResource("/test-file-with-no-column-indexes-1.parquet").toURI()));
+    }};
+
+    inputFiles = inputPaths.stream().map(p -> new EncryptionTestFile(p.toString(), null)).collect(Collectors.toList());
+
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+
+    Map<String, MaskMode> maskCols = Maps.newHashMap();
+    maskCols.put("location.lat", MaskMode.NULLIFY);
+    maskCols.put("location.lon", MaskMode.NULLIFY);
+    maskCols.put("location", MaskMode.NULLIFY);
+
+    List<String> pruneCols = Lists.newArrayList("phoneNumbers");
+
+    RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build();
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed for the columns not pruned
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 3);
+    assertEquals(fields.get(0).getName(), "id");
+    assertEquals(fields.get(1).getName(), "name");
+    assertEquals(fields.get(2).getName(), "location");
+    List<Type> subFields = fields.get(2).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "lon");
+    assertEquals(subFields.get(1).getName(), "lat");
+
+    try (ParquetReader<Group> outReader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+         ParquetReader<Group> inReader = ParquetReader.builder(new GroupReadSupport(), inputPaths.get(0)).withConf(conf).build();
+    ) {
+
+      for (Group inRead = inReader.read(), outRead = outReader.read();
+           inRead != null || outRead != null;
+           inRead = inReader.read(), outRead = outReader.read()) {
+        assertNotNull(inRead);
+        assertNotNull(outRead);
+
+        assertEquals(inRead.getLong("id", 0), outRead.getLong("id", 0));
+        assertEquals(inRead.getString("name", 0), outRead.getString("name", 0));
+
+        // location was nulled
+        Group finalOutRead = outRead;
+        assertThrows(RuntimeException.class, () -> finalOutRead.getGroup("location", 0).getDouble("lat", 0));
+        assertThrows(RuntimeException.class, () -> finalOutRead.getGroup("location", 0).getDouble("lon", 0));
+
+        // phoneNumbers was pruned
+        assertThrows(InvalidRecordException.class, () -> finalOutRead.getGroup("phoneNumbers", 0));
+
+      }
+    }
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("DocId", MaskMode.NULLIFY);
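The new test above exercises a pre-built resource file that ships without column indexes. For checking whether an arbitrary file carries them, the reader exposes the per-chunk indexes directly: readColumnIndex and readOffsetIndex return null when the footer has no index for a chunk, which is exactly the case the writer-side guard covers. A rough sketch (the input path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class IndexProbe {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("file:///tmp/example.parquet"); // hypothetical input
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            for (ColumnChunkMetaData chunk : block.getColumns()) {
              // Both calls return null when the file carries no index for the chunk.
              System.out.printf("%s: columnIndex=%b offsetIndex=%b%n",
                  chunk.getPath(),
                  reader.readColumnIndex(chunk) != null,
                  reader.readOffsetIndex(chunk) != null);
            }
          }
        }
      }
    }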
@@ -436,7 +510,6 @@ public class ParquetRewriterTest {
             .withCodec("UNCOMPRESSED")
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");

     List<Path> inputPaths = new ArrayList<>();
     for (EncryptionTestFile inputFile : inputFiles) {
@@ -458,7 +531,6 @@ public class ParquetRewriterTest {
             .withCodec(compression)
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");
   }

   private void testMultipleInputFilesSetup() throws IOException {
@@ -474,7 +546,7 @@ public class ParquetRewriterTest {
             .withCodec("UNCOMPRESSED")
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");
+
   }

   private MessageType createSchema() {
@@ -686,10 +758,13 @@ public class ParquetRewriterTest {

     // Verify created_by has been set
     FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
-    String inputCreatedBy = (String) inputCreatedBys[0];
-    assertEquals(inputCreatedBy, outFMD.getCreatedBy());
+    final String createdBy = outFMD.getCreatedBy();
+    assertNotNull(createdBy);
+    assertEquals(createdBy, Version.FULL_VERSION);

+    // Verify original.created.by has been set
+    String inputCreatedBy = (String) inputCreatedBys[0];
     String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
     assertEquals(inputCreatedBy, originalCreatedBy);
   }
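For anyone wanting to apply the same rewrite outside the test harness, here is a minimal driver mirroring the options the new test uses; every call comes from the test above, while the input/output paths are hypothetical:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteWithoutIndexes {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path in = new Path("file:///tmp/no-column-indexes.parquet"); // hypothetical
        Path out = new Path("file:///tmp/rewritten.parquet");        // hypothetical

        // Nullify the nested location fields and drop phoneNumbers entirely,
        // as in testRewriteWithoutColumnIndexes above.
        Map<String, MaskMode> mask = new HashMap<>();
        mask.put("location.lat", MaskMode.NULLIFY);
        mask.put("location.lon", MaskMode.NULLIFY);
        mask.put("location", MaskMode.NULLIFY);

        RewriteOptions options = new RewriteOptions.Builder(conf, Arrays.asList(in), out)
            .mask(mask)
            .prune(Arrays.asList("phoneNumbers"))
            .build();

        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks();
        rewriter.close();
      }
    }

With the guard in place, this succeeds whether or not the input file was written with column indexes; previously the offset-index adjustment failed on files that lacked them.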