wgtmac commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1536743697
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -119,6 +130,15 @@ public List<InputFile> getParquetInputFiles() {
return inputFiles;
}
+ /** TODO fix documentation after addition of inputFilesR
+ * Gets the right {@link InputFile}s for the rewrite.
+ *
+ * @return a {@link List} of the associated right {@link InputFile}s
+ */
+ public List<List<InputFile>> getParquetInputFilesR() {
Review Comment:
I'd rather find a better name instead of `InputFilesR`
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
Review Comment:
nit: please add some comments for `rowGroupIdx`, `rowGroupIdxIn` and
`rowGroupIdxOut`.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -60,10 +63,7 @@
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
-import org.apache.parquet.hadoop.IndexCache;
-import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.*;
Review Comment:
please avoid any wildcard import statement.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
+ }
+ writtenFromBlock = 0;
+ // this is called after all rows are processed
+ initReaders();
+ }
+ }
+ flushWriters();
+ }
+
+ private void flushWriters() throws IOException {
+ cStores.values().forEach(cStore -> {
+ cStore.flush();
+ cStore.close();
+ });
+ cWriters.values().forEach(ColumnWriter::close);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ if (cPageStores.containsKey(descriptor))
+ cPageStores.get(descriptor).flushToFileWriter(writer);
Review Comment:
It looks like `descriptorsMap` is a HashMap, which is neither order-preserving
nor sorted by the ColumnPath key. Does it keep the same order as the input
schema of the right-side file? If not, we may have issues here, since the
column flush order would not match the schema order.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
+ }
+ writtenFromBlock = 0;
+ // this is called after all rows are processed
+ initReaders();
+ }
+ }
+ flushWriters();
+ }
+
+ private void flushWriters() throws IOException {
+ cStores.values().forEach(cStore -> {
+ cStore.flush();
+ cStore.close();
+ });
+ cWriters.values().forEach(ColumnWriter::close);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ if (cPageStores.containsKey(descriptor))
+ cPageStores.get(descriptor).flushToFileWriter(writer);
+ }
+ cStores.clear();
+ cWriters.clear();
+ cPageStores.clear();
+ }
+
+ private void initWriters() {
+ if (!inputFiles.isEmpty()) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ descriptorsMap.forEach((columnPath, descriptor) -> {
+ ColumnChunkMetaData chunk =
blocks.get(rowGroupIdxOut).getColumns().stream()
+ .filter(x -> x.getPath() == columnPath)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException(
+ "Could not find column [" + columnPath.toDotString() +
"]."));
+ int bloomFilterLength = chunk.getBloomFilterLength();
+ ParquetProperties.WriterVersion writerVersion =
+ chunk.getEncodingStats().usesV2Pages()
+ ? ParquetProperties.WriterVersion.PARQUET_2_0
+ : ParquetProperties.WriterVersion.PARQUET_1_0;
+ ParquetProperties props = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .withBloomFilterEnabled(bloomFilterLength > 0)
+ .build();
+ CodecFactory codecFactory = new CodecFactory(new Configuration(),
props.getPageSizeThreshold());
+ CompressionCodecFactory.BytesInputCompressor compressor =
+ codecFactory.getCompressor(chunk.getCodec());
+
+ MessageType columnSchema = parquetRewriter.newSchema(schema,
descriptor);
+ ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+ compressor,
+ columnSchema,
+ props.getAllocator(),
+ props.getColumnIndexTruncateLength(),
+ props.getPageWriteChecksumEnabled(),
+ writer.getEncryptor(),
+ rowGroupIdxIn);
+ ColumnWriteStore cwStore = props.newColumnWriteStore(columnSchema,
cPageStore, cPageStore);
+ ColumnWriter cWriter = cwStore.getColumnWriter(descriptor);
+ cPageStores.put(descriptor, cPageStore);
+ cStores.put(descriptor, cwStore);
+ cWriters.put(descriptor, cWriter);
+ });
+ }
+ }
+
+ private void initReaders() throws IOException {
+ if (!inputFiles.isEmpty()) {
+ TransParquetFileReader reader = inputFiles.peek();
+ PageReadStore pageReadStore = reader.readRowGroup(rowGroupIdxOut);
+ String createdBy = reader.getFooter().getFileMetaData().getCreatedBy();
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(
+ pageReadStore, new ParquetRewriter.DummyGroupConverter(), schema,
createdBy);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ ColumnReader cReader = crStore.getColumnReader(descriptor);
+ colReaders.put(descriptor, cReader);
+ }
+ }
+ }
+
+ private void copyValues(ColumnDescriptor descriptor, long rowsToWrite) {
+ ColumnWriteStore cStore = cStores.get(descriptor);
+ ColumnWriter cWriter = cWriters.get(descriptor);
+ int dMax = descriptor.getMaxDefinitionLevel();
+ Class<?> columnType =
descriptor.getPrimitiveType().getPrimitiveTypeName().javaType;
+ ColumnReader reader = colReaders.get(descriptor);
+ for (int i = 0; i < rowsToWrite; i++) {
+ int rlvl = reader.getCurrentRepetitionLevel();
+ int dlvl = reader.getCurrentDefinitionLevel();
+ do {
+ if (dlvl < dMax) {
+ cWriter.writeNull(rlvl, dlvl);
Review Comment:
If a column to mask (only nullifying a column is supported) is on the right
side, we also need to handle it here.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
Review Comment:
BTW, I'm not a native speaker but the `in` and `out` look counter-intuitive
to me.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}
Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
}
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ if (inputFilesR.isEmpty()) {
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ } else { // TODO: describe in documentation that only top level column can
be overwritten
+ this.descriptorsMap = schemaL.getColumns().stream()
+ .filter(x -> x.getPath().length == 0 ||
!fieldNamesR.containsKey(x.getPath()[0]))
Review Comment:
If we require that the left and right sides have no duplicate fields, would
that make things simpler here and later on?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
Review Comment:
Should we add a check to make sure `rowGroupIdx` will never decrease?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
Review Comment:
Is this increment required? Or better stay at `blocks.size()`?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache
indexCache) throws IOException {
columnId++;
}
+ // Writing extra columns
+ for (RightColumnWriter writer : columnWritersR) {
+ writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
+ }
+
writer.endBlock();
numBlocksRewritten++;
+ rowGroupIdx++;
Review Comment:
Is `rowGroupIdx` a duplicate of `numBlocksRewritten`? Would it be better to add
a `RightColumnWriter.nextBlock()` to explicitly signal the row group switch
instead of passing `rowGroupIdx` every time?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache
indexCache) throws IOException {
columnId++;
}
+ // Writing extra columns
+ for (RightColumnWriter writer : columnWritersR) {
+ writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
Review Comment:
nit: rename `writer` to `columnWriter` since there is another `writer`
instance outside the loop.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +117,55 @@ public class ParquetRewriter implements Closeable {
private ParquetMetadata meta = null;
// created_by information of current reader being processed
private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+ List<Queue<TransParquetFileReader>> inputFilesR =
options.getParquetInputFilesR().stream()
+ .map(x -> getFileReaders(x, conf))
+ .collect(Collectors.toList());
+ ensureSameSchema(inputFiles);
+ inputFilesR.forEach(this::ensureSameSchema);
LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
- // Init reader of the first input file
- initNextReader();
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse(""));
+ Stream.concat(inputFiles.stream(),
inputFilesR.stream().flatMap(Collection::stream))
+ .forEach(x ->
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+ MessageType schemaL =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ List<MessageType> schemaR = inputFilesR.stream()
+ .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+ .collect(Collectors.toList());
+ Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+ schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+ Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+ schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+ if (fieldNamesR.containsKey(x.getName())) {
+ throw new IllegalArgumentException(
+ "Found a duplicated field `" + x.getName() + "` in the right side
file groups!");
+ }
+ fieldNamesR.put(x.getName(), x);
+ });
+ List<Type> fields = Stream.concat(
+ fieldNamesL.values().stream()
+ .map(x -> fieldNamesR.getOrDefault(
Review Comment:
Why do we need to try to get it from fieldNamesR?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
+ }
+ writtenFromBlock = 0;
+ // this is called after all rows are processed
+ initReaders();
+ }
+ }
+ flushWriters();
+ }
+
+ private void flushWriters() throws IOException {
+ cStores.values().forEach(cStore -> {
+ cStore.flush();
+ cStore.close();
+ });
+ cWriters.values().forEach(ColumnWriter::close);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ if (cPageStores.containsKey(descriptor))
+ cPageStores.get(descriptor).flushToFileWriter(writer);
+ }
+ cStores.clear();
+ cWriters.clear();
+ cPageStores.clear();
+ }
+
+ private void initWriters() {
+ if (!inputFiles.isEmpty()) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ descriptorsMap.forEach((columnPath, descriptor) -> {
+ ColumnChunkMetaData chunk =
blocks.get(rowGroupIdxOut).getColumns().stream()
+ .filter(x -> x.getPath() == columnPath)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException(
+ "Could not find column [" + columnPath.toDotString() +
"]."));
+ int bloomFilterLength = chunk.getBloomFilterLength();
+ ParquetProperties.WriterVersion writerVersion =
+ chunk.getEncodingStats().usesV2Pages()
+ ? ParquetProperties.WriterVersion.PARQUET_2_0
+ : ParquetProperties.WriterVersion.PARQUET_1_0;
+ ParquetProperties props = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .withBloomFilterEnabled(bloomFilterLength > 0)
+ .build();
+ CodecFactory codecFactory = new CodecFactory(new Configuration(),
props.getPageSizeThreshold());
+ CompressionCodecFactory.BytesInputCompressor compressor =
+ codecFactory.getCompressor(chunk.getCodec());
Review Comment:
This uses the same codec as the input file. Should we respect the
trans-codec rewriter options?
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
+ if (rowGroupIdxIn != rowGroupIdx) {
+ rowGroupIdxIn = rowGroupIdx;
+ flushWriters();
+ initWriters();
+ }
+ while (rowsToWrite > 0) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ BlockMetaData block = blocks.get(rowGroupIdxOut);
+ List<ColumnChunkMetaData> chunks = block.getColumns();
+ long leftInBlock = block.getRowCount() - writtenFromBlock;
+ long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+ for (ColumnChunkMetaData chunk : chunks) {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() +
" is encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ copyValues(descriptor, writeFromBlock);
+ }
+ rowsToWrite -= writeFromBlock;
+ writtenFromBlock += writeFromBlock;
+ if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+ rowGroupIdxOut++;
+ if (rowGroupIdxOut == blocks.size()) {
+ inputFiles.poll();
+ rowGroupIdxOut = 0;
+ }
+ writtenFromBlock = 0;
+ // this is called after all rows are processed
+ initReaders();
+ }
+ }
+ flushWriters();
+ }
+
+ private void flushWriters() throws IOException {
+ cStores.values().forEach(cStore -> {
+ cStore.flush();
+ cStore.close();
+ });
+ cWriters.values().forEach(ColumnWriter::close);
+ for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+ if (cPageStores.containsKey(descriptor))
+ cPageStores.get(descriptor).flushToFileWriter(writer);
+ }
+ cStores.clear();
+ cWriters.clear();
+ cPageStores.clear();
+ }
+
+ private void initWriters() {
+ if (!inputFiles.isEmpty()) {
+ List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+ descriptorsMap.forEach((columnPath, descriptor) -> {
+ ColumnChunkMetaData chunk =
blocks.get(rowGroupIdxOut).getColumns().stream()
+ .filter(x -> x.getPath() == columnPath)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException(
+ "Could not find column [" + columnPath.toDotString() +
"]."));
+ int bloomFilterLength = chunk.getBloomFilterLength();
+ ParquetProperties.WriterVersion writerVersion =
+ chunk.getEncodingStats().usesV2Pages()
+ ? ParquetProperties.WriterVersion.PARQUET_2_0
+ : ParquetProperties.WriterVersion.PARQUET_1_0;
+ ParquetProperties props = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .withBloomFilterEnabled(bloomFilterLength > 0)
+ .build();
+ CodecFactory codecFactory = new CodecFactory(new Configuration(),
props.getPageSizeThreshold());
+ CompressionCodecFactory.BytesInputCompressor compressor =
+ codecFactory.getCompressor(chunk.getCodec());
+
+ MessageType columnSchema = parquetRewriter.newSchema(schema,
descriptor);
+ ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+ compressor,
+ columnSchema,
+ props.getAllocator(),
+ props.getColumnIndexTruncateLength(),
+ props.getPageWriteChecksumEnabled(),
+ writer.getEncryptor(),
Review Comment:
IIUC, this line makes sure that right side column can be encrypted.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}
Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
Review Comment:
Right-side columns get pruned here, but they are still written by the
RightColumnWriter. Do you plan to enable pruning columns on the right side?
Either way, we need to state this in the docstring of `RewriteOptions`.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache
indexCache) throws IOException {
columnId++;
}
+ // Writing extra columns
+ for (RightColumnWriter writer : columnWritersR) {
+ writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
+ }
+
writer.endBlock();
numBlocksRewritten++;
+ rowGroupIdx++;
Review Comment:
After a second look, is passing `rowGroupIdx` really necessary? The `writeRows`
method always writes a new row group anyway.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]