This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b5a962df PARQUET-2343: Fixes NPE when rewriting file with multiple rowgroups (#1136)
9b5a962df is described below

commit 9b5a962df3007009a227ef421600197531f970a5
Author: Xianyang Liu <[email protected]>
AuthorDate: Mon Sep 4 13:09:42 2023 +0800

    PARQUET-2343: Fixes NPE when rewriting file with multiple rowgroups (#1136)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |  1 +
 .../hadoop/rewrite/ParquetRewriterTest.java        | 46 ++++++++++++++++++++--
 .../parquet/hadoop/util/TestFileBuilder.java       |  8 ++++
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 22c5f0f68..043eb2423 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -329,6 +329,7 @@ public class ParquetRewriter implements Closeable {
 
       writer.endBlock();
       store = reader.readNextRowGroup();
+      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), 
schema, originalCreatedBy);
       blockId++;
       numBlocksRewritten++;
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index bc8d45199..a08633d15 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -135,6 +136,7 @@ public class ParquetRewriterTest {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Before
@@ -204,6 +206,7 @@ public class ParquetRewriterTest {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test
@@ -285,6 +288,7 @@ public class ParquetRewriterTest {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test
@@ -368,6 +372,7 @@ public class ParquetRewriterTest {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws 
Exception {
@@ -484,6 +489,7 @@ public class ParquetRewriterTest {
 
     // Verify original.created.by is preserved
     validateCreatedBy();
+    validateRowGroupRowCount();
   }
 
   @Test(expected = InvalidSchemaException.class)
@@ -523,14 +529,28 @@ public class ParquetRewriterTest {
     rewriter = new ParquetRewriter(options);
   }
 
+  @Test
+  public void testRewriteFileWithMultipleBlocks() throws Exception {
+    testSingleInputFileSetup("GZIP", 1024L);
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }
+
   private void testSingleInputFileSetup(String compression) throws IOException 
{
+    testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
+  }
+
+  private void testSingleInputFileSetup(String compression, long rowGroupSize) 
throws IOException {
     MessageType schema = createSchema();
     inputFiles = Lists.newArrayList();
     inputFiles.add(new TestFileBuilder(conf, schema)
-            .withNumRecord(numRecord)
-            .withCodec(compression)
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec(compression)
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withRowGroupSize(rowGroupSize)
+      .build());
   }
 
   private void testMultipleInputFilesSetup() throws IOException {
@@ -769,4 +789,22 @@ public class ParquetRewriterTest {
     assertEquals(inputCreatedBy, originalCreatedBy);
   }
 
+  private void validateRowGroupRowCount() throws Exception {
+    List<Long> inputRowCounts = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      ParquetMetadata inputPmd = getFileMetaData(inputFile.getFileName(), 
null);
+      for (BlockMetaData blockMetaData: inputPmd.getBlocks()) {
+        inputRowCounts.add(blockMetaData.getRowCount());
+      }
+    }
+
+    List<Long> outputRowCounts = new ArrayList<>();
+    ParquetMetadata outPmd = getFileMetaData(outputFile, null);
+    for (BlockMetaData blockMetaData: outPmd.getBlocks()) {
+      outputRowCounts.add(blockMetaData.getRowCount());
+    }
+
+    assertEquals(inputRowCounts, outputRowCounts);
+  }
+
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 5018beb3c..bea744cf5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -51,6 +51,7 @@ public class TestFileBuilder
     private String[] encryptColumns = {};
     private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
     private Boolean footerEncryption = false;
+    private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
 
     public TestFileBuilder(Configuration conf, MessageType schema)
     {
@@ -107,6 +108,12 @@ public class TestFileBuilder
         return this;
     }
 
+    public TestFileBuilder withRowGroupSize(long rowGroupSize)
+    {
+        this.rowGroupSize = rowGroupSize;
+        return this;
+    }
+
     public EncryptionTestFile build()
             throws IOException
     {
@@ -119,6 +126,7 @@ public class TestFileBuilder
                 .withExtraMetaData(extraMeta)
                 .withValidation(true)
                 .withPageSize(pageSize)
+                .withRowGroupSize(rowGroupSize)
                 .withEncryption(encryptionProperties)
                 .withCompressionCodec(CompressionCodecName.valueOf(codec));
         try (ParquetWriter writer = builder.build()) {

Reply via email to