MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1560393432


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +117,55 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = options.getParquetInputFilesR().stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        Stream.concat(inputFiles.stream(), inputFilesR.stream().flatMap(Collection::stream))
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    Stream.concat(inputFiles.stream(), inputFilesR.stream().flatMap(Collection::stream))
+        .forEach(x -> extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    MessageType schemaL = inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    List<MessageType> schemaR = inputFilesR.stream()
+        .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+        .collect(Collectors.toList());
+    Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+    schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+    Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+    schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+      if (fieldNamesR.containsKey(x.getName())) {
+        throw new IllegalArgumentException(
+            "Found a duplicated field `" + x.getName() + "` in the right side file groups!");
+      }
+      fieldNamesR.put(x.getName(), x);
+    });
+    List<Type> fields = Stream.concat(
+            fieldNamesL.values().stream()
+                .map(x -> fieldNamesR.getOrDefault(

Review Comment:
   > resolve strategy for duplicate
   
   Can you please clarify what you mean by that? Do you mean that if the schema
   of the column on the left is "compatible" with the new column schema on the
   right side, we try to "merge" the columns?
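
   To make the question concrete, here is a minimal sketch of how I read the
   two options. Everything in it is hypothetical: `DuplicateResolveSketch`,
   `Strategy` and `resolve` are illustrative names that do not exist in
   `ParquetRewriter` or `RewriteOptions`, plain strings stand in for
   `org.apache.parquet.schema.Type`, and "compatible" is approximated as
   "same type name", which is only an assumption on my side.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class DuplicateResolveSketch {
     // Hypothetical strategies, not part of the parquet-mr API.
     public enum Strategy { FAIL_ON_DUPLICATE, OVERWRITE_IF_COMPATIBLE }

     // Maps column name -> type name; strings stand in for Parquet Type objects.
     public static Map<String, String> resolve(
         Map<String, String> left, Map<String, String> right, Strategy strategy) {
       // Start from the left-side columns, keeping their order.
       Map<String, String> out = new LinkedHashMap<>(left);
       for (Map.Entry<String, String> r : right.entrySet()) {
         String leftType = left.get(r.getKey());
         if (leftType != null) {
           if (strategy == Strategy.FAIL_ON_DUPLICATE) {
             throw new IllegalArgumentException("Duplicated field `" + r.getKey() + "`");
           }
           // Assumption: "compatible" means the type names are equal.
           if (!leftType.equals(r.getValue())) {
             throw new IllegalArgumentException("Incompatible types for field `" + r.getKey() + "`");
           }
         }
         // New columns are appended; a compatible duplicate is taken from the right side.
         out.put(r.getKey(), r.getValue());
       }
       return out;
     }
   }
   ```

   With left columns `id`, `name` and right columns `name`, `age`, the second
   strategy would keep the order `id`, `name`, `age` and take `name` from the
   right side, while the first one would reject the input. Is that the kind of
   behavior you have in mind?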



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

