wgtmac commented on code in PR #1335:
URL: https://github.com/apache/parquet-java/pull/1335#discussion_r1706401936


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -358,6 +466,9 @@ public Builder indexCacheStrategy(IndexCache.CacheStrategy 
cacheStrategy) {
     public RewriteOptions build() {
       Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"Input file is required");
       Preconditions.checkArgument(outputFile != null, "Output file is 
required");
+      Preconditions.checkArgument(
+          inputFilesToJoin == null || !inputFiles.isEmpty(),

Review Comment:
   ```suggestion
             inputFilesToJoin == null || !inputFilesToJoin.isEmpty(),
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
   private int numBlocksRewritten = 0;
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
-  // Schema of input files (should be the same) and to write to the output file
-  private MessageType schema = null;
-  private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
-  // The reader for the current input file
-  private TransParquetFileReader reader = null;
-  // The metadata of current reader being processed
-  private ParquetMetadata meta = null;
-  // created_by information of current reader being processed
-  private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
+  private final Queue<TransParquetFileReader> inputFilesToJoin = new 
LinkedList<>();
+  private MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean overwriteInputWithJoinColumns;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
+    this.newCodecName = options.getNewCodecName();
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
+    this.overwriteInputWithJoinColumns = 
options.getOverwriteInputWithJoinColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
-    LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
-
-    // Init reader of the first input file
-    initNextReader();
-
-    newCodecName = options.getNewCodecName();
-    List<String> pruneColumns = options.getPruneColumns();
-    // Prune columns if specified
-    if (pruneColumns != null && !pruneColumns.isEmpty()) {
-      List<String> paths = new ArrayList<>();
-      getPaths(schema, paths, null);
-      for (String col : pruneColumns) {
-        if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
-        }
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), 
conf));
+    ensureSameSchema(inputFiles);
+    ensureSameSchema(inputFilesToJoin);
+    LOG.info(
+        "Start rewriting {} input file(s) {} to {}",
+        inputFiles.size() + inputFilesToJoin.size(),
+        Stream.concat(options.getParquetInputFiles().stream(), 
options.getParquetInputFilesToJoin().stream())
+            .collect(Collectors.toList()),
+        out);
+
+    this.outSchema = getSchema();
+    this.outSchema = pruneColumnsInSchema(outSchema, 
options.getPruneColumns());
+
+    List<TransParquetFileReader> allFiles;

Review Comment:
   What about adding a separate `mergeExtraMetaData()` method?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerJoin = inputFilesToJoin.peek();

Review Comment:
   The same renaming applies to the variables below (e.g. `indexCacheJoin`, `blockIdxJoin`).



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerJoin = inputFilesToJoin.peek();
+    IndexCache indexCacheJoin = null;
+    int blockIdxJoin = -1;
+    List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+    while (!inputFiles.isEmpty()) {
+      TransParquetFileReader reader = inputFiles.poll();
+      LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
+      ParquetMetadata meta = reader.getFooter();
+      Set<ColumnPath> columnPaths = 
meta.getFileMetaData().getSchema().getColumns().stream()
+          .map(x -> ColumnPath.get(x.getPath()))
+          .collect(Collectors.toSet());
+      IndexCache indexCache = IndexCache.create(reader, columnPaths, 
indexCacheStrategy, true);
+
+      for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+        BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+        writer.startBlock(blockMetaData.getRowCount());
+        indexCache.setBlockMetadata(blockMetaData);
+        Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+            blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> 
x.getPath(), x -> x));
+
+        if (readerJoin != null
+            && (blockIdxJoin == -1
+                || ++blockIdxJoin
+                    == readerJoin.getFooter().getBlocks().size())) {
+          blockIdxJoin = 0;
+          readerJoin = inputFilesToJoin.poll();
+          Set<ColumnPath> columnPathsJoin = 
readerJoin.getFileMetaData().getSchema().getColumns().stream()
+              .map(x -> ColumnPath.get(x.getPath()))
+              .collect(Collectors.toSet());
+          if (indexCacheJoin != null) {
+            indexCacheJoin.clean();
+          }
+          indexCacheJoin = IndexCache.create(readerJoin, columnPathsJoin, 
indexCacheStrategy, true);
+          indexCacheJoin.setBlockMetadata(
+              readerJoin.getFooter().getBlocks().get(blockIdxJoin));
+        } else {
+          blockIdxJoin++;
         }
 
-        reader.setStreamPosition(chunk.getStartingPos());
-        CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
-        boolean encryptColumn =
-            encryptMode && encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
-
-        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
-          // Mask column and compress it again.
-          MaskMode maskMode = maskColumns.get(chunk.getPath());
-          if (maskMode.equals(MaskMode.NULLIFY)) {
-            Type.Repetition repetition =
-                descriptor.getPrimitiveType().getRepetition();
-            if (repetition.equals(Type.Repetition.REQUIRED)) {
-              throw new IOException("Required column ["
-                  + descriptor.getPrimitiveType().getName() + "] cannot be 
nullified");
+        for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); 
outColumnIdx++) {
+          ColumnPath colPath =
+              ColumnPath.get(outColumns.get(outColumnIdx).getPath());
+          if (readerJoin != null) {
+            Optional<ColumnChunkMetaData> chunkJoin =

Review Comment:
   ```suggestion
               Optional<ColumnChunkMetaData> chunkToJoin =
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -39,38 +39,55 @@
 
 /**
  * A set of options to create a ParquetRewriter.
+ *
+ * TODO find a place where to put a proper description of functionality as it 
is not trivial:

Review Comment:
   It seems that we are missing a README.md on the rewriter usage.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -106,48 +109,63 @@ public class ParquetRewriter implements Closeable {
   private int numBlocksRewritten = 0;
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
-  // Schema of input files (should be the same) and to write to the output file
-  private MessageType schema = null;
-  private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
-  // The reader for the current input file
-  private TransParquetFileReader reader = null;
-  // The metadata of current reader being processed
-  private ParquetMetadata meta = null;
-  // created_by information of current reader being processed
-  private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
+  private final Queue<TransParquetFileReader> inputFilesToJoin = new 
LinkedList<>();
+  private MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean overwriteInputWithJoinColumns;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
+    this.newCodecName = options.getNewCodecName();
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
+    this.overwriteInputWithJoinColumns = 
options.getOverwriteInputWithJoinColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
-    LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
-
-    // Init reader of the first input file
-    initNextReader();
-
-    newCodecName = options.getNewCodecName();
-    List<String> pruneColumns = options.getPruneColumns();
-    // Prune columns if specified
-    if (pruneColumns != null && !pruneColumns.isEmpty()) {
-      List<String> paths = new ArrayList<>();
-      getPaths(schema, paths, null);
-      for (String col : pruneColumns) {
-        if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
-        }
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), 
conf));
+    ensureSameSchema(inputFiles);
+    ensureSameSchema(inputFilesToJoin);
+    LOG.info(
+        "Start rewriting {} input file(s) {} to {}",
+        inputFiles.size() + inputFilesToJoin.size(),
+        Stream.concat(options.getParquetInputFiles().stream(), 
options.getParquetInputFilesToJoin().stream())
+            .collect(Collectors.toList()),
+        out);
+
+    this.outSchema = getSchema();
+    this.outSchema = pruneColumnsInSchema(outSchema, 
options.getPruneColumns());
+
+    List<TransParquetFileReader> allFiles;
+    if (options.getIgnoreJoinFilesMetadata()) {
+      allFiles = new ArrayList<>(inputFiles);
+    } else {
+      allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
+          .collect(Collectors.toList());
+    }
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        allFiles.stream()
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    allFiles.forEach(x -> 
extraMetaData.putAll(x.getFileMetaData().getKeyValueMetaData()));
+
+    if (!inputFilesToJoin.isEmpty()) {

Review Comment:
   What about adding a `checkRowCountToJoin()` method?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerJoin = inputFilesToJoin.peek();

Review Comment:
   ```suggestion
       TransParquetFileReader readerToJoin = inputFilesToJoin.peek();
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -176,22 +192,40 @@ public ParquetRewriter(RewriteOptions options) throws 
IOException {
     writer.start();
   }
 
+  private MessageType getSchema() {
+    MessageType schemaMain = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    if (inputFilesToJoin.isEmpty()) {
+      return schemaMain;
+    } else {
+      Map<String, Type> fieldNames = new LinkedHashMap<>();
+      schemaMain.getFields().forEach(x -> fieldNames.put(x.getName(), x));
+      inputFilesToJoin
+          .peek()
+          .getFooter()
+          .getFileMetaData()
+          .getSchema()
+          .getFields()
+          .forEach(x -> {
+            if (!fieldNames.containsKey(x.getName()) || 
overwriteInputWithJoinColumns) {
+              fieldNames.put(x.getName(), x);

Review Comment:
   Please add a log message to indicate when `overwriteInputWithJoinColumns` takes effect for a column.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java:
##########
@@ -325,6 +388,18 @@ public Builder addInputFile(Path path) {
       return this;
     }
 
+    /** TODO fix documentation after addition of InputFilesToJoin

Review Comment:
   Should we remove the TODO now?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -267,100 +282,157 @@ public void close() throws IOException {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerJoin = inputFilesToJoin.peek();
+    IndexCache indexCacheJoin = null;
+    int blockIdxJoin = -1;
+    List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+    while (!inputFiles.isEmpty()) {
+      TransParquetFileReader reader = inputFiles.poll();
+      LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
+      ParquetMetadata meta = reader.getFooter();
+      Set<ColumnPath> columnPaths = 
meta.getFileMetaData().getSchema().getColumns().stream()
+          .map(x -> ColumnPath.get(x.getPath()))
+          .collect(Collectors.toSet());
+      IndexCache indexCache = IndexCache.create(reader, columnPaths, 
indexCacheStrategy, true);
+
+      for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+        BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+        writer.startBlock(blockMetaData.getRowCount());
+        indexCache.setBlockMetadata(blockMetaData);
+        Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+            blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> 
x.getPath(), x -> x));
+
+        if (readerJoin != null
+            && (blockIdxJoin == -1
+                || ++blockIdxJoin
+                    == readerJoin.getFooter().getBlocks().size())) {
+          blockIdxJoin = 0;
+          readerJoin = inputFilesToJoin.poll();

Review Comment:
   It seems that `close()` is never called on the previous reader before the next one is polled from the queue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to