ConeyLiu commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1713562763


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
   private int numBlocksRewritten = 0;
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
-  // Schema of input files (should be the same) and to write to the output file
-  private MessageType schema = null;
-  private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
-  // The reader for the current input file
-  private TransParquetFileReader reader = null;
-  // The metadata of current reader being processed
-  private ParquetMetadata meta = null;
-  // created_by information of current reader being processed
-  private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
+  private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
+  private MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean overwriteInputWithJoinColumns;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
+    this.newCodecName = options.getNewCodecName();
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
+    this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
-    LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), options.getParquetInputFiles(), out);
-
-    // Init reader of the first input file
-    initNextReader();
-
-    newCodecName = options.getNewCodecName();
-    List<String> pruneColumns = options.getPruneColumns();
-    // Prune columns if specified
-    if (pruneColumns != null && !pruneColumns.isEmpty()) {
-      List<String> paths = new ArrayList<>();
-      getPaths(schema, paths, null);
-      for (String col : pruneColumns) {
-        if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, reader.getFile());
-        }
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), conf));
+    ensureSameSchema(inputFiles);
+    ensureSameSchema(inputFilesToJoin);

Review Comment:
   I thought this would work as follows: `file_1<column a, column b>` is joined with `file_2<column c>` and `file_3<column d>` to produce `file_joined<column a, column b, column c, column d>`.
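
The reading above can be sketched with plain Java collections instead of parquet-java types. This is only an illustration of the expected column layout: the class `JoinSchemaSketch` and the method `joinColumns` are hypothetical names, not part of `ParquetRewriter`, and the sketch assumes the joined files have no overlapping column names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinSchemaSketch {
  // Hypothetical helper: appends the columns of each file to join after the
  // columns of the main input file. Overlapping names are not handled here.
  static List<String> joinColumns(List<String> mainColumns, List<List<String>> columnsToJoin) {
    List<String> joined = new ArrayList<>(mainColumns);
    for (List<String> columns : columnsToJoin) {
      joined.addAll(columns);
    }
    return joined;
  }

  public static void main(String[] args) {
    List<String> file1 = Arrays.asList("a", "b");
    List<List<String>> toJoin = Arrays.asList(Arrays.asList("c"), Arrays.asList("d"));
    // prints [a, b, c, d]
    System.out.println(joinColumns(file1, toJoin));
  }
}
```

Note that under this reading `file_2` and `file_3` carry different columns, so a same-schema check across the files to join would not hold for them.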



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

