This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0a066d8a5 PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
0a066d8a5 is described below
commit 0a066d8a5c71386e56dee7bd7a21170b27e4283a
Author: Xianyang Liu <[email protected]>
AuthorDate: Wed Nov 8 23:45:06 2023 +0800
PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 22 ++++++++--------------
1 file changed, 8 insertions(+), 14 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index bf2155e28..004a1d135 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -265,14 +265,9 @@ public class ParquetRewriter implements Closeable {
}
private void processBlocksFromReader(IndexCache indexCache) throws IOException {
- PageReadStore store = reader.readNextRowGroup();
- ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-
- int blockId = 0;
- while (store != null) {
- writer.startBlock(store.getRowCount());
-
+ for (int blockId = 0; blockId < meta.getBlocks().size(); blockId ++) {
BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+ writer.startBlock(blockMetaData.getRowCount());
indexCache.setBlockMetadata(blockMetaData);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
@@ -304,9 +299,9 @@ public class ParquetRewriter implements Closeable {
"Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
}
nullifyColumn(
+ blockId,
descriptor,
chunk,
- crStore,
writer,
schema,
newCodecName,
@@ -323,7 +318,7 @@ public class ParquetRewriter implements Closeable {
}
// Translate compression and/or encryption
- writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+ writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
processChunk(
chunk,
newCodecName,
@@ -345,9 +340,6 @@ public class ParquetRewriter implements Closeable {
}
writer.endBlock();
- store = reader.readNextRowGroup();
- crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
- blockId++;
numBlocksRewritten++;
}
}
@@ -675,9 +667,9 @@ public class ParquetRewriter implements Closeable {
return prunePaths;
}
- private void nullifyColumn(ColumnDescriptor descriptor,
+ private void nullifyColumn(int blockIndex,
+ ColumnDescriptor descriptor,
ColumnChunkMetaData chunk,
- ColumnReadStoreImpl crStore,
ParquetFileWriter writer,
MessageType schema,
CompressionCodecName newCodecName,
@@ -688,6 +680,8 @@ public class ParquetRewriter implements Closeable {
long totalChunkValues = chunk.getValueCount();
int dMax = descriptor.getMaxDefinitionLevel();
+ PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
ColumnReader cReader = crStore.getColumnReader(descriptor);
ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?