MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1536910428


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -119,6 +130,15 @@ public List<InputFile> getParquetInputFiles() {
     return inputFiles;
   }
 
+  /** TODO fix documentation after addition of inputFilesR
+   * Gets the right {@link InputFile}s for the rewrite.
+   *
+   * @return a {@link List} of the associated right {@link InputFile}s
+   */
+  public List<List<InputFile>> getParquetInputFilesR() {

Review Comment:
   Any preference for the name to use instead of the `R` suffix?
   - InputFilesRight
   - InputFilesExtra
   - InputFilesToJoin
   - InputFilesStitched



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {
+        rowGroupIdxIn = rowGroupIdx;
+        flushWriters();
+        initWriters();
+      }
+      while (rowsToWrite > 0) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        BlockMetaData block = blocks.get(rowGroupIdxOut);
+        List<ColumnChunkMetaData> chunks = block.getColumns();
+        long leftInBlock = block.getRowCount() - writtenFromBlock;
+        long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+        for (ColumnChunkMetaData chunk : chunks) {
+          if (chunk.isEncrypted()) {
+            throw new IOException("Column " + chunk.getPath().toDotString() + 
" is encrypted");
+          }
+          ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+          copyValues(descriptor, writeFromBlock);
+        }
+        rowsToWrite -= writeFromBlock;
+        writtenFromBlock += writeFromBlock;
+        if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+          rowGroupIdxOut++;
+          if (rowGroupIdxOut == blocks.size()) {
+            inputFiles.poll();
+            rowGroupIdxOut = 0;
+          }
+          writtenFromBlock = 0;
+          // this is called after all rows are processed
+          initReaders();
+        }
+      }
+      flushWriters();
+    }
+
+    private void flushWriters() throws IOException {
+      cStores.values().forEach(cStore -> {
+        cStore.flush();
+        cStore.close();
+      });
+      cWriters.values().forEach(ColumnWriter::close);
+      for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+        if (cPageStores.containsKey(descriptor))
+          cPageStores.get(descriptor).flushToFileWriter(writer);
+      }
+      cStores.clear();
+      cWriters.clear();
+      cPageStores.clear();
+    }
+
+    private void initWriters() {
+      if (!inputFiles.isEmpty()) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        descriptorsMap.forEach((columnPath, descriptor) -> {
+          ColumnChunkMetaData chunk = 
blocks.get(rowGroupIdxOut).getColumns().stream()
+              .filter(x -> x.getPath() == columnPath)
+              .findFirst()
+              .orElseThrow(() -> new IllegalStateException(
+                  "Could not find column [" + columnPath.toDotString() + 
"]."));
+          int bloomFilterLength = chunk.getBloomFilterLength();
+          ParquetProperties.WriterVersion writerVersion =
+              chunk.getEncodingStats().usesV2Pages()
+                  ? ParquetProperties.WriterVersion.PARQUET_2_0
+                  : ParquetProperties.WriterVersion.PARQUET_1_0;
+          ParquetProperties props = ParquetProperties.builder()
+              .withWriterVersion(writerVersion)
+              .withBloomFilterEnabled(bloomFilterLength > 0)
+              .build();
+          CodecFactory codecFactory = new CodecFactory(new Configuration(), 
props.getPageSizeThreshold());
+          CompressionCodecFactory.BytesInputCompressor compressor =
+              codecFactory.getCompressor(chunk.getCodec());

Review Comment:
   Originally I planned to skip support for pruning/masking/codec changing/encryption on the right side; you can find the checks for that in the [RewriteOptions builder](https://github.com/MaxNevermind/parquet-mr/blob/883e93586589f2027e9187d2b4eeda8c4e8e2b89/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java#L409).
   My reasoning was that the Parquet joiner is for a very niche use case, primarily stitching large datasets together to save resources. Let me know if you want me to implement those currently unsupported extra features.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to