This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch parquet-1.13.x
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/parquet-1.13.x by this push:
new cec54e4bb PARQUET-2343: Fixes NPE when rewriting file with multiple rowgroups (#1137)
cec54e4bb is described below
commit cec54e4bb2fd8e7a24db83c929f392d3f671db1e
Author: Gang Wu <[email protected]>
AuthorDate: Tue Sep 5 09:30:22 2023 +0800
PARQUET-2343: Fixes NPE when rewriting file with multiple rowgroups (#1137)
Co-authored-by: Xianyang Liu <[email protected]>
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 1 +
.../hadoop/rewrite/ParquetRewriterTest.java | 46 ++++++++++++++++++++--
.../parquet/hadoop/util/TestFileBuilder.java | 8 ++++
3 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 22c5f0f68..043eb2423 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -329,6 +329,7 @@ public class ParquetRewriter implements Closeable {
writer.endBlock();
store = reader.readNextRowGroup();
+ crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
blockId++;
numBlocksRewritten++;
}
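Context for the one-line fix above: crStore was built once from the first row group's PageReadStore and never refreshed, so rewriting a file with more than one row group left later iterations reading through a store bound to the already-consumed first group, which surfaced as the NPE. A minimal sketch of the corrected loop shape (setup elided; reader, writer, schema, and originalCreatedBy are in scope as in ParquetRewriter):

    PageReadStore store = reader.readNextRowGroup();
    ColumnReadStoreImpl crStore =
        new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
    while (store != null) {
      // ... rewrite every column chunk of the current row group via crStore ...
      writer.endBlock();
      store = reader.readNextRowGroup();
      // The fix: rebind the column read store to the row group just fetched,
      // instead of reusing the one built for the previous, exhausted group.
      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
    }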
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index bc8d45199..a08633d15 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -37,6 +37,7 @@ import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -135,6 +136,7 @@ public class ParquetRewriterTest {
// Verify original.created.by is preserved
validateCreatedBy();
+ validateRowGroupRowCount();
}
@Before
@@ -204,6 +206,7 @@ public class ParquetRewriterTest {
// Verify original.created.by is preserved
validateCreatedBy();
+ validateRowGroupRowCount();
}
@Test
@@ -285,6 +288,7 @@ public class ParquetRewriterTest {
// Verify original.created.by is preserved
validateCreatedBy();
+ validateRowGroupRowCount();
}
@Test
@@ -368,6 +372,7 @@ public class ParquetRewriterTest {
// Verify original.created.by is preserved
validateCreatedBy();
+ validateRowGroupRowCount();
}
private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
@@ -484,6 +489,7 @@ public class ParquetRewriterTest {
// Verify original.created.by is preserved
validateCreatedBy();
+ validateRowGroupRowCount();
}
@Test(expected = InvalidSchemaException.class)
@@ -523,14 +529,28 @@ public class ParquetRewriterTest {
rewriter = new ParquetRewriter(options);
}
+ @Test
+ public void testRewriteFileWithMultipleBlocks() throws Exception {
+ testSingleInputFileSetup("GZIP", 1024L);
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneSingleColumnTranslateCodec(inputPaths);
+ }
+
private void testSingleInputFileSetup(String compression) throws IOException {
+ testSingleInputFileSetup(compression, ParquetWriter.DEFAULT_BLOCK_SIZE);
+ }
+
+ private void testSingleInputFileSetup(String compression, long rowGroupSize) throws IOException {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
- .withNumRecord(numRecord)
- .withCodec(compression)
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build());
+ .withNumRecord(numRecord)
+ .withCodec(compression)
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withRowGroupSize(rowGroupSize)
+ .build());
}
private void testMultipleInputFilesSetup() throws IOException {
@@ -769,4 +789,22 @@ public class ParquetRewriterTest {
assertEquals(inputCreatedBy, originalCreatedBy);
}
+ private void validateRowGroupRowCount() throws Exception {
+ List<Long> inputRowCounts = new ArrayList<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ ParquetMetadata inputPmd = getFileMetaData(inputFile.getFileName(), null);
+ for (BlockMetaData blockMetaData: inputPmd.getBlocks()) {
+ inputRowCounts.add(blockMetaData.getRowCount());
+ }
+ }
+
+ List<Long> outputRowCounts = new ArrayList<>();
+ ParquetMetadata outPmd = getFileMetaData(outputFile, null);
+ for (BlockMetaData blockMetaData: outPmd.getBlocks()) {
+ outputRowCounts.add(blockMetaData.getRowCount());
+ }
+
+ assertEquals(inputRowCounts, outputRowCounts);
+ }
+
}
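For reference, the new testRewriteFileWithMultipleBlocks drives the same prune-and-translate path as the existing tests, just over an input with several row groups. Roughly, the rewrite step looks like this (column name and codec are placeholders; the builder calls follow the RewriteOptions API used elsewhere in this test):

    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath)
        .prune(Collections.singletonList("Gender"))   // placeholder pruned column
        .transform(CompressionCodecName.ZSTD)         // placeholder target codec
        .build();
    ParquetRewriter rewriter = new ParquetRewriter(options);
    rewriter.processBlocks();
    rewriter.close();

The new validateRowGroupRowCount helper then asserts that the output file preserves the per-row-group row counts of the inputs.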
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
index 5018beb3c..bea744cf5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileBuilder.java
@@ -51,6 +51,7 @@ public class TestFileBuilder
private String[] encryptColumns = {};
private ParquetCipher cipher = ParquetCipher.AES_GCM_V1;
private Boolean footerEncryption = false;
+ private long rowGroupSize = ParquetWriter.DEFAULT_BLOCK_SIZE;
public TestFileBuilder(Configuration conf, MessageType schema)
{
@@ -107,6 +108,12 @@ public class TestFileBuilder
return this;
}
+ public TestFileBuilder withRowGroupSize(long rowGroupSize)
+ {
+ this.rowGroupSize = rowGroupSize;
+ return this;
+ }
+
public EncryptionTestFile build()
throws IOException
{
@@ -119,6 +126,7 @@ public class TestFileBuilder
.withExtraMetaData(extraMeta)
.withValidation(true)
.withPageSize(pageSize)
+ .withRowGroupSize(rowGroupSize)
.withEncryption(encryptionProperties)
.withCompressionCodec(CompressionCodecName.valueOf(codec));
try (ParquetWriter writer = builder.build()) {
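With the new builder knob, a test can force multiple row groups by shrinking the row group size far below the data volume, as testRewriteFileWithMultipleBlocks does with 1024 bytes. A usage sketch (conf, schema, and numRecord as set up in ParquetRewriterTest):

    EncryptionTestFile testFile = new TestFileBuilder(conf, schema)
        .withNumRecord(numRecord)
        .withCodec("GZIP")
        .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
        .withRowGroupSize(1024L)   // tiny row groups => several blocks per file
        .build();

Defaulting rowGroupSize to ParquetWriter.DEFAULT_BLOCK_SIZE keeps every existing TestFileBuilder call site behaving exactly as before.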