wgtmac commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1547152074


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -119,6 +130,15 @@ public List<InputFile> getParquetInputFiles() {
     return inputFiles;
   }
 
+  /** TODO fix documentation after addition of inputFilesR
+   * Gets the right {@link InputFile}s for the rewrite.
+   *
+   * @return a {@link List} of the associated right {@link InputFile}s
+   */
+  public List<List<InputFile>> getParquetInputFilesR() {

Review Comment:
   What about `InputFilesToJoinColumns` or `InputFilesToAddColumns`. This is 
not a strong opinion. Just want to clarify what to do with these input files.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +117,55 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR().stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+        .forEach(x -> 
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    MessageType schemaL = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    List<MessageType> schemaR = inputFilesR.stream()
+        .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+        .collect(Collectors.toList());
+    Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+    schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+    Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+    schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+      if (fieldNamesR.containsKey(x.getName())) {
+        throw new IllegalArgumentException(
+            "Found a duplicated field `" + x.getName() + "` in the right side 
file groups!");
+      }
+      fieldNamesR.put(x.getName(), x);
+    });
+    List<Type> fields = Stream.concat(
+            fieldNamesL.values().stream()
+                .map(x -> fieldNamesR.getOrDefault(

Review Comment:
   This implicit behavior may introduce unexpected use. Does it sound better to 
add a resolve strategy for duplicate column names? By default we can pick it up 
from the right side.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {
+        rowGroupIdxIn = rowGroupIdx;
+        flushWriters();
+        initWriters();
+      }
+      while (rowsToWrite > 0) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        BlockMetaData block = blocks.get(rowGroupIdxOut);
+        List<ColumnChunkMetaData> chunks = block.getColumns();
+        long leftInBlock = block.getRowCount() - writtenFromBlock;
+        long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+        for (ColumnChunkMetaData chunk : chunks) {
+          if (chunk.isEncrypted()) {
+            throw new IOException("Column " + chunk.getPath().toDotString() + 
" is encrypted");
+          }
+          ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+          copyValues(descriptor, writeFromBlock);
+        }
+        rowsToWrite -= writeFromBlock;
+        writtenFromBlock += writeFromBlock;
+        if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+          rowGroupIdxOut++;
+          if (rowGroupIdxOut == blocks.size()) {
+            inputFiles.poll();
+            rowGroupIdxOut = 0;
+          }
+          writtenFromBlock = 0;
+          // this is called after all rows are processed
+          initReaders();
+        }
+      }
+      flushWriters();
+    }
+
+    private void flushWriters() throws IOException {
+      cStores.values().forEach(cStore -> {
+        cStore.flush();
+        cStore.close();
+      });
+      cWriters.values().forEach(ColumnWriter::close);
+      for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+        if (cPageStores.containsKey(descriptor))
+          cPageStores.get(descriptor).flushToFileWriter(writer);
+      }
+      cStores.clear();
+      cWriters.clear();
+      cPageStores.clear();
+    }
+
+    private void initWriters() {
+      if (!inputFiles.isEmpty()) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        descriptorsMap.forEach((columnPath, descriptor) -> {
+          ColumnChunkMetaData chunk = 
blocks.get(rowGroupIdxOut).getColumns().stream()
+              .filter(x -> x.getPath() == columnPath)
+              .findFirst()
+              .orElseThrow(() -> new IllegalStateException(
+                  "Could not find column [" + columnPath.toDotString() + 
"]."));
+          int bloomFilterLength = chunk.getBloomFilterLength();
+          ParquetProperties.WriterVersion writerVersion =
+              chunk.getEncodingStats().usesV2Pages()
+                  ? ParquetProperties.WriterVersion.PARQUET_2_0
+                  : ParquetProperties.WriterVersion.PARQUET_1_0;
+          ParquetProperties props = ParquetProperties.builder()
+              .withWriterVersion(writerVersion)
+              .withBloomFilterEnabled(bloomFilterLength > 0)
+              .build();
+          CodecFactory codecFactory = new CodecFactory(new Configuration(), 
props.getPageSizeThreshold());
+          CompressionCodecFactory.BytesInputCompressor compressor =
+              codecFactory.getCompressor(chunk.getCodec());

Review Comment:
   That makes sense. I didn't mean we have to implement everything.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +175,34 @@ public ParquetRewriter(RewriteOptions options) throws 
IOException {
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
+          LOG.warn("Input column name {} doesn't show up in the schema", col);
         }
       }
 
       Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
       schema = pruneColumnsInSchema(schema, prunePaths);
     }
 
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    if (inputFilesR.isEmpty()) {
+      this.descriptorsMap =
+          schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    } else { // TODO: describe in documentation that only top level column can 
be overwritten
+      this.descriptorsMap = schemaL.getColumns().stream()
+          .filter(x -> x.getPath().length == 0 || 
!fieldNamesR.containsKey(x.getPath()[0]))

Review Comment:
   IIRC, the original rewriter does not support duplicated columns. However, 
supporting it while joining files is a good idea. Just as my previous comment, 
it would be better to support explicit resolve strategy for them if that is not 
too complicated.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +953,171 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private static class RightColumnWriter {
+    private final Queue<TransParquetFileReader> inputFiles;
+    private final ParquetRewriter parquetRewriter;
+    private final ParquetFileWriter writer;
+    private final MessageType schema;
+    private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
+    private final Map<ColumnDescriptor, ColumnReader> colReaders = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnChunkPageWriteStore> cPageStores 
= new HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriteStore> cStores = new 
HashMap<>();
+    private final Map<ColumnDescriptor, ColumnWriter> cWriters = new 
HashMap<>();
+    private int rowGroupIdxIn = 0;
+    private int rowGroupIdxOut = 0;
+    private int writtenFromBlock = 0;
+
+    public RightColumnWriter(Queue<TransParquetFileReader> inputFiles, 
ParquetRewriter parquetRewriter)
+        throws IOException {
+      this.inputFiles = inputFiles;
+      this.parquetRewriter = parquetRewriter;
+      this.writer = parquetRewriter.writer;
+      this.schema = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+      this.descriptorsMap = this.schema.getColumns().stream()
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      initReaders();
+      initWriters();
+    }
+
+    public void writeRows(int rowGroupIdx, long rowsToWrite) throws 
IOException {
+      if (rowGroupIdxIn != rowGroupIdx) {
+        rowGroupIdxIn = rowGroupIdx;
+        flushWriters();
+        initWriters();
+      }
+      while (rowsToWrite > 0) {
+        List<BlockMetaData> blocks = inputFiles.peek().getFooter().getBlocks();
+        BlockMetaData block = blocks.get(rowGroupIdxOut);
+        List<ColumnChunkMetaData> chunks = block.getColumns();
+        long leftInBlock = block.getRowCount() - writtenFromBlock;
+        long writeFromBlock = Math.min(rowsToWrite, leftInBlock);
+        for (ColumnChunkMetaData chunk : chunks) {
+          if (chunk.isEncrypted()) {
+            throw new IOException("Column " + chunk.getPath().toDotString() + 
" is encrypted");
+          }
+          ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+          copyValues(descriptor, writeFromBlock);
+        }
+        rowsToWrite -= writeFromBlock;
+        writtenFromBlock += writeFromBlock;
+        if (rowsToWrite > 0 || (block.getRowCount() == writtenFromBlock)) {
+          rowGroupIdxOut++;
+          if (rowGroupIdxOut == blocks.size()) {
+            inputFiles.poll();
+            rowGroupIdxOut = 0;
+          }
+          writtenFromBlock = 0;
+          // this is called after all rows are processed
+          initReaders();
+        }
+      }
+      flushWriters();
+    }
+
+    private void flushWriters() throws IOException {
+      cStores.values().forEach(cStore -> {
+        cStore.flush();
+        cStore.close();
+      });
+      cWriters.values().forEach(ColumnWriter::close);
+      for (ColumnDescriptor descriptor : descriptorsMap.values()) {
+        if (cPageStores.containsKey(descriptor))
+          cPageStores.get(descriptor).flushToFileWriter(writer);

Review Comment:
   Yes, the specs do not require the column chunk order should be coupled to 
the schema order. However, I'm not sure if this will break some parquet 
implementations on the wild.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR()
+        .stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(

Review Comment:
   I'm a little concerned about the possible explosion of the metadata. Should 
we add an option to ignore those on the right side? Or at least add a comment 
to raise the potential issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to