MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1536910479
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
return this.dictPageAAD;
}
}
+
+ private static class RightColumnWriter {
+ private final Queue<TransParquetFileReader> inputFiles;
+ private final ParquetRewriter parquetRewriter;
+ private final ParquetFileWriter writer;
+ private final MessageType schema;
+ private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+ private final Map<ColumnDescriptor, ColumnReader> colReaders = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores
= new HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new
HashMap<>();
+ private final Map<ColumnDescriptor, ColumnWriter> cWriters = new
HashMap<>();
+ private int rowGroupIdxIn = 0;
+ private int rowGroupIdxOut = 0;
+ private int writtenFromBlock = 0;
+
+ public RightColumnWriter(Queue<TransParquetFileReader> inputFiles,
ParquetRewriter parquetRewriter)
+ throws IOException {
+ this.inputFiles = inputFiles;
+ this.parquetRewriter = parquetRewriter;
+ this.writer = parquetRewriter.writer;
+ this.schema =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ this.descriptorsMap = this.schema.getColumns().stream()
+ .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+ initReaders();
+ initWriters();
+ }
+
+ public void writeRows(int rowGroupIdx, long rowsToWrite) throws
IOException {
Review Comment:
Done, I renamed them to rowGroupIdxL and rowGroupIdxR; I hope that improves
readability 😄
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache
indexCache) throws IOException {
columnId++;
}
+ // Writing extra columns
+ for (RightColumnWriter writer : columnWritersR) {
+ writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
+ }
+
writer.endBlock();
numBlocksRewritten++;
+ rowGroupIdx++;
Review Comment:
You are right. I replaced rowGroupIdx with numBlocksRewritten. We still need
to pass numBlocksRewritten to RightColumnWriter because it is a static class.
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache
indexCache) throws IOException {
columnId++;
}
+ // Writing extra columns
+ for (RightColumnWriter writer : columnWritersR) {
+ writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
Review Comment:
done
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws
IOException {
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
}
}
Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
schema = pruneColumnsInSchema(schema, prunePaths);
}
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ if (inputFilesR.isEmpty()) {
+ this.descriptorsMap =
+ schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ } else { // TODO: describe in documentation that only top level column can
be overwritten
+ this.descriptorsMap = schemaL.getColumns().stream()
+ .filter(x -> x.getPath().length == 0 ||
!fieldNamesR.containsKey(x.getPath()[0]))
Review Comment:
I wanted to preserve the original capabilities of ParquetRewriter, which, as I
understand it, allowed duplicated columns. For the right side, however, that
would create a problem, so I added that check. Let me know if you want to
support duplicated columns in general.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]