MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1536910452


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {
+        rowGroupIdxIn = rowGroupIdx;
+        flushWriters();
+        initWriters();
+      }
+      while (rowsToWrite > 0) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        BlockMetaData block = blocks.get(rowGroupIdxOut);
+        List<ColumnChunkMetaData> chunks = block.getColumns();
+        long leftInBlock = block.getRowCount() - writtenFromBlock;
+        long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+        for (ColumnChunkMetaData chunk : chunks) {
+          if (chunk.isEncrypted()) {
+            throw new IOException("Column " + chunk.getPath().toDotString() + 
" is encrypted");
+          }
+          ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+          copyValues(descriptor, writeFromBlock);
+        }
+        rowsToWrite -= writeFromBlock;
+        writtenFromBlock += writeFromBlock;
+        if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+          rowGroupIdxOut++;
+          if (rowGroupIdxOut == blocks.size()) {
+            inputFiles.poll();
+            rowGroupIdxOut = 0;
+          }
+          writtenFromBlock = 0;
+          // this is called after all rows are processed
+          initReaders();
+        }
+      }
+      flushWriters();
+    }
+
+    private void flushWriters() throws IOException {
+      cStores.values().forEach(cStore -> {
+        cStore.flush();
+        cStore.close();
+      });
+      cWriters.values().forEach(ColumnWriter::close);
+      for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+        if (cPageStores.containsKey(descriptor))
+          cPageStores.get(descriptor).flushToFileWriter(writer);

Review Comment:
   > we may have some issues here since the column flush order does not match 
the schema order
   
   I didn't know that this might cause problems. I though that schema column 
order and actual physical order are decoupled, I made my conclusion based on 
[Parquet File Metadata 
Schema](https://github.com/apache/parquet-format#metadata) as each column chunk 
stores its metadata internally: column path and file offset, so it seems there 
is no need for them to be coupled. Also that is weird that my new test didn't 
fail as I actually overwrite a column `FloatFraction` which is in the middle of 
the left schema, and as you can see from `processBlocksFromReader` method right 
side is always written at the end after all the left side is written. I need to 
investigate more.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {
+        rowGroupIdxIn = rowGroupIdx;
+        flushWriters();
+        initWriters();
+      }
+      while (rowsToWrite > 0) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        BlockMetaData block = blocks.get(rowGroupIdxOut);
+        List<ColumnChunkMetaData> chunks = block.getColumns();
+        long leftInBlock = block.getRowCount() - writtenFromBlock;
+        long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+        for (ColumnChunkMetaData chunk : chunks) {
+          if (chunk.isEncrypted()) {
+            throw new IOException("Column " + chunk.getPath().toDotString() + 
" is encrypted");
+          }
+          ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+          copyValues(descriptor, writeFromBlock);
+        }
+        rowsToWrite -= writeFromBlock;
+        writtenFromBlock += writeFromBlock;
+        if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+          rowGroupIdxOut++;
+          if (rowGroupIdxOut == blocks.size()) {
+            inputFiles.poll();
+            rowGroupIdxOut = 0;

Review Comment:
   You mean `rowGroupIdxOut++`? Yes, it is required. I can rewrite the code 
like this if it is easier to read:
   ```
           if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
             if (rowGroupIdxR == blocks.size()) {
               inputFiles.poll();
               rowGroupIdxR = 0;
             } else {
               rowGroupIdxR++;
             }
             ...
           }
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {

Review Comment:
   We can but right now that is not supposed to happen and I believe it would 
brake the right side tests immediately and a problem become visible through 
that. I can add something like that if you want me to:
   ```
         if (rowGroupIdxL > rowGroupIdx) {
           throw new IOException("A row group index decrease is determined in 
RightColumnWriter! Current index: " 
               + rowGroupIdxL + ", new index: " + rowGroupIdx);
         }
   ```
   Let me know



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to