MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1536910479


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {

Review Comment:
   Done, renamed them to rowGroupIdxL & rowGroupIdxR. I hope that improves 
readability 😄 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache 
indexCache) throws IOException {
         columnId++;
       }
 
+      // Writing extra columns
+      for (RightColumnWriter writer : columnWritersR) {
+        writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());
+      }
+
       writer.endBlock();
       numBlocksRewritten++;
+      rowGroupIdx++;

Review Comment:
   You are right. Replaced rowGroupIdx with numBlocksRewritten. We still need 
to pass it to RightColumnWriter because it is a static class.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -347,8 +390,14 @@ private void processBlocksFromReader(IndexCache 
indexCache) throws IOException {
         columnId++;
       }
 
+      // Writing extra columns
+      for (RightColumnWriter writer : columnWritersR) {
+        writer.writeRows(rowGroupIdx, blockMetaData.getRowCount());

Review Comment:
   done



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws 
IOException {
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
+          LOG.warn("Input column name {} doesn't show up in the schema", col);
         }
       }
 
       Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
       schema = pruneColumnsInSchema(schema, prunePaths);
     }
 
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    if (inputFilesR.isEmpty()) {
+      this.descriptorsMap =
+          schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    } else { // TODO: describe in documentation that only top level column can 
be overwritten
+      this.descriptorsMap = schemaL.getColumns().stream()
+          .filter(x -> x.getPath().length == 0 || 
!fieldNamesR.containsKey(x.getPath()[0]))

Review Comment:
   As I understand it, the original ParquetRewriter allowed duplicated columns, 
and I wanted to preserve that capability. For the right side, however, 
duplicates would create a problem, so I added that check. Let me know if you 
want to support duplicated columns in general.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to