This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
new fad89ee39 PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048) (#1058)
fad89ee39 is described below
commit fad89ee39f6a29fb001a433a2f2006a00e39ce8e
Author: Gang Wu <[email protected]>
AuthorDate: Sun Apr 16 19:41:53 2023 +0800
PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048) (#1058)
Fix a failure when rewriting ColumnChunks that do not have a ColumnIndex
populated: the chunk's OffsetIndex is now only rebuilt for the new chunk
position when it is non-null.
Co-authored-by: Richard Kerr <[email protected]>
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 2 +-
.../hadoop/rewrite/ParquetRewriterTest.java | 85 ++++++++++++++++++++--
2 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3e5c718ba..9cd7f1381 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1127,7 +1127,7 @@ public class ParquetFileWriter {
long length = chunk.getTotalSize();
long newChunkStart = out.getPos();
- if (newChunkStart != start) {
+ if (offsetIndex != null && newChunkStart != start) {
offsetIndex = OffsetIndexBuilder.getBuilder()
.fromOffsetIndex(offsetIndex)
.build(newChunkStart - start);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 043261f77..bc8d45199 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -19,10 +19,12 @@
package org.apache.parquet.hadoop.rewrite;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Version;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
@@ -49,12 +51,14 @@ import org.apache.parquet.hadoop.util.TestFileBuilder;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -66,6 +70,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -132,6 +137,11 @@ public class ParquetRewriterTest {
validateCreatedBy();
}
+ @Before
+ public void setUp() {
+ outputFile = TestFileBuilder.createTempFile("test");
+ }
+
@Test
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception
{
testSingleInputFileSetup("GZIP");
@@ -296,6 +306,70 @@ public class ParquetRewriterTest {
testPruneEncryptTranslateCodec(inputPaths);
}
+ @Test
+ public void testRewriteWithoutColumnIndexes() throws Exception {
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new
Path(ParquetRewriterTest.class.getResource("/test-file-with-no-column-indexes-1.parquet").toURI()));
+ }};
+
+ inputFiles = inputPaths.stream().map(p -> new
EncryptionTestFile(p.toString(), null)).collect(Collectors.toList());
+
+ Path outputPath = new Path(outputFile);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf,
inputPaths, outputPath);
+
+ Map<String, MaskMode> maskCols = Maps.newHashMap();
+ maskCols.put("location.lat", MaskMode.NULLIFY);
+ maskCols.put("location.lon", MaskMode.NULLIFY);
+ maskCols.put("location", MaskMode.NULLIFY);
+
+ List<String> pruneCols = Lists.newArrayList("phoneNumbers");
+
+ RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build();
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ // Verify the schema are not changed for the columns not pruned
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ List<Type> fields = schema.getFields();
+ assertEquals(fields.size(), 3);
+ assertEquals(fields.get(0).getName(), "id");
+ assertEquals(fields.get(1).getName(), "name");
+ assertEquals(fields.get(2).getName(), "location");
+ List<Type> subFields = fields.get(2).asGroupType().getFields();
+ assertEquals(subFields.size(), 2);
+ assertEquals(subFields.get(0).getName(), "lon");
+ assertEquals(subFields.get(1).getName(), "lat");
+
+ try(ParquetReader<Group> outReader = ParquetReader.builder(new
GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+ ParquetReader<Group> inReader = ParquetReader.builder(new
GroupReadSupport(), inputPaths.get(0)).withConf(conf).build();
+ ) {
+
+ for(Group inRead = inReader.read(), outRead = outReader.read();
+ inRead != null || outRead != null;
+ inRead = inReader.read(), outRead = outReader.read()) {
+ assertNotNull(inRead);
+ assertNotNull(outRead);
+
+ assertEquals(inRead.getLong("id", 0), outRead.getLong("id", 0));
+ assertEquals(inRead.getString("name", 0), outRead.getString("name",
0));
+
+ // location was nulled
+ Group finalOutRead = outRead;
+ assertThrows(RuntimeException.class, () ->
finalOutRead.getGroup("location", 0).getDouble("lat", 0));
+ assertThrows(RuntimeException.class, () ->
finalOutRead.getGroup("location", 0).getDouble("lon", 0));
+
+ // phonenumbers was pruned
+ assertThrows(InvalidRecordException.class, () ->
finalOutRead.getGroup("phoneNumbers", 0));
+
+ }
+ }
+
+ // Verify original.created.by is preserved
+ validateCreatedBy();
+ }
+
private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws
Exception {
Map<String, MaskMode> maskColumns = new HashMap<>();
maskColumns.put("DocId", MaskMode.NULLIFY);
@@ -436,7 +510,6 @@ public class ParquetRewriterTest {
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
- outputFile = TestFileBuilder.createTempFile("test");
List<Path> inputPaths = new ArrayList<>();
for (EncryptionTestFile inputFile : inputFiles) {
@@ -458,7 +531,6 @@ public class ParquetRewriterTest {
.withCodec(compression)
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
- outputFile = TestFileBuilder.createTempFile("test");
}
private void testMultipleInputFilesSetup() throws IOException {
@@ -474,7 +546,7 @@ public class ParquetRewriterTest {
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.build());
- outputFile = TestFileBuilder.createTempFile("test");
+
}
private MessageType createSchema() {
@@ -686,10 +758,13 @@ public class ParquetRewriterTest {
// Verify created_by has been set
FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
- String inputCreatedBy = (String) inputCreatedBys[0];
- assertEquals(inputCreatedBy, outFMD.getCreatedBy());
+ final String createdBy = outFMD.getCreatedBy();
+ assertNotNull(createdBy);
+ assertEquals(createdBy, Version.FULL_VERSION);
+
// Verify original.created.by has been set
+ String inputCreatedBy = (String) inputCreatedBys[0];
String originalCreatedBy =
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
assertEquals(inputCreatedBy, originalCreatedBy);
}