This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a4e7e62 PARQUET-2430: Add parquet joiner v2 (#1335)
08a4e7e62 is described below

commit 08a4e7e6279f8c3b8558bd294fee4489a96d0db1
Author: Max Konstantinov <[email protected]>
AuthorDate: Thu Sep 19 08:30:08 2024 -0700

    PARQUET-2430: Add parquet joiner v2 (#1335)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 486 +++++++++++++-------
 .../parquet/hadoop/rewrite/RewriteOptions.java     | 208 ++++++++-
 .../hadoop/rewrite/ParquetRewriterTest.java        | 489 +++++++++++++++------
 3 files changed, 865 insertions(+), 318 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 092568ab1..9e106fc3c 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -29,12 +29,15 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
@@ -87,6 +90,40 @@ import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Rewrites multiple input files into a single output file.
+ * <p>
+ * Supported functionality:
+ * <ul>
+ * <li>Merging multiple files into a single one</li>
+ * <li>Applying column transformations</li>
+ * <li><i>Joining</i> with extra files with a different schema</li>
+ * </ul>
+ * <p>
+ * Note that the total number of row groups from all input files is preserved 
in the output file.
+ * This may not be optimal if row groups are very small and will not solve 
small file problems. Instead, it will
+ * make it worse to have a large file footer in the output file.
+ * <p>
+ * <h2>Merging multiple files into a single output files</h2>
+ * Use {@link RewriteOptions.Builder}'s constructor or methods to provide 
<code>inputFiles</code>.
+ * Please note the schema of all <code>inputFiles</code> must be the same, 
otherwise the rewrite will fail.
+ * <p>
+ * <h2>Applying column transformations</h2>
+ * Some supported column transformations: pruning, masking, encrypting, 
changing a codec.
+ * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full 
list with description.
+ * <p>
+ * <h2><i>Joining</i> with extra files with a different schema</h2>
+ * Use {@link RewriteOptions.Builder}'s constructor or methods to provide 
<code>inputFilesToJoin</code>.
+ * Please note the schema of all <code>inputFilesToJoin</code> must be the 
same, otherwise the rewrite will fail.
+ * Requirements for a <i>joining</i> the main <code>inputFiles</code>(left) 
and <code>inputFilesToJoin</code>(right):
+ * <ul>
+ * <li>the number of files might be different on the left and right,</li>
+ * <li>the schema of files inside of each group(left/right) must be the same, 
but those two schemas not necessarily should be equal,</li>
+ * <li>the total number of row groups must be the same on the left and 
right,</li>
+ * <li>the total number of rows must be the same on the left and right,</li>
+ * <li>the global ordering of rows must be the same on the left and right.</li>
+ * </ul>
+ */
 public class ParquetRewriter implements Closeable {
 
   // Key to store original writer version in the file key-value metadata
@@ -99,54 +136,39 @@ public class ParquetRewriter implements Closeable {
   private Map<ColumnPath, MaskMode> maskColumns = null;
   private Set<ColumnPath> encryptColumns = null;
   private boolean encryptMode = false;
-  private final Map<String, String> extraMetaData = new HashMap<>();
+  private final Map<String, String> extraMetaData;
   // Writer to rewrite the input files
   private final ParquetFileWriter writer;
   // Number of blocks written which is used to keep track of the actual row 
group ordinal
   private int numBlocksRewritten = 0;
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
-  // Schema of input files (should be the same) and to write to the output file
-  private MessageType schema = null;
-  private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
-  // The reader for the current input file
-  private TransParquetFileReader reader = null;
-  // The metadata of current reader being processed
-  private ParquetMetadata meta = null;
-  // created_by information of current reader being processed
-  private String originalCreatedBy = "";
-  // Unique created_by information from all input files
-  private final Set<String> allOriginalCreatedBys = new HashSet<>();
+  private final Queue<TransParquetFileReader> inputFilesToJoin = new 
LinkedList<>();
+  private MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean overwriteInputWithJoinColumns;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
+    this.newCodecName = options.getNewCodecName();
+    this.indexCacheStrategy = options.getIndexCacheStrategy();
+    this.overwriteInputWithJoinColumns = 
options.getOverwriteInputWithJoinColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
     OutputFile out = options.getParquetOutputFile();
-    openInputFiles(options.getParquetInputFiles(), conf);
-    LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(), 
options.getParquetInputFiles(), out);
-
-    // Init reader of the first input file
-    initNextReader();
-
-    newCodecName = options.getNewCodecName();
-    List<String> pruneColumns = options.getPruneColumns();
-    // Prune columns if specified
-    if (pruneColumns != null && !pruneColumns.isEmpty()) {
-      List<String> paths = new ArrayList<>();
-      getPaths(schema, paths, null);
-      for (String col : pruneColumns) {
-        if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
-        }
-      }
-
-      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
-      schema = pruneColumnsInSchema(schema, prunePaths);
-    }
-
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+    
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), 
conf));
+    ensureSameSchema(inputFiles);
+    ensureSameSchema(inputFilesToJoin);
+    ensureRowCount();
+    LOG.info(
+        "Start rewriting {} input file(s) {} to {}",
+        inputFiles.size() + inputFilesToJoin.size(),
+        Stream.concat(options.getParquetInputFiles().stream(), 
options.getParquetInputFilesToJoin().stream())
+            .collect(Collectors.toList()),
+        out);
+
+    this.outSchema = pruneColumnsInSchema(getSchema(), 
options.getPruneColumns());
+    this.extraMetaData = getExtraMetadata(options);
 
     if (options.getMaskColumns() != null) {
       this.maskColumns = new HashMap<>();
@@ -160,12 +182,10 @@ public class ParquetRewriter implements Closeable {
       this.encryptMode = true;
     }
 
-    this.indexCacheStrategy = options.getIndexCacheStrategy();
-
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(
         out,
-        schema,
+        outSchema,
         writerMode,
         DEFAULT_BLOCK_SIZE,
         MAX_PADDING_SIZE_DEFAULT,
@@ -176,89 +196,133 @@ public class ParquetRewriter implements Closeable {
     writer.start();
   }
 
+  // TODO: Should we mark it as deprecated to encourage the main constructor 
usage? it is also used only from
+  // deprecated classes atm
   // Ctor for legacy CompressionConverter and ColumnMasker
   public ParquetRewriter(
       TransParquetFileReader reader,
       ParquetFileWriter writer,
       ParquetMetadata meta,
-      MessageType schema,
+      MessageType outSchema,
       String originalCreatedBy,
       CompressionCodecName codecName,
       List<String> maskColumns,
       MaskMode maskMode) {
-    this.reader = reader;
     this.writer = writer;
-    this.meta = meta;
-    this.schema = schema;
-    this.descriptorsMap =
-        schema.getColumns().stream().collect(Collectors.toMap(x -> 
ColumnPath.get(x.getPath()), x -> x));
+    this.outSchema = outSchema;
     this.newCodecName = codecName;
-    originalCreatedBy = originalCreatedBy == null ? 
meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
-    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
-    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
+    extraMetaData = new 
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
+    extraMetaData.put(
+        ORIGINAL_CREATED_BY_KEY,
+        originalCreatedBy != null
+            ? originalCreatedBy
+            : meta.getFileMetaData().getCreatedBy());
     if (maskColumns != null && maskMode != null) {
       this.maskColumns = new HashMap<>();
       for (String col : maskColumns) {
         this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
       }
     }
+    this.inputFiles.add(reader);
     this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
+    this.overwriteInputWithJoinColumns = false;
   }
 
-  // Open all input files to validate their schemas are compatible to merge
-  private void openInputFiles(List<InputFile> inputFiles, ParquetConfiguration 
conf) {
-    Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"No input files");
+  private MessageType getSchema() {
+    MessageType schemaMain = 
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+    if (inputFilesToJoin.isEmpty()) {
+      return schemaMain;
+    } else {
+      Map<String, Type> fieldNames = new LinkedHashMap<>();
+      schemaMain.getFields().forEach(x -> fieldNames.put(x.getName(), x));
+      inputFilesToJoin
+          .peek()
+          .getFooter()
+          .getFileMetaData()
+          .getSchema()
+          .getFields()
+          .forEach(x -> {
+            if (!fieldNames.containsKey(x.getName())) {
+              fieldNames.put(x.getName(), x);
+            } else if (overwriteInputWithJoinColumns) {
+              LOG.info("Column {} in inputFiles is overwritten by 
inputFilesToJoin side", x.getName());
+              fieldNames.put(x.getName(), x);
+            }
+          });
+      return new MessageType(schemaMain.getName(), new 
ArrayList<>(fieldNames.values()));
+    }
+  }
 
+  private Map<String, String> getExtraMetadata(RewriteOptions options) {
+    List<TransParquetFileReader> allFiles;
+    if (options.getIgnoreJoinFilesMetadata()) {
+      allFiles = new ArrayList<>(inputFiles);
+    } else {
+      allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
+          .collect(Collectors.toList());
+    }
+    Map<String, String> result = new HashMap<>();
+    result.put(
+        ORIGINAL_CREATED_BY_KEY,
+        allFiles.stream()
+            .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+            .collect(Collectors.toSet())
+            .stream()
+            .reduce((a, b) -> a + "\n" + b)
+            .orElse(""));
+    allFiles.forEach(x -> 
result.putAll(x.getFileMetaData().getKeyValueMetaData()));
+    return result;
+  }
+
+  private void ensureRowCount() {
+    if (!inputFilesToJoin.isEmpty()) {
+      List<Long> blocksRowCountsL = inputFiles.stream()
+          .flatMap(x -> 
x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
+          .collect(Collectors.toList());
+      List<Long> blocksRowCountsR = inputFilesToJoin.stream()
+          .flatMap(x -> 
x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
+          .collect(Collectors.toList());
+      if (!blocksRowCountsL.equals(blocksRowCountsR)) {
+        throw new IllegalArgumentException(
+            "The number of rows in each block must match! Left blocks row 
counts: " + blocksRowCountsL
+                + ", right blocks row counts" + blocksRowCountsR + ".");
+      }
+    }
+  }
+
+  private Queue<TransParquetFileReader> getFileReaders(List<InputFile> 
inputFiles, ParquetConfiguration conf) {
+    LinkedList<TransParquetFileReader> inputFileReaders = new LinkedList<>();
     for (InputFile inputFile : inputFiles) {
       try {
         TransParquetFileReader reader = new TransParquetFileReader(
             inputFile, ParquetReadOptions.builder(conf).build());
-        MessageType inputFileSchema =
-            reader.getFooter().getFileMetaData().getSchema();
-        if (this.schema == null) {
-          this.schema = inputFileSchema;
-        } else {
-          // Now we enforce equality of schemas from input files for 
simplicity.
-          if (!this.schema.equals(inputFileSchema)) {
-            LOG.error(
-                "Input files have different schemas, expect: {}, input: {}, 
current file: {}",
-                this.schema,
-                inputFileSchema,
-                inputFile);
-            throw new InvalidSchemaException(
-                "Input files have different schemas, current file: " + 
inputFile);
-          }
-        }
-        this.allOriginalCreatedBys.add(
-            reader.getFooter().getFileMetaData().getCreatedBy());
-        this.inputFiles.add(reader);
+        inputFileReaders.add(reader);
       } catch (IOException e) {
         throw new IllegalArgumentException("Failed to open input file: " + 
inputFile, e);
       }
     }
-
-    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n", 
allOriginalCreatedBys));
+    return inputFileReaders;
   }
 
-  // Routines to get reader of next input file and set up relevant states
-  private void initNextReader() throws IOException {
-    if (reader != null) {
-      LOG.info("Finish rewriting input file: {}", reader.getFile());
-    }
-
-    if (inputFiles.isEmpty()) {
-      reader = null;
-      meta = null;
-      originalCreatedBy = null;
-      return;
+  private void ensureSameSchema(Queue<TransParquetFileReader> 
inputFileReaders) {
+    MessageType schema = null;
+    for (TransParquetFileReader reader : inputFileReaders) {
+      MessageType newSchema = reader.getFooter().getFileMetaData().getSchema();
+      if (schema == null) {
+        schema = newSchema;
+      } else {
+        // Now we enforce equality of schemas from input files for simplicity.
+        if (!schema.equals(newSchema)) {
+          String file = reader.getFile();
+          LOG.error(
+              "Input files have different schemas, expect: {}, input: {}, 
current file: {}",
+              schema,
+              newSchema,
+              file);
+          throw new InvalidSchemaException("Input files have different 
schemas, current file: " + file);
+        }
+      }
     }
-    if (reader != null) reader.close();
-    reader = inputFiles.poll();
-    meta = reader.getFooter();
-    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
-    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
-
-    LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
   }
 
   @Override
@@ -267,92 +331,156 @@ public class ParquetRewriter implements Closeable {
   }
 
   public void processBlocks() throws IOException {
-    while (reader != null) {
-      IndexCache indexCache = IndexCache.create(reader, 
descriptorsMap.keySet(), indexCacheStrategy, true);
-      processBlocksFromReader(indexCache);
-      indexCache.clean();
-      initNextReader();
-    }
-  }
-
-  private void processBlocksFromReader(IndexCache indexCache) throws 
IOException {
-    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
-      writer.startBlock(blockMetaData.getRowCount());
-      indexCache.setBlockMetadata(blockMetaData);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
-        // This column has been pruned.
-        if (descriptor == null) {
-          continue;
-        }
-
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
+    TransParquetFileReader readerToJoin = null;
+    IndexCache indexCacheToJoin = null;
+    int blockIdxToJoin = 0;
+    List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+    while (!inputFiles.isEmpty()) {
+      TransParquetFileReader reader = inputFiles.poll();
+      LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
+      ParquetMetadata meta = reader.getFooter();
+      Set<ColumnPath> columnPaths = 
meta.getFileMetaData().getSchema().getColumns().stream()
+          .map(x -> ColumnPath.get(x.getPath()))
+          .collect(Collectors.toSet());
+      IndexCache indexCache = IndexCache.create(reader, columnPaths, 
indexCacheStrategy, true);
+
+      for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+        BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+        writer.startBlock(blockMetaData.getRowCount());
+        indexCache.setBlockMetadata(blockMetaData);
+        Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+            blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> 
x.getPath(), x -> x));
+
+        if (!inputFilesToJoin.isEmpty()) {
+          if (readerToJoin == null
+              || ++blockIdxToJoin
+                  == readerToJoin.getFooter().getBlocks().size()) {
+            if (readerToJoin != null) readerToJoin.close();
+            blockIdxToJoin = 0;
+            readerToJoin = inputFilesToJoin.poll();
+            Set<ColumnPath> columnPathsToJoin =
+                
readerToJoin.getFileMetaData().getSchema().getColumns().stream()
+                    .map(x -> ColumnPath.get(x.getPath()))
+                    .collect(Collectors.toSet());
+            if (indexCacheToJoin != null) {
+              indexCacheToJoin.clean();
+            }
+            indexCacheToJoin = IndexCache.create(readerToJoin, 
columnPathsToJoin, indexCacheStrategy, true);
+            indexCacheToJoin.setBlockMetadata(
+                readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
+          } else {
+            blockIdxToJoin++;
+            indexCacheToJoin.setBlockMetadata(
+                readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
+          }
         }
 
-        reader.setStreamPosition(chunk.getStartingPos());
-        CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
-        boolean encryptColumn =
-            encryptMode && encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
-
-        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
-          // Mask column and compress it again.
-          MaskMode maskMode = maskColumns.get(chunk.getPath());
-          if (maskMode.equals(MaskMode.NULLIFY)) {
-            Type.Repetition repetition =
-                descriptor.getPrimitiveType().getRepetition();
-            if (repetition.equals(Type.Repetition.REQUIRED)) {
-              throw new IOException("Required column ["
-                  + descriptor.getPrimitiveType().getName() + "] cannot be 
nullified");
+        for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); 
outColumnIdx++) {
+          ColumnPath colPath =
+              ColumnPath.get(outColumns.get(outColumnIdx).getPath());
+          if (readerToJoin != null) {
+            Optional<ColumnChunkMetaData> chunkToJoin =
+                
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin).getColumns().stream()
+                    .filter(x -> x.getPath().equals(colPath))
+                    .findFirst();
+            if (chunkToJoin.isPresent()
+                && (overwriteInputWithJoinColumns || 
!columnPaths.contains(colPath))) {
+              processBlock(
+                  readerToJoin, blockIdxToJoin, outColumnIdx, 
indexCacheToJoin, chunkToJoin.get());
+            } else {
+              processBlock(reader, blockIdx, outColumnIdx, indexCache, 
pathToChunk.get(colPath));
             }
-            nullifyColumn(blockId, descriptor, chunk, writer, schema, 
newCodecName, encryptColumn);
           } else {
-            throw new UnsupportedOperationException("Only nullify is supported 
for now");
-          }
-        } else if (encryptMode || this.newCodecName != null) {
-          // Prepare encryption context
-          ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
-          if (encryptMode) {
-            columnChunkEncryptorRunTime = new ColumnChunkEncryptorRunTime(
-                writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
+            processBlock(reader, blockIdx, outColumnIdx, indexCache, 
pathToChunk.get(colPath));
           }
-
-          // Translate compression and/or encryption
-          writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
-          processChunk(
-              blockMetaData.getRowCount(),
-              chunk,
-              newCodecName,
-              columnChunkEncryptorRunTime,
-              encryptColumn,
-              indexCache.getBloomFilter(chunk),
-              indexCache.getColumnIndex(chunk),
-              indexCache.getOffsetIndex(chunk));
-          writer.endColumn();
-        } else {
-          // Nothing changed, simply copy the binary data.
-          BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
-          ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
-          OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
-          writer.appendColumnChunk(
-              descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, 
offsetIndex);
         }
 
-        columnId++;
+        writer.endBlock();
+        indexCache.clean();
+        numBlocksRewritten++;
+      }
+
+      indexCache.clean();
+      LOG.info("Finish rewriting input file: {}", reader.getFile());
+      reader.close();
+    }
+    if (readerToJoin != null) readerToJoin.close();
+  }
+
+  private void processBlock(
+      TransParquetFileReader reader,
+      int blockIdx,
+      int outColumnIdx,
+      IndexCache indexCache,
+      ColumnChunkMetaData chunk)
+      throws IOException {
+    if (chunk.isEncrypted()) {
+      throw new IOException("Column " + chunk.getPath().toDotString() + " is 
already encrypted");
+    }
+    ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);
+    BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
+    String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
+    boolean encryptColumn = encryptMode && encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
+
+    if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+      // Mask column and compress it again.
+      MaskMode maskMode = maskColumns.get(chunk.getPath());
+      if (maskMode.equals(MaskMode.NULLIFY)) {
+        Type.Repetition repetition = 
descriptor.getPrimitiveType().getRepetition();
+        if (repetition.equals(Type.Repetition.REQUIRED)) {
+          throw new IOException(
+              "Required column [" + descriptor.getPrimitiveType().getName() + 
"] cannot be nullified");
+        }
+        nullifyColumn(
+            reader,
+            blockIdx,
+            descriptor,
+            chunk,
+            writer,
+            outSchema,
+            newCodecName,
+            encryptColumn,
+            originalCreatedBy);
+      } else {
+        throw new UnsupportedOperationException("Only nullify is supported for 
now");
+      }
+    } else if (encryptMode || this.newCodecName != null) {
+      // Prepare encryption context
+      ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+      if (encryptMode) {
+        columnChunkEncryptorRunTime =
+            new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, 
numBlocksRewritten, outColumnIdx);
       }
 
-      writer.endBlock();
-      numBlocksRewritten++;
+      // Translate compression and/or encryption
+      writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
+      processChunk(
+          reader,
+          blockMetaData.getRowCount(),
+          chunk,
+          newCodecName,
+          columnChunkEncryptorRunTime,
+          encryptColumn,
+          indexCache.getBloomFilter(chunk),
+          indexCache.getColumnIndex(chunk),
+          indexCache.getOffsetIndex(chunk),
+          originalCreatedBy);
+      writer.endColumn();
+    } else {
+      // Nothing changed, simply copy the binary data.
+      BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+      ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+      OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
+      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, 
bloomFilter, columnIndex, offsetIndex);
     }
   }
 
   private void processChunk(
+      TransParquetFileReader reader,
       long blockRowCount,
       ColumnChunkMetaData chunk,
       CompressionCodecName newCodecName,
@@ -360,7 +488,8 @@ public class ParquetRewriter implements Closeable {
       boolean encryptColumn,
       BloomFilter bloomFilter,
       ColumnIndex columnIndex,
-      OffsetIndex offsetIndex)
+      OffsetIndex offsetIndex,
+      String originalCreatedBy)
       throws IOException {
     CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
     CompressionCodecFactory.BytesInputDecompressor decompressor = null;
@@ -663,11 +792,24 @@ public class ParquetRewriter implements Closeable {
     }
   }
 
-  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> 
prunePaths) {
-    List<Type> fields = schema.getFields();
-    List<String> currentPath = new ArrayList<>();
-    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, 
prunePaths);
-    return new MessageType(schema.getName(), prunedFields);
+  private MessageType pruneColumnsInSchema(MessageType schema, List<String> 
pruneColumns) {
+    if (pruneColumns == null || pruneColumns.isEmpty()) {
+      return schema;
+    } else {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);
+      for (String col : pruneColumns) {
+        if (!paths.contains(col)) {
+          LOG.warn("Input column name {} doesn't show up in the schema", col);
+        }
+      }
+      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+
+      List<Type> fields = schema.getFields();
+      List<String> currentPath = new ArrayList<>();
+      List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, 
prunePaths);
+      return new MessageType(schema.getName(), prunedFields);
+    }
   }
 
   private List<Type> pruneColumnsInFields(List<Type> fields, List<String> 
currentPath, Set<ColumnPath> prunePaths) {
@@ -711,13 +853,15 @@ public class ParquetRewriter implements Closeable {
   }
 
   private void nullifyColumn(
+      TransParquetFileReader reader,
       int blockIndex,
       ColumnDescriptor descriptor,
       ColumnChunkMetaData chunk,
       ParquetFileWriter writer,
       MessageType schema,
       CompressionCodecName newCodecName,
-      boolean encryptColumn)
+      boolean encryptColumn,
+      String originalCreatedBy)
       throws IOException {
     if (encryptColumn) {
       Preconditions.checkArgument(writer.getEncryptor() != null, "Missing 
encryptor");
@@ -813,7 +957,7 @@ public class ParquetRewriter implements Closeable {
       } else {
         Type tempField = extractField(field.asGroupType(), targetField);
         if (tempField != null) {
-          return tempField;
+          return new GroupType(candidate.getRepetition(), candidate.getName(), 
tempField);
         }
       }
     }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 127af68dd..a69403f46 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -38,12 +38,13 @@ import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.OutputFile;
 
 /**
- * A set of options to create a ParquetRewriter.
+ * A set of options to create a {@link ParquetRewriter}. See {@link 
RewriteOptions.Builder} for options description.
  */
 public class RewriteOptions {
 
   private final ParquetConfiguration conf;
   private final List<InputFile> inputFiles;
+  private final List<InputFile> inputFilesToJoin;
   private final OutputFile outputFile;
   private final List<String> pruneColumns;
   private final CompressionCodecName newCodecName;
@@ -51,19 +52,25 @@ public class RewriteOptions {
   private final List<String> encryptColumns;
   private final FileEncryptionProperties fileEncryptionProperties;
   private final IndexCache.CacheStrategy indexCacheStrategy;
+  private final boolean overwriteInputWithJoinColumns;
+  private final boolean ignoreJoinFilesMetadata;
 
   private RewriteOptions(
       ParquetConfiguration conf,
       List<InputFile> inputFiles,
+      List<InputFile> inputFilesToJoin,
       OutputFile outputFile,
       List<String> pruneColumns,
       CompressionCodecName newCodecName,
       Map<String, MaskMode> maskColumns,
       List<String> encryptColumns,
       FileEncryptionProperties fileEncryptionProperties,
-      IndexCache.CacheStrategy indexCacheStrategy) {
+      IndexCache.CacheStrategy indexCacheStrategy,
+      boolean overwriteInputWithJoinColumns,
+      boolean ignoreJoinFilesMetadata) {
     this.conf = conf;
     this.inputFiles = inputFiles;
+    this.inputFilesToJoin = inputFilesToJoin;
     this.outputFile = outputFile;
     this.pruneColumns = pruneColumns;
     this.newCodecName = newCodecName;
@@ -71,6 +78,8 @@ public class RewriteOptions {
     this.encryptColumns = encryptColumns;
     this.fileEncryptionProperties = fileEncryptionProperties;
     this.indexCacheStrategy = indexCacheStrategy;
+    this.overwriteInputWithJoinColumns = overwriteInputWithJoinColumns;
+    this.ignoreJoinFilesMetadata = ignoreJoinFilesMetadata;
   }
 
   /**
@@ -110,6 +119,26 @@ public class RewriteOptions {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Gets the input {@link Path}s for the rewrite if they exist for all input 
files to join,
+   * otherwise throws a {@link RuntimeException}.
+   *
+   * @return a {@link List} of the associated input {@link Path}s to join
+   */
+  public List<Path> getInputFilesToJoin() {
+    return inputFilesToJoin.stream()
+        .map(f -> {
+          if (f instanceof HadoopOutputFile) {
+            HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) f;
+            return new Path(hadoopOutputFile.getPath());
+          } else {
+            throw new RuntimeException(
+                "The input files to join do not all have an associated Hadoop 
Path.");
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
   /**
    * Gets the {@link InputFile}s for the rewrite.
    *
@@ -119,6 +148,15 @@ public class RewriteOptions {
     return inputFiles;
   }
 
+  /**
+   * Gets the right {@link InputFile}s to join during the rewrite.
+   *
+   * @return a {@link List} of the associated {@link InputFile}s to join
+   */
+  public List<InputFile> getParquetInputFilesToJoin() {
+    return inputFilesToJoin;
+  }
+
   /**
    * Get the {@link Path} for the rewrite if it exists, otherwise throws a 
{@link RuntimeException}.
    *
@@ -166,10 +204,19 @@ public class RewriteOptions {
     return indexCacheStrategy;
   }
 
-  // Builder to create a RewriterOptions.
+  public boolean getOverwriteInputWithJoinColumns() {
+    return overwriteInputWithJoinColumns;
+  }
+
+  public boolean getIgnoreJoinFilesMetadata() {
+    return ignoreJoinFilesMetadata;
+  }
+
+  /** Builder for {@link RewriteOptions} which is used for constructing {@link 
ParquetRewriter}.*/
   public static class Builder {
     private final ParquetConfiguration conf;
     private final List<InputFile> inputFiles;
+    private final List<InputFile> inputFilesToJoin;
     private final OutputFile outputFile;
     private List<String> pruneColumns;
     private CompressionCodecName newCodecName;
@@ -177,6 +224,24 @@ public class RewriteOptions {
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;
     private IndexCache.CacheStrategy indexCacheStrategy = 
IndexCache.CacheStrategy.NONE;
+    private boolean overwriteInputWithJoinColumns = false;
+    private boolean ignoreJoinFilesMetadata = false;
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     *
+     * @param conf              configuration for reading from input files and 
writing to output file
+     * @param inputFile         input file path to read from
+     * @param inputFileToJoin   input join file path to read from
+     * @param outputFile        output file path to rewrite to
+     */
+    public Builder(Configuration conf, Path inputFile, Path inputFileToJoin, 
Path outputFile) {
+      this(
+          new HadoopParquetConfiguration(conf),
+          HadoopInputFile.fromPathUnchecked(inputFile, conf),
+          HadoopInputFile.fromPathUnchecked(inputFileToJoin, conf),
+          HadoopOutputFile.fromPathUnchecked(outputFile, conf));
+    }
 
     /**
      * Create a builder to create a RewriterOptions.
@@ -200,7 +265,20 @@ public class RewriteOptions {
      * @param outputFile output file to rewrite to
      */
     public Builder(ParquetConfiguration conf, InputFile inputFile, OutputFile 
outputFile) {
-      this(conf, Collections.singletonList(inputFile), outputFile);
+      this(conf, Collections.singletonList(inputFile), null, outputFile);
+    }
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     *
+     * @param conf              configuration for reading from input files and 
writing to output file
+     * @param inputFile         input file to read from
+     * @param inputFileToJoin   input join file to read from
+     * @param outputFile        output file to rewrite to
+     */
+    public Builder(
+        ParquetConfiguration conf, InputFile inputFile, InputFile 
inputFileToJoin, OutputFile outputFile) {
+      this(conf, Collections.singletonList(inputFile), 
Collections.singletonList(inputFileToJoin), outputFile);
     }
 
     /**
@@ -213,6 +291,8 @@ public class RewriteOptions {
      * if row groups are very small and will not solve small file problems. 
Instead, it will
      * make it worse to have a large file footer in the output file.
      * TODO: support rewrite by record to break the original row groups into 
reasonable ones.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
      *
      * @param conf       configuration for reading from input files and 
writing to output file
      * @param inputFiles list of input file paths to read from
@@ -224,6 +304,7 @@ public class RewriteOptions {
       for (Path inputFile : inputFiles) {
         this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile, 
conf));
       }
+      this.inputFilesToJoin = new ArrayList<>();
       this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
     }
 
@@ -237,6 +318,8 @@ public class RewriteOptions {
      * if row groups are very small and will not solve small file problems. 
Instead, it will
      * make it worse to have a large file footer in the output file.
      * TODO: support rewrite by record to break the original row groups into 
reasonable ones.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
      *
      * @param conf       configuration for reading from input files and 
writing to output file
      * @param inputFiles list of input file paths to read from
@@ -245,6 +328,68 @@ public class RewriteOptions {
     public Builder(ParquetConfiguration conf, List<InputFile> inputFiles, 
OutputFile outputFile) {
       this.conf = conf;
       this.inputFiles = inputFiles;
+      this.inputFilesToJoin = new ArrayList<>();
+      this.outputFile = outputFile;
+    }
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     * <p>
+     * Please note the schema of all files in each file group 
<code>inputFiles</code> and <code>inputFilesToJoin</code>
+     * must be the same while those two schemas can be different in comparison 
with each other.
+     * Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This 
may not be optimal
+     * if row groups are very small and will not solve small file problems. 
Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * TODO: support rewrite by record to break the original row groups into 
reasonable ones.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
+     *
+     * @param conf               configuration for reading from input files 
and writing to output file
+     * @param inputFiles        list of input file paths to read from
+     * @param inputFilesToJoin  list of input join file paths to read from
+     * @param outputFile        output file path to rewrite to
+     */
+    public Builder(Configuration conf, List<Path> inputFiles, List<Path> 
inputFilesToJoin, Path outputFile) {
+      this.conf = new HadoopParquetConfiguration(conf);
+      this.inputFiles = new ArrayList<>(inputFiles.size());
+      for (Path inputFile : inputFiles) {
+        this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile, 
conf));
+      }
+      this.inputFilesToJoin = new ArrayList<>(inputFilesToJoin.size());
+      for (Path inputFile : inputFilesToJoin) {
+        this.inputFilesToJoin.add(HadoopInputFile.fromPathUnchecked(inputFile, 
conf));
+      }
+      this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
+    }
+
+    /**
+     * Create a builder to create a RewriterOptions.
+     * <p>
+     * Please note the schema of all files in each file group 
<code>inputFiles</code> and <code>inputFilesToJoin</code>
+     * must be the same while those two schemas can be different in comparison 
with each other.
+     * Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This 
may not be optimal
+     * if row groups are very small and will not solve small file problems. 
Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
+     *
+     * @param conf              configuration for reading from input files and 
writing to output file
+     * @param inputFiles        list of input file paths to read from
+     * @param inputFilesToJoin  list of input join file paths to read from
+     * @param outputFile        output file path to rewrite to
+     */
+    public Builder(
+        ParquetConfiguration conf,
+        List<InputFile> inputFiles,
+        List<InputFile> inputFilesToJoin,
+        OutputFile outputFile) {
+      this.conf = conf;
+      this.inputFiles = inputFiles;
+      this.inputFilesToJoin = inputFilesToJoin;
       this.outputFile = outputFile;
     }
 
@@ -325,6 +470,18 @@ public class RewriteOptions {
       return this;
     }
 
+    /**
+     * Add an input join file to read from.
+     *
+     * @param path input file path to read from
+     * @return self
+     */
+    public Builder addInputFileToJoinColumns(Path path) {
+      this.inputFilesToJoin.add(
+          HadoopInputFile.fromPathUnchecked(path, 
ConfigurationUtil.createHadoopConfiguration(conf)));
+      return this;
+    }
+
     /**
      * Add an input file to read from.
      *
@@ -336,6 +493,17 @@ public class RewriteOptions {
       return this;
     }
 
+    /**
+     * Add an input file to join.
+     *
+     * @param fileToJoin input file to join
+     * @return self
+     */
+    public Builder addInputFilesToJoin(InputFile fileToJoin) {
+      this.inputFilesToJoin.add(fileToJoin);
+      return this;
+    }
+
     /**
      * Set the index(ColumnIndex, Offset and BloomFilter) cache strategy.
      * <p>
@@ -350,6 +518,33 @@ public class RewriteOptions {
       return this;
     }
 
+    /**
+     * Set a flag whether columns from join files need to overwrite columns 
from the main input files.
+     * <p>
+     * By default, join files columns do not overwrite the main input file 
columns.
+     *
+     * @param overwriteInputWithJoinColumns a flag if columns from join files 
should overwrite columns
+     *                                      from the main input files
+     * @return self
+     */
+    public Builder overwriteInputWithJoinColumns(boolean 
overwriteInputWithJoinColumns) {
+      this.overwriteInputWithJoinColumns = overwriteInputWithJoinColumns;
+      return this;
+    }
+
+    /**
+     * Set a flag whether metadata from join files should be ignored.
+     * <p>
+     * By default, metadata is not ignored.
+     *
+     * @param ignoreJoinFilesMetadata a flag if metadata from join files 
should be ignored
+     * @return self
+     */
+    public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
+      this.ignoreJoinFilesMetadata = ignoreJoinFilesMetadata;
+      return this;
+    }
+
     /**
      * Build the RewriterOptions.
      *
@@ -390,13 +585,16 @@ public class RewriteOptions {
       return new RewriteOptions(
           conf,
           inputFiles,
+          (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
           outputFile,
           pruneColumns,
           newCodecName,
           maskColumns,
           encryptColumns,
           fileEncryptionProperties,
-          indexCacheStrategy);
+          indexCacheStrategy,
+          overwriteInputWithJoinColumns,
+          ignoreJoinFilesMetadata);
     }
   }
 }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 0573d4e33..ce612baec 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -33,6 +33,9 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
@@ -44,7 +47,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
@@ -108,6 +114,7 @@ public class ParquetRewriterTest {
   private final boolean usingHadoop;
 
   private List<EncryptionTestFile> inputFiles = Lists.newArrayList();
+  private List<EncryptionTestFile> inputFilesToJoin = Lists.newArrayList();
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
@@ -161,7 +168,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema is not changed for the columns not pruned
-    validateSchema();
+    validateSchemaWithGenderColumnPruned(false);
 
     // Verify codec has been translated
     verifyCodec(
@@ -174,17 +181,10 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
null);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
null, false);
 
     // Verify the page index
-    validatePageIndex(new HashMap<Integer, Integer>() {
-      {
-        put(0, 0);
-        put(1, 1);
-        put(2, 3);
-        put(3, 4);
-      }
-    });
+    validatePageIndex(new HashSet<>(), false);
 
     // Verify original.created.by is preserved
     validateCreatedBy();
@@ -194,6 +194,7 @@ public class ParquetRewriterTest {
   @Before
   public void setUp() {
     outputFile = TestFileBuilder.createTempFile("test");
+    inputFilesToJoin = new ArrayList<>();
   }
 
   @Test
@@ -238,7 +239,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema are not changed for the columns not pruned
-    validateSchema();
+    validateSchemaWithGenderColumnPruned(false);
 
     // Verify codec has been translated
     verifyCodec(
@@ -251,16 +252,10 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), 
null);
+    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), 
null, false);
 
     // Verify the page index
-    validatePageIndex(new HashMap<Integer, Integer>() {
-      {
-        put(0, 0);
-        put(1, 1);
-        put(2, 3);
-      }
-    });
+    validatePageIndex(ImmutableSet.of("Links.Forward"), false);
 
     // Verify original.created.by is preserved
     validateCreatedBy();
@@ -318,7 +313,7 @@ public class ParquetRewriterTest {
     rewriter.close();
 
     // Verify the schema is not changed for the columns not pruned
-    validateSchema();
+    validateSchemaWithGenderColumnPruned(false);
 
     // Verify codec has been translated
     FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
@@ -332,7 +327,7 @@ public class ParquetRewriterTest {
         fileDecryptionProperties);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties, false);
 
     // Verify column encryption
     ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
@@ -493,17 +488,10 @@ public class ParquetRewriterTest {
 
     // Verify the data are not changed for non-encrypted and non-masked 
columns.
     // Also make sure the masked column is nullified.
-    validateColumnData(Collections.emptySet(), maskColumns.keySet(), 
fileDecryptionProperties);
+    validateColumnData(Collections.emptySet(), maskColumns.keySet(), 
fileDecryptionProperties, false);
 
     // Verify the page index
-    validatePageIndex(new HashMap<Integer, Integer>() {
-      {
-        put(1, 1);
-        put(2, 2);
-        put(3, 3);
-        put(4, 4);
-      }
-    });
+    validatePageIndex(ImmutableSet.of("DocId"), false);
 
     // Verify the column is encrypted
     ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
@@ -583,18 +571,10 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the merged data are not changed
-    validateColumnData(Collections.emptySet(), Collections.emptySet(), null);
+    validateColumnData(Collections.emptySet(), Collections.emptySet(), null, 
false);
 
     // Verify the page index
-    validatePageIndex(new HashMap<Integer, Integer>() {
-      {
-        put(0, 0);
-        put(1, 1);
-        put(2, 2);
-        put(3, 3);
-        put(4, 4);
-      }
-    });
+    validatePageIndex(new HashSet<>(), false);
 
     // Verify original.created.by is preserved
     validateCreatedBy();
@@ -603,6 +583,15 @@ public class ParquetRewriterTest {
 
   @Test(expected = InvalidSchemaException.class)
   public void testMergeTwoFilesWithDifferentSchema() throws Exception {
+    testMergeTwoFilesWithDifferentSchemaSetup(true);
+  }
+
+  @Test(expected = InvalidSchemaException.class)
+  public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
+    testMergeTwoFilesWithDifferentSchemaSetup(false);
+  }
+
+  public void testMergeTwoFilesWithDifferentSchemaSetup(boolean 
wrongSchemaInInputFile) throws Exception {
     MessageType schema1 = new MessageType(
         "schema",
         new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -625,18 +614,32 @@ public class ParquetRewriterTest {
         .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
         .withWriterVersion(writerVersion)
         .build());
-    inputFiles.add(new TestFileBuilder(conf, schema2)
+    inputFilesToJoin.add(new TestFileBuilder(conf, schema1)
         .withNumRecord(numRecord)
         .withCodec("UNCOMPRESSED")
         .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
         .withWriterVersion(writerVersion)
         .build());
-
-    List<Path> inputPaths = new ArrayList<>();
-    for (EncryptionTestFile inputFile : inputFiles) {
-      inputPaths.add(new Path(inputFile.getFileName()));
+    if (wrongSchemaInInputFile) {
+      inputFiles.add(new TestFileBuilder(conf, schema2)
+          .withNumRecord(numRecord)
+          .withCodec("UNCOMPRESSED")
+          .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+          .withWriterVersion(writerVersion)
+          .build());
+    } else {
+      inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+          .withNumRecord(numRecord)
+          .withCodec("UNCOMPRESSED")
+          .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+          .withWriterVersion(writerVersion)
+          .build());
     }
-    RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+    RewriteOptions.Builder builder = createBuilder(
+        inputFiles.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList()),
+        inputFilesToJoin.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList()),
+        false);
     RewriteOptions options = 
builder.indexCacheStrategy(indexCacheStrategy).build();
 
     // This should throw an exception because the schemas are different
@@ -733,6 +736,127 @@ public class ParquetRewriterTest {
         .build());
   }
 
+  @Test
+  public void testFilesToJoinHaveDifferentRowCount() throws Exception {
+    MessageType schema1 = new MessageType("schema", new 
PrimitiveType(OPTIONAL, INT64, "DocId"));
+    MessageType schema2 = new MessageType("schema", new 
PrimitiveType(REQUIRED, BINARY, "Name"));
+    inputFiles = ImmutableList.of(
+        new TestFileBuilder(conf, schema1).withNumRecord(numRecord).build());
+    inputFilesToJoin = ImmutableList.of(
+        new TestFileBuilder(conf, schema2).withNumRecord(numRecord / 
2).build());
+    RewriteOptions.Builder builder = createBuilder(
+        inputFiles.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList()),
+        inputFilesToJoin.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList()),
+        true);
+    RewriteOptions options = builder.build();
+    try {
+      rewriter =
+          new ParquetRewriter(options); // This should throw an exception 
because the row count is different
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains("The number of rows in each block 
must match"));
+    }
+  }
+
+  @Test
+  public void testOneInputFileManyInputFilesToJoinWithJoinColumnsOverwrite() 
throws Exception {
+    testOneInputFileManyInputFilesToJoinSetup(true);
+  }
+
+  @Test
+  public void 
testOneInputFileManyInputFilesToJoinWithoutJoinColumnsOverwrite() throws 
Exception {
+    testOneInputFileManyInputFilesToJoinSetup(false);
+  }
+
+  public void testOneInputFileManyInputFilesToJoinSetup(boolean 
joinColumnsOverwrite) throws Exception {
+    testOneInputFileManyInputFilesToJoinSetup();
+
+    String encryptColumn = "DocId";
+    String pruneColumn = "Gender";
+
+    FileEncryptionProperties fileEncryptionProperties = 
EncDecProperties.getFileEncryptionProperties(
+        new String[] {encryptColumn}, ParquetCipher.AES_GCM_CTR_V1, false);
+    FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
+
+    List<Path> inputPathsL =
+        inputFiles.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList());
+    List<Path> inputPathsR =
+        inputFilesToJoin.stream().map(y -> new 
Path(y.getFileName())).collect(Collectors.toList());
+    List<String> pruneColumns = ImmutableList.of(pruneColumn);
+    Map<String, MaskMode> maskColumns = ImmutableMap.of(encryptColumn, 
MaskMode.NULLIFY);
+    RewriteOptions options = createBuilder(inputPathsL, inputPathsR, true)
+        .prune(pruneColumns)
+        .mask(maskColumns)
+        .transform(CompressionCodecName.ZSTD)
+        .indexCacheStrategy(indexCacheStrategy)
+        .overwriteInputWithJoinColumns(joinColumnsOverwrite)
+        .encrypt(ImmutableList.of(encryptColumn))
+        .encryptionProperties(fileEncryptionProperties)
+        .build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    Map<ColumnPath, List<BloomFilter>> inputBloomFilters = 
allInputBloomFilters();
+    Map<ColumnPath, List<BloomFilter>> outputBloomFilters = 
allOutputBloomFilters(fileDecryptionProperties);
+    Set<ColumnPath> schemaRColumns = createSchemaToJoin().getColumns().stream()
+        .map(x -> ColumnPath.get(x.getPath()))
+        .collect(Collectors.toSet());
+    Set<ColumnPath> rBloomFilters = outputBloomFilters.keySet().stream()
+        .filter(schemaRColumns::contains)
+        .collect(Collectors.toSet());
+
+    // Verify column encryption
+    ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
+    assertFalse(metaData.getBlocks().isEmpty());
+    List<ColumnChunkMetaData> columns = 
metaData.getBlocks().get(0).getColumns();
+    Set<String> set = ImmutableSet.of(encryptColumn);
+    for (ColumnChunkMetaData column : columns) {
+      if (set.contains(column.getPath().toDotString())) {
+        assertTrue(column.isEncrypted());
+      } else {
+        assertFalse(column.isEncrypted());
+      }
+    }
+
+    validateColumnData(
+        new HashSet<>(pruneColumns),
+        maskColumns.keySet(),
+        fileDecryptionProperties,
+        joinColumnsOverwrite); // Verify data
+    validateSchemaWithGenderColumnPruned(true); // Verify schema
+    validateCreatedBy(); // Verify original.created.by
+    assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom 
filters
+    verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD), 
fileDecryptionProperties); // Verify codec
+    validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite);
+  }
+
+  private void testOneInputFileManyInputFilesToJoinSetup() throws IOException {
+    inputFiles = Lists.newArrayList(new TestFileBuilder(conf, createSchema())
+        .withNumRecord(numRecord)
+        .withRowGroupSize(1 * 1024 * 1024)
+        .withCodec("GZIP")
+        .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+        .withWriterVersion(writerVersion)
+        .build());
+
+    List<Long> rowGroupRowCounts = ParquetFileReader.readFooter(
+            conf, new Path(inputFiles.get(0).getFileName()), 
ParquetMetadataConverter.NO_FILTER)
+        .getBlocks()
+        .stream()
+        .map(BlockMetaData::getRowCount)
+        .collect(Collectors.toList());
+
+    for (long count : rowGroupRowCounts) {
+      inputFilesToJoin.add(new TestFileBuilder(conf, createSchemaToJoin())
+          .withNumRecord((int) count)
+          .withCodec("UNCOMPRESSED")
+          .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+          .withWriterVersion(writerVersion)
+          .build());
+    }
+  }
+
   private MessageType createSchema() {
     return new MessageType(
         "schema",
@@ -748,52 +872,85 @@ public class ParquetRewriterTest {
             new PrimitiveType(REPEATED, BINARY, "Forward")));
   }
 
+  private MessageType createSchemaToJoin() {
+    return new MessageType(
+        "schema",
+        new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+        new PrimitiveType(OPTIONAL, INT64, "Age"),
+        new GroupType(
+            OPTIONAL,
+            "Links",
+            new PrimitiveType(REPEATED, BINARY, "Backward"),
+            new PrimitiveType(REPEATED, BINARY, "Forward")));
+  }
+
   private void validateColumnData(
-      Set<String> prunePaths, Set<String> nullifiedPaths, 
FileDecryptionProperties fileDecryptionProperties)
+      Set<String> prunePaths,
+      Set<String> nullifiedPaths,
+      FileDecryptionProperties fileDecryptionProperties,
+      Boolean joinColumnsOverwrite)
       throws IOException {
     ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), new Path(outputFile))
         .withConf(conf)
         .withDecryption(fileDecryptionProperties)
         .build();
 
-    // Get the total number of rows from input files
-    int totalRows = 0;
-    for (EncryptionTestFile inputFile : inputFiles) {
-      totalRows += inputFile.getFileContent().length;
-    }
+    List<SimpleGroup> filesMain = inputFiles.stream()
+        .flatMap(x -> Arrays.stream(x.getFileContent()))
+        .collect(Collectors.toList());
+    List<SimpleGroup> filesJoined = inputFilesToJoin.stream()
+        .flatMap(x -> Arrays.stream(x.getFileContent()))
+        .collect(Collectors.toList());
+    BiFunction<String, Integer, Group> groups = (name, rowIdx) -> {
+      if (!filesMain.get(0).getType().containsField(name)
+          || joinColumnsOverwrite
+              && !filesJoined.isEmpty()
+              && filesJoined.get(0).getType().containsField(name)) {
+        return filesJoined.get(rowIdx);
+      } else {
+        return filesMain.get(rowIdx);
+      }
+    };
 
+    int totalRows =
+        inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum();
     for (int i = 0; i < totalRows; i++) {
       Group group = reader.read();
       assertNotNull(group);
 
-      SimpleGroup expectGroup = inputFiles.get(i / 
numRecord).getFileContent()[i % numRecord];
-
       if (!prunePaths.contains("DocId")) {
         if (nullifiedPaths.contains("DocId")) {
           assertThrows(RuntimeException.class, () -> group.getLong("DocId", 
0));
         } else {
-          assertEquals(group.getLong("DocId", 0), expectGroup.getLong("DocId", 
0));
+          assertEquals(
+              group.getLong("DocId", 0), groups.apply("DocId", 
i).getLong("DocId", 0));
         }
       }
 
       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
         assertArrayEquals(
             group.getBinary("Name", 0).getBytes(),
-            expectGroup.getBinary("Name", 0).getBytes());
+            groups.apply("Name", i).getBinary("Name", 0).getBytes());
       }
 
       if (!prunePaths.contains("Gender") && 
!nullifiedPaths.contains("Gender")) {
         assertArrayEquals(
             group.getBinary("Gender", 0).getBytes(),
-            expectGroup.getBinary("Gender", 0).getBytes());
+            groups.apply("Gender", i).getBinary("Gender", 0).getBytes());
       }
 
       if (!prunePaths.contains("FloatFraction") && 
!nullifiedPaths.contains("FloatFraction")) {
-        assertEquals(group.getFloat("FloatFraction", 0), 
expectGroup.getFloat("FloatFraction", 0), 0);
+        assertEquals(
+            group.getFloat("FloatFraction", 0),
+            groups.apply("FloatFraction", i).getFloat("FloatFraction", 0),
+            0);
       }
 
       if (!prunePaths.contains("DoubleFraction") && 
!nullifiedPaths.contains("DoubleFraction")) {
-        assertEquals(group.getDouble("DoubleFraction", 0), 
expectGroup.getDouble("DoubleFraction", 0), 0);
+        assertEquals(
+            group.getDouble("DoubleFraction", 0),
+            groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
+            0);
       }
 
       Group subGroup = group.getGroup("Links", 0);
@@ -801,7 +958,7 @@ public class ParquetRewriterTest {
       if (!prunePaths.contains("Links.Backward") && 
!nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(
             subGroup.getBinary("Backward", 0).getBytes(),
-            expectGroup
+            groups.apply("Links", i)
                 .getGroup("Links", 0)
                 .getBinary("Backward", 0)
                 .getBytes());
@@ -813,7 +970,7 @@ public class ParquetRewriterTest {
         } else {
           assertArrayEquals(
               subGroup.getBinary("Forward", 0).getBytes(),
-              expectGroup
+              groups.apply("Links", i)
                   .getGroup("Links", 0)
                   .getBinary("Forward", 0)
                   .getBytes());
@@ -852,77 +1009,107 @@ public class ParquetRewriterTest {
     assertEquals(expectedCodecs, codecs);
   }
 
+  @FunctionalInterface
+  interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException;
+  }
+
   /**
    * Verify the page index is correct.
    *
-   * @param outFileColumnMapping the column mapping from the output file to 
the input file.
+   * @param exclude the columns to exclude from comparison, for example 
because they were nullified.
+   * @param joinColumnsOverwrite whether a join columns overwrote existing 
overlapping columns.
    */
-  private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping) 
throws Exception {
-    ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
-
-    int inputFileIndex = 0;
-    TransParquetFileReader inReader = new TransParquetFileReader(
-        HadoopInputFile.fromPath(new 
Path(inputFiles.get(inputFileIndex).getFileName()), conf),
-        HadoopReadOptions.builder(conf).build());
-    ParquetMetadata inMetaData = inReader.getFooter();
-
-    try (TransParquetFileReader outReader = new TransParquetFileReader(
-        HadoopInputFile.fromPath(new Path(outputFile), conf),
-        HadoopReadOptions.builder(conf).build())) {
-
-      for (int outBlockId = 0, inBlockId = 0;
-          outBlockId < outMetaData.getBlocks().size();
-          ++outBlockId, ++inBlockId) {
-        // Refresh reader of input file
-        if (inBlockId == inMetaData.getBlocks().size()) {
-          inReader = new TransParquetFileReader(
-              HadoopInputFile.fromPath(
-                  new Path(inputFiles.get(++inputFileIndex).getFileName()), 
conf),
-              HadoopReadOptions.builder(conf).build());
-          inMetaData = inReader.getFooter();
-          inBlockId = 0;
-        }
-
-        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
-        BlockMetaData outBlockMetaData = 
outMetaData.getBlocks().get(outBlockId);
+  private void validatePageIndex(Set<String> exclude, boolean 
joinColumnsOverwrite) throws Exception {
+    class BlockMeta {
+      final TransParquetFileReader reader;
+      final BlockMetaData blockMeta;
+      final Map<ColumnPath, ColumnChunkMetaData> colPathToMeta;
+
+      BlockMeta(
+          TransParquetFileReader reader,
+          BlockMetaData blockMeta,
+          Map<ColumnPath, ColumnChunkMetaData> colPathToMeta) {
+        this.reader = reader;
+        this.blockMeta = blockMeta;
+        this.colPathToMeta = colPathToMeta;
+      }
+    }
+    CheckedFunction<List<String>, List<BlockMeta>> blockMetaExtractor = files 
-> {
+      List<BlockMeta> result = new ArrayList<>();
+      for (String inputFile : files) {
+        TransParquetFileReader reader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(new Path(inputFile), conf),
+            HadoopReadOptions.builder(conf).build());
+        reader.getFooter()
+            .getBlocks()
+            .forEach(blockMetaData -> result.add(new BlockMeta(
+                reader,
+                blockMetaData,
+                blockMetaData.getColumns().stream()
+                    .collect(
+                        Collectors.toMap(ColumnChunkMetaData::getPath, 
Function.identity())))));
+      }
+      return result;
+    };
 
-        assertEquals(inBlockMetaData.getRowCount(), 
outBlockMetaData.getRowCount());
+    List<BlockMeta> inBlocksMain = blockMetaExtractor.apply(
+        
inputFiles.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
+    List<BlockMeta> inBlocksJoined = blockMetaExtractor.apply(
+        
inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
+    List<BlockMeta> outBlocks = 
blockMetaExtractor.apply(ImmutableList.of(outputFile));
+    for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) {
+      BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta;
+      TransParquetFileReader outReader = outBlocks.get(blockIdx).reader;
+      for (ColumnChunkMetaData outChunk : outBlockMeta.getColumns()) {
+        if (exclude.contains(outChunk.getPath().toDotString())) continue;
+        TransParquetFileReader inReader;
+        BlockMetaData inBlockMeta;
+        ColumnChunkMetaData inChunk;
+        if 
(!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())
+            || joinColumnsOverwrite
+                && !inBlocksJoined.isEmpty()
+                && 
inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) {
+          inReader = inBlocksJoined.get(blockIdx).reader;
+          inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta;
+          inChunk = 
inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+        } else {
+          inReader = inBlocksMain.get(blockIdx).reader;
+          inBlockMeta = inBlocksMain.get(blockIdx).blockMeta;
+          inChunk = 
inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+        }
 
-        for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
-          if (!outFileColumnMapping.containsKey(j)) {
-            continue;
-          }
-          int columnIdFromInputFile = outFileColumnMapping.get(j);
-          ColumnChunkMetaData inChunk = 
inBlockMetaData.getColumns().get(columnIdFromInputFile);
-          ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
-          OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
-          ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
-          ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
-          OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
-          if (inColumnIndex != null) {
-            assertEquals(inColumnIndex.getBoundaryOrder(), 
outColumnIndex.getBoundaryOrder());
-            assertEquals(inColumnIndex.getMaxValues(), 
outColumnIndex.getMaxValues());
-            assertEquals(inColumnIndex.getMinValues(), 
outColumnIndex.getMinValues());
-            assertEquals(inColumnIndex.getNullCounts(), 
outColumnIndex.getNullCounts());
-          }
-          if (inOffsetIndex != null) {
-            List<Long> inOffsets = getOffsets(inReader, inChunk);
-            List<Long> outOffsets = getOffsets(outReader, outChunk);
-            assertEquals(inOffsets.size(), outOffsets.size());
-            assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
-            assertEquals(inOffsetIndex.getPageCount(), 
outOffsetIndex.getPageCount());
-            for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
-              assertEquals(inOffsetIndex.getFirstRowIndex(k), 
outOffsetIndex.getFirstRowIndex(k));
-              assertEquals(
-                  inOffsetIndex.getLastRowIndex(k, 
inBlockMetaData.getRowCount()),
-                  outOffsetIndex.getLastRowIndex(k, 
outBlockMetaData.getRowCount()));
-              assertEquals(inOffsetIndex.getOffset(k), (long) 
inOffsets.get(k));
-              assertEquals(outOffsetIndex.getOffset(k), (long) 
outOffsets.get(k));
-            }
+        ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+        OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+        ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+        OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+        if (inColumnIndex != null) {
+          assertEquals(inColumnIndex.getBoundaryOrder(), 
outColumnIndex.getBoundaryOrder());
+          assertEquals(inColumnIndex.getMaxValues(), 
outColumnIndex.getMaxValues());
+          assertEquals(inColumnIndex.getMinValues(), 
outColumnIndex.getMinValues());
+          assertEquals(inColumnIndex.getNullCounts(), 
outColumnIndex.getNullCounts());
+        }
+        if (inOffsetIndex != null) {
+          List<Long> inOffsets = getOffsets(inReader, inChunk);
+          List<Long> outOffsets = getOffsets(outReader, outChunk);
+          assertEquals(inOffsets.size(), outOffsets.size());
+          assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
+          assertEquals(inOffsetIndex.getPageCount(), 
outOffsetIndex.getPageCount());
+          for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+            assertEquals(inOffsetIndex.getFirstRowIndex(k), 
outOffsetIndex.getFirstRowIndex(k));
+            assertEquals(
+                inOffsetIndex.getLastRowIndex(k, inBlockMeta.getRowCount()),
+                outOffsetIndex.getLastRowIndex(k, outBlockMeta.getRowCount()));
+            assertEquals(inOffsetIndex.getOffset(k), (long) inOffsets.get(k));
+            assertEquals(outOffsetIndex.getOffset(k), (long) 
outOffsets.get(k));
           }
         }
       }
     }
+
+    for (BlockMeta t3 : inBlocksMain) t3.reader.close();
+    for (BlockMeta t3 : inBlocksJoined) t3.reader.close();
+    for (BlockMeta t3 : outBlocks) t3.reader.close();
   }
 
   private List<Long> getOffsets(TransParquetFileReader reader, 
ColumnChunkMetaData chunk) throws IOException {
@@ -963,7 +1150,9 @@ public class ParquetRewriterTest {
 
   private void validateCreatedBy() throws Exception {
     Set<String> createdBySet = new HashSet<>();
-    for (EncryptionTestFile inputFile : inputFiles) {
+    List<EncryptionTestFile> inFiles =
+        Stream.concat(inputFiles.stream(), 
inputFilesToJoin.stream()).collect(Collectors.toList());
+    for (EncryptionTestFile inputFile : inFiles) {
       ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
       createdBySet.add(pmd.getFileMetaData().getCreatedBy());
       
assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
@@ -1005,7 +1194,9 @@ public class ParquetRewriterTest {
 
   private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters() throws 
Exception {
     Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
-    for (EncryptionTestFile inputFile : inputFiles) {
+    List<EncryptionTestFile> files =
+        Stream.concat(inputFiles.stream(), 
inputFilesToJoin.stream()).collect(Collectors.toList());
+    for (EncryptionTestFile inputFile : files) {
       Map<ColumnPath, List<BloomFilter>> bloomFilters = 
allBloomFilters(inputFile.getFileName(), null);
       for (Map.Entry<ColumnPath, List<BloomFilter>> entry : 
bloomFilters.entrySet()) {
         List<BloomFilter> bloomFilterList = 
inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
@@ -1048,35 +1239,49 @@ public class ParquetRewriterTest {
   }
 
   private RewriteOptions.Builder createBuilder(List<Path> inputPaths) throws 
IOException {
+    return createBuilder(inputPaths, new ArrayList<>(), false);
+  }
+
+  private RewriteOptions.Builder createBuilder(
+      List<Path> inputPathsL, List<Path> inputPathsR, boolean 
overwriteInputWithJoinColumns) throws IOException {
     RewriteOptions.Builder builder;
     if (usingHadoop) {
       Path outputPath = new Path(outputFile);
-      builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+      builder = new RewriteOptions.Builder(conf, inputPathsL, inputPathsR, 
outputPath);
     } else {
       OutputFile outputPath = HadoopOutputFile.fromPath(new Path(outputFile), 
conf);
-      List<InputFile> inputs = inputPaths.stream()
+      List<InputFile> inputsL = inputPathsL.stream()
+          .map(p -> HadoopInputFile.fromPathUnchecked(p, conf))
+          .collect(Collectors.toList());
+      List<InputFile> inputsR = inputPathsR.stream()
           .map(p -> HadoopInputFile.fromPathUnchecked(p, conf))
           .collect(Collectors.toList());
-      builder = new RewriteOptions.Builder(parquetConf, inputs, outputPath);
+      builder = new RewriteOptions.Builder(parquetConf, inputsL, inputsR, 
outputPath);
     }
+    builder.overwriteInputWithJoinColumns(overwriteInputWithJoinColumns);
     return builder;
   }
 
-  private void validateSchema() throws IOException {
-    ParquetMetadata pmd =
-        ParquetFileReader.readFooter(conf, new Path(outputFile), 
ParquetMetadataConverter.NO_FILTER);
-    MessageType schema = pmd.getFileMetaData().getSchema();
-    List<Type> fields = schema.getFields();
-    assertEquals(fields.size(), 5);
-    assertEquals(fields.get(0).getName(), "DocId");
-    assertEquals(fields.get(1).getName(), "Name");
-    assertEquals(fields.get(2).getName(), "FloatFraction");
-    assertEquals(fields.get(3).getName(), "DoubleFraction");
-    assertEquals(fields.get(4).getName(), "Links");
-    List<Type> subFields = fields.get(4).asGroupType().getFields();
-    assertEquals(subFields.size(), 2);
-    assertEquals(subFields.get(0).getName(), "Backward");
-    assertEquals(subFields.get(1).getName(), "Forward");
+  private void validateSchemaWithGenderColumnPruned(boolean addJoinedColumn) 
throws IOException {
+    MessageType expectSchema = new MessageType(
+        "schema",
+        new PrimitiveType(OPTIONAL, INT64, "DocId"),
+        new PrimitiveType(REQUIRED, BINARY, "Name"),
+        new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+        new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
+        new GroupType(
+            OPTIONAL,
+            "Links",
+            new PrimitiveType(REPEATED, BINARY, "Backward"),
+            new PrimitiveType(REPEATED, BINARY, "Forward")));
+    if (addJoinedColumn) {
+      expectSchema = expectSchema.union(new MessageType("schema", new 
PrimitiveType(OPTIONAL, INT64, "Age")));
+    }
+    MessageType actualSchema = ParquetFileReader.readFooter(
+            conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER)
+        .getFileMetaData()
+        .getSchema();
+    assertEquals(expectSchema, actualSchema);
   }
 
   private void ensureContainsGzipFile() {

Reply via email to