ConeyLiu commented on code in PR #1273:
URL: https://github.com/apache/parquet-mr/pull/1273#discussion_r1558998941


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -115,19 +121,55 @@ public class ParquetRewriter implements Closeable {
   private ParquetMetadata meta = null;
   // created_by information of current reader being processed
   private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    List<Queue<TransParquetFileReader>> inputFilesR = 
options.getParquetInputFilesR().stream()
+        .map(x -> getFileReaders(x, conf))
+        .collect(Collectors.toList());
+    ensureSameSchema(inputFiles);
+    inputFilesR.forEach(this::ensureSameSchema);
     LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
 
-    // Init reader of the first input file
-    initNextReader();
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    Stream.concat(inputFiles.stream(), 
inputFilesR.stream().flatMap(Collection::stream))
+        .forEach(x -> 
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    MessageType schemaL = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    List<MessageType> schemaR = inputFilesR.stream()
+        .map(x -> x.peek().getFooter().getFileMetaData().getSchema())
+        .collect(Collectors.toList());
+    Map<String, Type> fieldNamesL = new LinkedHashMap<>();
+    schemaL.getFields().forEach(x -> fieldNamesL.put(x.getName(), x));
+    Map<String, Type> fieldNamesR = new LinkedHashMap<>();
+    schemaR.stream().flatMap(x -> x.getFields().stream()).forEach(x -> {
+      if (fieldNamesR.containsKey(x.getName())) {
+        throw new IllegalArgumentException(
+            "Found a duplicated field `" + x.getName() + "` in the right side 
file groups!");
+      }
+      fieldNamesR.put(x.getName(), x);
+    });
+    List<Type> fields = Stream.concat(
+            fieldNamesL.values().stream()
+                .map(x -> fieldNamesR.getOrDefault(
+                    x.getName(), x)), // take a field on the right if we can
+            fieldNamesR.values().stream()
+                .filter(x -> !fieldNamesL.containsKey(
+                    x.getName())) // takes fields on the right if it was not 
present on the left
+            )
+        .collect(Collectors.toList());
+    schema = new MessageType(schemaL.getName(), fields);

Review Comment:
   The schema does not match the actual column orders if there is any 
overwrite. We need to add ut to verify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to