MaxNevermind commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1564278524


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +121,55 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR().stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+        .forEach(x -> 
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    MessageType schemaL = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    List<MessageType> schemaR = inputFilesR.stream()
+        .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+        .collect(Collectors.toList());
+    Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+    schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+    Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+    schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+      if (fieldNamesR.containsKey(x.getName())) {
+        throw new IllegalArgumentException(
+            "Found a duplicated field `" + x.getName() + "` in the right side 
file groups!");
+      }
+      fieldNamesR.put(x.getName(), x);
+    });
+    List<Type> fields = Stream.concat(
+            fieldNamesL.values().stream()
+                .map(x -> fieldNamesR.getOrDefault(
+                    x.getName(), x)), // take a field on the right if we can
+            fieldNamesR.values().stream()
+                .filter(x -> !fieldNamesL.containsKey(
+                    x.getName())) // takes fields on the right if it was not 
present on the left
+            )
+        .collect(Collectors.toList());
+    schema = new MessageType(schemaL.getName(), fields);

Review Comment:
   @wgtmac you also mentioned that same problem [in this 
comment](https://github.com/apache/parquet-mr/pull/1273/files#r1547172865).
   I've just spend some time on trying to refactor this part. I tried to align 
schema column order and actual write order of columns and it is mostly doable 
but RightColumnWriter state becomes too complex, for example file queue need to 
be present for each individual column on the right. I think the code will 
become to difficult to support long term if we do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to