This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new a533490dc PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048)
a533490dc is described below

commit a533490dc8b86a1c01221c9e6a786a39de625c72
Author: Richard Kerr <kerr.rich...@gmail.com>
AuthorDate: Fri Apr 14 03:04:52 2023 +0100

    PARQUET-2081: Fix support for rewriting files without ColumnIndexes (#1048)
    
    Fix for failure when rewriting ColumnChunks that do not have a ColumnIndex populated
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   |  2 +-
 .../hadoop/rewrite/ParquetRewriterTest.java        | 85 ++++++++++++++++++++--
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3e5c718ba..9cd7f1381 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1127,7 +1127,7 @@ public class ParquetFileWriter {
     long length = chunk.getTotalSize();
     long newChunkStart = out.getPos();
 
-    if (newChunkStart != start) {
+    if (offsetIndex != null && newChunkStart != start) {
       offsetIndex = OffsetIndexBuilder.getBuilder()
         .fromOffsetIndex(offsetIndex)
         .build(newChunkStart - start);
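
The guard is needed because column and offset indexes are optional in the
Parquet format: a file written without them carries a null offsetIndex for
each column chunk, and the previous unguarded rebuild failed on that null
(the failure tracked by PARQUET-2081). A minimal sketch of the fixed logic,
reusing the names from the hunk above ("start" is the chunk's offset in the
source file, "out" the target output stream):

    long newChunkStart = out.getPos();
    // Only rebuild the OffsetIndex when the chunk actually has one; files
    // written without column indexes legitimately have none.
    if (offsetIndex != null && newChunkStart != start) {
      // Shift every page location by the distance the chunk moved in the
      // rewritten file.
      offsetIndex = OffsetIndexBuilder.getBuilder()
          .fromOffsetIndex(offsetIndex)
          .build(newChunkStart - start);
    }
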
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 043261f77..bc8d45199 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -19,10 +19,12 @@
 package org.apache.parquet.hadoop.rewrite;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Version;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.crypto.FileEncryptionProperties;
@@ -49,12 +51,14 @@ import org.apache.parquet.hadoop.util.TestFileBuilder;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -66,6 +70,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
@@ -132,6 +137,11 @@ public class ParquetRewriterTest {
     validateCreatedBy();
   }
 
+  @Before
+  public void setUp() {
+    outputFile = TestFileBuilder.createTempFile("test");
+  }
+
   @Test
   public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
     testSingleInputFileSetup("GZIP");
@@ -296,6 +306,70 @@ public class ParquetRewriterTest {
     testPruneEncryptTranslateCodec(inputPaths);
   }
 
+  @Test
+  public void testRewriteWithoutColumnIndexes() throws Exception {
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(ParquetRewriterTest.class.getResource("/test-file-with-no-column-indexes-1.parquet").toURI()));
+    }};
+
+    inputFiles = inputPaths.stream().map(p -> new EncryptionTestFile(p.toString(), null)).collect(Collectors.toList());
+
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+
+    Map<String, MaskMode> maskCols = Maps.newHashMap();
+    maskCols.put("location.lat", MaskMode.NULLIFY);
+    maskCols.put("location.lon", MaskMode.NULLIFY);
+    maskCols.put("location", MaskMode.NULLIFY);
+
+    List<String> pruneCols = Lists.newArrayList("phoneNumbers");
+
+    RewriteOptions options = builder.mask(maskCols).prune(pruneCols).build();
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed for the columns not pruned
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 3);
+    assertEquals(fields.get(0).getName(), "id");
+    assertEquals(fields.get(1).getName(), "name");
+    assertEquals(fields.get(2).getName(), "location");
+    List<Type> subFields = fields.get(2).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "lon");
+    assertEquals(subFields.get(1).getName(), "lat");
+
+    try(ParquetReader<Group> outReader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile)).withConf(conf).build();
+        ParquetReader<Group> inReader = ParquetReader.builder(new GroupReadSupport(), inputPaths.get(0)).withConf(conf).build();
+    ) {
+
+      for(Group inRead = inReader.read(), outRead = outReader.read();
+          inRead != null || outRead != null;
+          inRead = inReader.read(), outRead = outReader.read()) {
+        assertNotNull(inRead);
+        assertNotNull(outRead);
+
+        assertEquals(inRead.getLong("id", 0), outRead.getLong("id", 0));
+        assertEquals(inRead.getString("name", 0), outRead.getString("name", 0));
+
+        // location was nulled
+        Group finalOutRead = outRead;
+        assertThrows(RuntimeException.class, () -> finalOutRead.getGroup("location", 0).getDouble("lat", 0));
+        assertThrows(RuntimeException.class, () -> finalOutRead.getGroup("location", 0).getDouble("lon", 0));
+
+        // phoneNumbers was pruned
+        assertThrows(InvalidRecordException.class, () -> finalOutRead.getGroup("phoneNumbers", 0));
+
+      }
+    }
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("DocId", MaskMode.NULLIFY);
@@ -436,7 +510,6 @@ public class ParquetRewriterTest {
             .withCodec("UNCOMPRESSED")
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");
 
     List<Path> inputPaths = new ArrayList<>();
     for (EncryptionTestFile inputFile : inputFiles) {
@@ -458,7 +531,6 @@ public class ParquetRewriterTest {
             .withCodec(compression)
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");
   }
 
   private void testMultipleInputFilesSetup() throws IOException {
@@ -474,7 +546,7 @@ public class ParquetRewriterTest {
             .withCodec("UNCOMPRESSED")
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
             .build());
-    outputFile = TestFileBuilder.createTempFile("test");
+
   }
 
   private MessageType createSchema() {
@@ -686,10 +758,13 @@ public class ParquetRewriterTest {
 
     // Verify created_by has been set
     FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
-    String inputCreatedBy = (String) inputCreatedBys[0];
-    assertEquals(inputCreatedBy, outFMD.getCreatedBy());
+    final String createdBy = outFMD.getCreatedBy();
+    assertNotNull(createdBy);
+    assertEquals(createdBy, Version.FULL_VERSION);
+
 
     // Verify original.created.by has been set
+    String inputCreatedBy = (String) inputCreatedBys[0];
     String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
     assertEquals(inputCreatedBy, originalCreatedBy);
   }
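
The new test above also serves as a usage example for the rewrite path. A
condensed, standalone sketch of the same flow (input/output paths are
hypothetical placeholders; the builder calls match those used in the test):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteWithoutColumnIndexes {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical paths: any Parquet file written without column indexes.
        List<Path> inputPaths = Collections.singletonList(new Path("/tmp/no-column-indexes.parquet"));
        Path outputPath = new Path("/tmp/rewritten.parquet");

        // Nullify one column and prune another, as the test does.
        Map<String, MaskMode> maskCols = new HashMap<>();
        maskCols.put("location", MaskMode.NULLIFY);

        RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
            .mask(maskCols)
            .prune(Collections.singletonList("phoneNumbers"))
            .build();

        // With the guard above in place, this no longer fails when the input
        // lacks ColumnIndex/OffsetIndex structures.
        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks();
        rewriter.close();
      }
    }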
