wgtmac commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1524117636


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR()
+        .stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);

Review Comment:
   So this imposes strict schema check here. Is it possible to loose the 
restriction to allow the files on the right side to have different schemas but 
in total all columns are available? For example: a group of right side files 
with schema below:
   - `a:bigint,b:string`, `c:double,d:binary`
   - `a:bigint`, `b:string`, `c:double`, `d:binary`
   - `a:bigint,b:string,c:double,d:binary`
   
   I know this would be complex, just asking and not sure if it is an valid 
real world use case.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +160,31 @@ public ParquetRewriter(RewriteOptions options) throws 
IOException {
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
+          LOG.warn("Input column name {} doesn't show up in the schema", col);
         }
       }
 
       Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
       schema = pruneColumnsInSchema(schema, prunePaths);
     }
 
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    if (inputFilesR.isEmpty()) { // TODO: find a more suitable solution
+      this.descriptorsMap =
+          schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    } else {
+      this.descriptorsMap = schemaL.getColumns().stream()

Review Comment:
   OK, now I can answer my own question, the complex of loose schema 
enforcement on the right side may complicate `descriptorsMap` here.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -813,7 +841,7 @@ private Type extractField(GroupType candidate, Type 
targetField) {
       } else {
         Type tempField = extractField(field.asGroupType(), targetField);
         if (tempField != null) {
-          return tempField;
+          return new GroupType(candidate.getRepetition(), candidate.getName(), 
tempField);

Review Comment:
   Is it an individual bug fix?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -39,11 +36,20 @@
 
 /**
  * A set of options to create a ParquetRewriter.
+ *
+ * TODO find a place where to put a proper description of functionality as it 
is not trivial:
+ * ParquetRewriter allows to stitch files with the same schema into a single 
file.
+ * Note that ParquetRewriter also can be used for effectively stitch/joining 
multiple parquet files with
+ * different schemas.
+ * You can provide the main input file group and multiple right side ones. 
That is possible when:
+ * 1) the number of rows in the main and extra input groups are the same,
+ * 2) the ordering of rows in the main and extra input groups is the same.
  */
 public class RewriteOptions {
 
   private final ParquetConfiguration conf;
   private final List<InputFile> inputFiles;
+  private final List<List<InputFile>> inputFilesR;

Review Comment:
   Could we consolidate the internal member variables `inputFiles` and 
`inputFilesR`. My rough idea is to model these files into `HorizontalInputFile` 
and `VerticalInputFile`.
   - VerticalInputFile: An `InputFile` of a single parquet file which may have 
partial or full schema.
   - HorizontalInputFile: one or more `VerticalInputFile`s with same number 
rows but different schemas to form a logical wide file. It can choose one 
`VerticalInputFile` to be the left side and the others to be the right side in 
your join algorithm. If it contains only one `VerticalInputFile`, this is what 
we have today.
   
   WDYT?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -18,10 +18,7 @@
  */
 package org.apache.parquet.hadoop.rewrite;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;

Review Comment:
   Please avoid wildcard import.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR()
+        .stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse("")
+    );
+    Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+        .forEach(x -> 
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    // TODO check that schema on the left and on the right is not identical

Review Comment:
   nit: the schema check now is complex so it worths a separate method.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -110,6 +118,27 @@ public List<Path> getInputFiles() {
         .collect(Collectors.toList());
   }
 
+  /** TODO fix documentation after addition of inputFilesR
+   * Gets the right input {@link Path}s for the rewrite if they exist for all 
input files,
+   * otherwise throws a {@link RuntimeException}.
+   *
+   * @return a {@link List} of the associated right input {@link Path}s
+   */
+  public List<List<Path>> getInputFilesR() {

Review Comment:
   I'd suggest removing this method as it exposes hadoop dependency. We might 
need to stick to `InputFile` as much as possible.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +108,49 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR()
+        .stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(

Review Comment:
   I'm skeptical of the importance of metadata from the right side files. 
However I don't have a better suggestion here.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -904,4 +932,208 @@ public byte[] getDictPageAAD() {
       return this.dictPageAAD;
     }
   }
+
+  private void processBlocksFromReaderWithStitching(IndexCache indexCache) 
throws IOException {
+    // TODO add the test for empty files joins, it should merge schemas
+    LOG.info("Rewriting input fileLeft: {}, remaining filesLeft: {}", 
reader.getFile(), inputFiles.size());
+    int rowGroupIdx = 0;
+    List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+    for (BlockMetaData blockMetaData: blocks) {
+      writer.startBlock(blockMetaData.getRowCount());
+
+      // Writing the left side
+      indexCache.setBlockMetadata(blockMetaData);
+      List<ColumnChunkMetaData> chunksL = blockMetaData.getColumns();
+      for (ColumnChunkMetaData chunk : chunksL) {
+        if (chunk.isEncrypted()) { // TODO add that detail to docs
+          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is encrypted");
+        }
+        ColumnDescriptor descriptorL = descriptorsMap.get(chunk.getPath());
+        if (descriptorL != null) { // descriptorL might be NULL if a column is 
from the right side of a join
+          reader.setStreamPosition(chunk.getStartingPos());
+          BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+          ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+          OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
+          writer.appendColumnChunk(descriptorL, reader.getStream(), chunk, 
bloomFilter, columnIndex, offsetIndex);
+        }
+      }
+
+      // Writing the right side
+      for (RightColumnWriter writer: columnWritersR) {

Review Comment:
   Is it possible to move this into `processBlocksFromReader`?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -137,16 +160,31 @@ public ParquetRewriter(RewriteOptions options) throws 
IOException {
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
+          LOG.warn("Input column name {} doesn't show up in the schema", col);
         }
       }
 
       Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
       schema = pruneColumnsInSchema(schema, prunePaths);
     }
 
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    if (inputFilesR.isEmpty()) { // TODO: find a more suitable solution
+      this.descriptorsMap =
+          schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    } else {
+      this.descriptorsMap = schemaL.getColumns().stream()
+        .filter(x -> x.getPath().length == 0 || 
!fieldNamesR.containsKey(x.getPath()[0]))
+          .collect(Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+    }
+
+    long rowCountL = 
inputFiles.stream().mapToLong(ParquetFileReader::getRecordCount).sum();
+    inputFilesR.stream()
+        .map(x -> 
x.stream().mapToLong(ParquetFileReader::getRecordCount).sum())
+        .forEach(rowCountR -> {
+          if (rowCountL != rowCountR) {
+            throw new IllegalArgumentException("The number of records on the 
left and on the right don't match!");

Review Comment:
   nit: print the row count in the error msg.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -110,6 +118,27 @@ public List<Path> getInputFiles() {
         .collect(Collectors.toList());
   }
 
+  /** TODO fix documentation after addition of inputFilesR
+   * Gets the right input {@link Path}s for the rewrite if they exist for all 
input files,
+   * otherwise throws a {@link RuntimeException}.
+   *
+   * @return a {@link List} of the associated right input {@link Path}s
+   */
+  public List<List<Path>> getInputFilesR() {

Review Comment:
   The getInputFiles() method above is not used anywhere. Probably we need to 
remove that as well. @amousavigourabi



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to