This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 720aa3d3c PARQUET-2228: ParquetRewriter supports more than one input 
file (#1026)
720aa3d3c is described below

commit 720aa3d3c1ce1ce5c5e2b7715cd8c5f11bdc8ede
Author: Gang Wu <ust...@gmail.com>
AuthorDate: Tue Feb 21 18:43:09 2023 +0800

    PARQUET-2228: ParquetRewriter supports more than one input file (#1026)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 112 ++++++--
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  88 +++++-
 .../hadoop/rewrite/ParquetRewriterTest.java        | 319 +++++++++++++++++----
 3 files changed, 433 insertions(+), 86 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e5befa99d..834de8340 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,7 +45,6 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
-import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -63,6 +62,7 @@ import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -74,15 +74,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
 import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
 import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
@@ -93,41 +94,49 @@ public class ParquetRewriter implements Closeable {
   private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRewriter.class);
   private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
   private final byte[] pageBuffer = new byte[pageBufferSize];
-  private TransParquetFileReader reader;
-  private ParquetFileWriter writer;
-  private ParquetMetadata meta;
-  private MessageType schema;
-  private String originalCreatedBy;
+  // Configurations for the new file
   private CompressionCodecName newCodecName = null;
   private List<String> pruneColumns = null;
   private Map<ColumnPath, MaskMode> maskColumns = null;
   private Set<ColumnPath> encryptColumns = null;
   private boolean encryptMode = false;
   private Map<String, String> extraMetaData = new HashMap<>();
+  // Writer to rewrite the input files
+  private ParquetFileWriter writer;
+  // Number of blocks written which is used to keep track of the actual row 
group ordinal
+  private int numBlocksRewritten = 0;
+  // Reader and relevant states of the in-processing input file
+  private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
+  // Schema of input files (should be the same) and to write to the output file
+  private MessageType schema = null;
+  // The reader for the current input file
+  private TransParquetFileReader reader = null;
+  // The metadata of current reader being processed
+  private ParquetMetadata meta = null;
+  // created_by information of current reader being processed
+  private String originalCreatedBy = "";
+  // Unique created_by information from all input files
+  private Set<String> allOriginalCreatedBys = new HashSet<>();
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
-    Preconditions.checkArgument(options.getInputFiles().size() == 1, "Only 
support one input file");
-    Path inPath = options.getInputFiles().get(0);
-    Path outPath = options.getOutputFile();
     Configuration conf = options.getConf();
+    Path outPath = options.getOutputFile();
+    openInputFiles(options.getInputFiles(), conf);
+    LOG.info("Start rewriting {} input files {} to {}", inputFiles.size(), 
outPath);
+
+    // Init reader of the first input file
+    initNextReader();
 
     newCodecName = options.getNewCodecName();
     pruneColumns = options.getPruneColumns();
 
-    // Get file metadata and full schema from the input file
-    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
-    schema = meta.getFileMetaData().getSchema();
-    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
-    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
-    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
-
     // Prune columns if specified
     if (pruneColumns != null && !pruneColumns.isEmpty()) {
       List<String> paths = new ArrayList<>();
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, inPath.getName());
+          LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, reader.getFile());
         }
       }
 
@@ -147,9 +156,6 @@ public class ParquetRewriter implements Closeable {
       this.encryptMode = true;
     }
 
-    reader = new TransParquetFileReader(
-            HadoopInputFile.fromPath(inPath, conf), 
HadoopReadOptions.builder(conf).build());
-
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), 
schema, writerMode,
             DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, 
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
@@ -183,12 +189,69 @@ public class ParquetRewriter implements Closeable {
     }
   }
 
+  // Open all input files to validate their schemas are compatible to merge
+  private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+    Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"No input files");
+
+    for (Path inputFile : inputFiles) {
+      try {
+        TransParquetFileReader reader = new TransParquetFileReader(
+                HadoopInputFile.fromPath(inputFile, conf), 
HadoopReadOptions.builder(conf).build());
+        MessageType inputFileSchema = 
reader.getFooter().getFileMetaData().getSchema();
+        if (this.schema == null) {
+          this.schema = inputFileSchema;
+        } else {
+          // Now we enforce equality of schemas from input files for 
simplicity.
+          if (!this.schema.equals(inputFileSchema)) {
+            LOG.error("Input files have different schemas, expect: {}, input: 
{}, current file: {}",
+                    this.schema, inputFileSchema, inputFile);
+            throw new InvalidSchemaException("Input files have different 
schemas, current file: " + inputFile);
+          }
+        }
+        
this.allOriginalCreatedBys.add(reader.getFooter().getFileMetaData().getCreatedBy());
+        this.inputFiles.add(reader);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to open input file: " + 
inputFile, e);
+      }
+    }
+
+    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n", 
allOriginalCreatedBys));
+  }
+
+  // Routines to get reader of next input file and set up relevant states
+  private void initNextReader() {
+    if (reader != null) {
+      LOG.info("Finish rewriting input file: {}", reader.getFile());
+    }
+
+    if (inputFiles.isEmpty()) {
+      reader = null;
+      meta = null;
+      originalCreatedBy = null;
+      return;
+    }
+
+    reader = inputFiles.poll();
+    meta = reader.getFooter();
+    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
+    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
+
+    LOG.info("Rewriting input file: {}, remaining files: {}", 
reader.getFile(), inputFiles.size());
+  }
+
   @Override
   public void close() throws IOException {
     writer.end(extraMetaData);
   }
 
   public void processBlocks() throws IOException {
+    while (reader != null) {
+      processBlocksFromReader();
+      initNextReader();
+    }
+  }
+
+  private void processBlocksFromReader() throws IOException {
     PageReadStore store = reader.readNextRowGroup();
     ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new 
DummyGroupConverter(), schema, originalCreatedBy);
     Map<ColumnPath, ColumnDescriptor> descriptorsMap = 
schema.getColumns().stream().collect(
@@ -236,7 +299,6 @@ public class ParquetRewriter implements Closeable {
                     writer,
                     schema,
                     newCodecName,
-                    blockId,
                     encryptColumn);
           } else {
             throw new UnsupportedOperationException("Only nullify is supported 
for now");
@@ -246,7 +308,7 @@ public class ParquetRewriter implements Closeable {
           ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
           if (encryptMode) {
             columnChunkEncryptorRunTime =
-                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), 
chunk, blockId, columnId);
+                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), 
chunk, numBlocksRewritten, columnId);
           }
 
           // Translate compression and/or encryption
@@ -267,6 +329,7 @@ public class ParquetRewriter implements Closeable {
       writer.endBlock();
       store = reader.readNextRowGroup();
       blockId++;
+      numBlocksRewritten++;
     }
   }
 
@@ -571,7 +634,6 @@ public class ParquetRewriter implements Closeable {
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
-                             int rowGroupOrdinal,
                              boolean encryptColumn) throws IOException {
     if (encryptColumn) {
       Preconditions.checkArgument(writer.getEncryptor() != null, "Missing 
encryptor");
@@ -593,7 +655,7 @@ public class ParquetRewriter implements Closeable {
     MessageType newSchema = newSchema(schema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
             compressor, newSchema, props.getAllocator(), 
props.getColumnIndexTruncateLength(),
-            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), 
rowGroupOrdinal);
+            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), 
numBlocksRewritten);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index e15ba43c7..cc1280921 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -28,7 +28,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-// A set of options to create a ParquetRewriter.
+/**
+ * A set of options to create a ParquetRewriter.
+ */
 public class RewriteOptions {
 
   final Configuration conf;
@@ -101,37 +103,121 @@ public class RewriteOptions {
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;
 
+    /**
+     * Create a builder to create a RewriterOptions.
+     *
+     * @param conf       configuration for reading from input files and 
writing to output file
+     * @param inputFile  input file path to read from
+     * @param outputFile output file path to rewrite to
+     */
     public Builder(Configuration conf, Path inputFile, Path outputFile) {
       this.conf = conf;
       this.inputFiles = Arrays.asList(inputFile);
       this.outputFile = outputFile;
     }
 
+    /**
+     * Create a builder to create a RewriterOptions.
+     * <p>
+     * Please note that if merging more than one file, the schema of all files 
must be the same.
+     * Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This 
may not be optimal
+     * if row groups are very small and will not solve small file problems. 
Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * TODO: support rewrite by record to break the original row groups into 
reasonable ones.
+     *
+     * @param conf       configuration for reading from input files and 
writing to output file
+     * @param inputFiles list of input file paths to read from
+     * @param outputFile output file path to rewrite to
+     */
+    public Builder(Configuration conf, List<Path> inputFiles, Path outputFile) 
{
+      this.conf = conf;
+      this.inputFiles = inputFiles;
+      this.outputFile = outputFile;
+    }
+
+    /**
+     * Set the columns to prune.
+     * <p>
+     * By default, all columns are kept.
+     *
+     * @param columns list of columns to prune
+     * @return self
+     */
     public Builder prune(List<String> columns) {
       this.pruneColumns = columns;
       return this;
     }
 
+    /**
+     * Set the compression codec to use for the output file.
+     * <p>
+     * By default, the codec is the same as the input file.
+     *
+     * @param newCodecName compression codec to use
+     * @return self
+     */
     public Builder transform(CompressionCodecName newCodecName) {
       this.newCodecName = newCodecName;
       return this;
     }
 
+    /**
+     * Set the columns to mask.
+     * <p>
+     * By default, no columns are masked.
+     *
+     * @param maskColumns map of columns to mask to the masking mode
+     * @return self
+     */
     public Builder mask(Map<String, MaskMode> maskColumns) {
       this.maskColumns = maskColumns;
       return this;
     }
 
+    /**
+     * Set the columns to encrypt.
+     * <p>
+     * By default, no columns are encrypted.
+     *
+     * @param encryptColumns list of columns to encrypt
+     * @return self
+     */
     public Builder encrypt(List<String> encryptColumns) {
       this.encryptColumns = encryptColumns;
       return this;
     }
 
+    /**
+     * Set the encryption properties to use for the output file.
+     * <p>
+     * This is required if encrypting columns are not empty.
+     *
+     * @param fileEncryptionProperties encryption properties to use
+     * @return self
+     */
     public Builder encryptionProperties(FileEncryptionProperties 
fileEncryptionProperties) {
       this.fileEncryptionProperties = fileEncryptionProperties;
       return this;
     }
 
+    /**
+     * Add an input file to read from.
+     *
+     * @param path input file path to read from
+     * @return self
+     */
+    public Builder addInputFile(Path path) {
+      this.inputFiles.add(path);
+      return this;
+    }
+
+    /**
+     * Build the RewriterOptions.
+     *
+     * @return a RewriterOptions
+     */
     public RewriteOptions build() {
       Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"Input file is required");
       Preconditions.checkArgument(outputFile != null, "Output file is 
required");
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 25b4f3564..043261f77 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.rewrite;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
@@ -50,6 +51,7 @@ import 
org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -82,19 +84,15 @@ public class ParquetRewriterTest {
 
   private final int numRecord = 100000;
   private Configuration conf = new Configuration();
-  private EncryptionTestFile inputFile = null;
+  private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
-  @Test
-  public void testPruneSingleColumnAndTranslateCodec() throws Exception {
-    testSetup("GZIP");
-
-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) 
throws Exception {
     Path outputPath = new Path(outputFile);
     List<String> pruneColumns = Arrays.asList("Gender");
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
     RewriteOptions options = 
builder.prune(pruneColumns).transform(newCodec).build();
 
     rewriter = new ParquetRewriter(options);
@@ -115,10 +113,12 @@ public class ParquetRewriterTest {
     assertEquals(subFields.get(1).getName(), "Forward");
 
     // Verify codec has been translated
-    verifyCodec(outputFile, CompressionCodecName.ZSTD, null);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.ZSTD);
+    }}, null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile, inputFile.getFileContent(), new 
HashSet<>(pruneColumns), Collections.emptySet(), null);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
null);
 
     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -133,16 +133,31 @@ public class ParquetRewriterTest {
   }
 
   @Test
-  public void testPruneNullifyAndTranslateCodec() throws Exception {
-    testSetup("UNCOMPRESSED");
+  public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception 
{
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }
 
-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws 
Exception {
     Path outputPath = new Path(outputFile);
     List<String> pruneColumns = Arrays.asList("Gender");
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("Links.Forward", MaskMode.NULLIFY);
-    CompressionCodecName newCodec = CompressionCodecName.GZIP;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+    CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
     RewriteOptions options = 
builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
 
     rewriter = new ParquetRewriter(options);
@@ -163,10 +178,12 @@ public class ParquetRewriterTest {
     assertEquals(subFields.get(1).getName(), "Forward");
 
     // Verify codec has been translated
-    verifyCodec(outputFile, newCodec, null);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(newCodec);
+    }}, null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile, inputFile.getFileContent(), new 
HashSet<>(pruneColumns), maskColumns.keySet(), null);
+    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), 
null);
 
     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -180,12 +197,27 @@ public class ParquetRewriterTest {
   }
 
   @Test
-  public void testPruneEncryptAndTranslateCodec() throws Exception {
-    testSetup("GZIP");
+  public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneNullifyTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneNullifyTranslateCodec(inputPaths);
+  }
 
-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws 
Exception {
     Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
 
     // Prune
     List<String> pruneColumns = Arrays.asList("Gender");
@@ -221,11 +253,12 @@ public class ParquetRewriterTest {
 
     // Verify codec has been translated
     FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
-    verifyCodec(outputFile, newCodec, fileDecryptionProperties);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(newCodec);
+    }}, fileDecryptionProperties);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile,
-            inputFile.getFileContent(), new HashSet<>(pruneColumns), 
Collections.emptySet(), fileDecryptionProperties);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties);
 
     // Verify column encryption
     ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
@@ -245,9 +278,25 @@ public class ParquetRewriterTest {
   }
 
   @Test
-  public void testNullifyAndEncryptColumn() throws Exception {
-    testSetup("GZIP");
+  public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneEncryptTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneEncryptTranslateCodec(inputPaths);
+  }
 
+  private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws 
Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("DocId", MaskMode.NULLIFY);
 
@@ -255,9 +304,9 @@ public class ParquetRewriterTest {
     FileEncryptionProperties fileEncryptionProperties = 
EncDecProperties.getFileEncryptionProperties(
             encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
 
-    Path inputPath = new Path(inputFile.getFileName());
     Path outputPath = new Path(outputFile);
-    RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, 
outputPath).mask(maskColumns)
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, 
outputPath).mask(maskColumns)
+            .transform(CompressionCodecName.ZSTD)
             
.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
 
     rewriter = new ParquetRewriter(options);
@@ -267,12 +316,13 @@ public class ParquetRewriterTest {
     FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
 
     // Verify codec has not been changed
-    verifyCodec(outputFile, CompressionCodecName.GZIP, 
fileDecryptionProperties);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.ZSTD);
+    }}, fileDecryptionProperties);
 
     // Verify the data are not changed for non-encrypted and non-masked 
columns.
     // Also make sure the masked column is nullified.
-    validateColumnData(outputFile,
-            inputFile.getFileContent(), Collections.emptySet(), 
maskColumns.keySet(), fileDecryptionProperties);
+    validateColumnData(Collections.emptySet(), maskColumns.keySet(), 
fileDecryptionProperties);
 
     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -298,13 +348,132 @@ public class ParquetRewriterTest {
     }
   }
 
-  private void testSetup(String compression) throws IOException {
+  @Test
+  public void testNullifyEncryptSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testNullifyAndEncryptColumn(inputPaths);
+  }
+
+  @Test
+  public void testNullifyEncryptTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testNullifyAndEncryptColumn(inputPaths);
+  }
+
+  @Test
+  public void testMergeTwoFilesOnly() throws Exception {
+    testMultipleInputFilesSetup();
+
+    // Only merge two files but do not change anything.
+    List<Path> inputPaths = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      inputPaths.add(new Path(inputFile.getFileName()));
+    }
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
+    RewriteOptions options = builder.build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema are not changed
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    MessageType expectSchema = createSchema();
+    assertEquals(expectSchema, schema);
+
+    // Verify codec has not been translated
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.GZIP);
+      add(CompressionCodecName.UNCOMPRESSED);
+    }}, null);
+
+    // Verify the merged data are not changed
+    validateColumnData(Collections.emptySet(), Collections.emptySet(), null);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(0, 0);
+      put(1, 1);
+      put(2, 2);
+      put(3, 3);
+      put(4, 4);
+    }});
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
+  @Test(expected = InvalidSchemaException.class)
+  public void testMergeTwoFilesWithDifferentSchema() throws Exception {
+    MessageType schema1 = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT64, "DocId"),
+            new PrimitiveType(REQUIRED, BINARY, "Name"),
+            new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+            new GroupType(OPTIONAL, "Links",
+                    new PrimitiveType(REPEATED, BINARY, "Backward"),
+                    new PrimitiveType(REPEATED, BINARY, "Forward")));
+    MessageType schema2 = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT64, "DocId"),
+            new PrimitiveType(REQUIRED, BINARY, "Name"),
+            new PrimitiveType(OPTIONAL, BINARY, "Gender"));
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema1)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    inputFiles.add(new TestFileBuilder(conf, schema2)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    outputFile = TestFileBuilder.createTempFile("test");
+
+    List<Path> inputPaths = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      inputPaths.add(new Path(inputFile.getFileName()));
+    }
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPaths, outputPath);
+    RewriteOptions options = builder.build();
+
+    // This should throw an exception because the schemas are different
+    rewriter = new ParquetRewriter(options);
+  }
+
+  private void testSingleInputFileSetup(String compression) throws IOException 
{
     MessageType schema = createSchema();
-    inputFile = new TestFileBuilder(conf, schema)
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema)
             .withNumRecord(numRecord)
             .withCodec(compression)
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build();
+            .build());
+    outputFile = TestFileBuilder.createTempFile("test");
+  }
+
+  private void testMultipleInputFilesSetup() throws IOException {
+    MessageType schema = createSchema();
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema)
+            .withNumRecord(numRecord)
+            .withCodec("GZIP")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    inputFiles.add(new TestFileBuilder(conf, schema)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
     outputFile = TestFileBuilder.createTempFile("test");
   }
 
@@ -318,39 +487,47 @@ public class ParquetRewriterTest {
                     new PrimitiveType(REPEATED, BINARY, "Forward")));
   }
 
-  private void validateColumnData(String file,
-                                  SimpleGroup[] fileContent,
-                                  Set<String> prunePaths,
+  private void validateColumnData(Set<String> prunePaths,
                                   Set<String> nullifiedPaths,
                                   FileDecryptionProperties 
fileDecryptionProperties) throws IOException {
-    ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), new Path(file))
+    ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), new Path(outputFile))
             .withConf(conf).withDecryption(fileDecryptionProperties).build();
-    for (int i = 0; i < numRecord; i++) {
+
+    // Get total number of rows from input files
+    int totalRows = 0;
+    for (EncryptionTestFile inputFile : inputFiles) {
+      totalRows += inputFile.getFileContent().length;
+    }
+
+    for (int i = 0; i < totalRows; i++) {
       Group group = reader.read();
+      assertNotNull(group);
+
+      SimpleGroup expectGroup = inputFiles.get(i / 
numRecord).getFileContent()[i % numRecord];
 
       if (!prunePaths.contains("DocId")) {
         if (nullifiedPaths.contains("DocId")) {
           assertThrows(RuntimeException.class, () -> group.getLong("DocId", 
0));
         } else {
-          assertEquals(group.getLong("DocId", 0), 
fileContent[i].getLong("DocId", 0));
+          assertEquals(group.getLong("DocId", 0), expectGroup.getLong("DocId", 
0));
         }
       }
 
       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
         assertArrayEquals(group.getBinary("Name", 0).getBytes(),
-                fileContent[i].getBinary("Name", 0).getBytes());
+                expectGroup.getBinary("Name", 0).getBytes());
       }
 
       if (!prunePaths.contains("Gender") && 
!nullifiedPaths.contains("Gender")) {
         assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
-                fileContent[i].getBinary("Gender", 0).getBytes());
+                expectGroup.getBinary("Gender", 0).getBytes());
       }
 
       Group subGroup = group.getGroup("Links", 0);
 
       if (!prunePaths.contains("Links.Backward") && 
!nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
-                fileContent[i].getGroup("Links", 0).getBinary("Backward", 
0).getBytes());
+                expectGroup.getGroup("Links", 0).getBinary("Backward", 
0).getBytes());
       }
 
       if (!prunePaths.contains("Links.Forward")) {
@@ -358,7 +535,7 @@ public class ParquetRewriterTest {
           assertThrows(RuntimeException.class, () -> 
subGroup.getBinary("Forward", 0));
         } else {
           assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
-                  fileContent[i].getGroup("Links", 0).getBinary("Forward", 
0).getBytes());
+                  expectGroup.getGroup("Links", 0).getBinary("Forward", 
0).getBytes());
         }
       }
     }
@@ -380,16 +557,18 @@ public class ParquetRewriterTest {
   }
 
   private void verifyCodec(String file,
-                           CompressionCodecName codec,
+                           Set<CompressionCodecName> expectedCodecs,
                            FileDecryptionProperties fileDecryptionProperties) 
throws IOException {
+    Set<CompressionCodecName> codecs = new HashSet<>();
     ParquetMetadata pmd = getFileMetaData(file, fileDecryptionProperties);
     for (int i = 0; i < pmd.getBlocks().size(); i++) {
       BlockMetaData block = pmd.getBlocks().get(i);
       for (int j = 0; j < block.getColumns().size(); ++j) {
         ColumnChunkMetaData columnChunkMetaData = block.getColumns().get(j);
-        assertEquals(columnChunkMetaData.getCodec(), codec);
+        codecs.add(columnChunkMetaData.getCodec());
       }
     }
+    assertEquals(expectedCodecs, codecs);
   }
 
   /**
@@ -398,18 +577,28 @@ public class ParquetRewriterTest {
    * @param outFileColumnMapping the column mapping from the output file to 
the input file.
    */
   private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping) 
throws Exception {
-    ParquetMetadata inMetaData = getFileMetaData(inputFile.getFileName(), 
null);
     ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
-    assertEquals(inMetaData.getBlocks().size(), 
outMetaData.getBlocks().size());
 
-    try (TransParquetFileReader inReader = new TransParquetFileReader(
-            HadoopInputFile.fromPath(new Path(inputFile.getFileName()), conf), 
HadoopReadOptions.builder(conf).build());
-         TransParquetFileReader outReader = new TransParquetFileReader(
-                 HadoopInputFile.fromPath(new Path(outputFile), conf), 
HadoopReadOptions.builder(conf).build())) {
+    int inputFileIndex = 0;
+    TransParquetFileReader inReader = new 
TransParquetFileReader(HadoopInputFile.fromPath(
+            new Path(inputFiles.get(inputFileIndex).getFileName()), conf), 
HadoopReadOptions.builder(conf).build());
+    ParquetMetadata inMetaData = inReader.getFooter();
+
+    try (TransParquetFileReader outReader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(new Path(outputFile), conf), 
HadoopReadOptions.builder(conf).build())) {
+
+      for (int outBlockId = 0, inBlockId = 0; outBlockId < 
outMetaData.getBlocks().size(); ++outBlockId, ++inBlockId) {
+        // Refresh reader of input file
+        if (inBlockId == inMetaData.getBlocks().size()) {
+          inReader = new TransParquetFileReader(
+                  HadoopInputFile.fromPath(new 
Path(inputFiles.get(++inputFileIndex).getFileName()), conf),
+                  HadoopReadOptions.builder(conf).build());
+          inMetaData = inReader.getFooter();
+          inBlockId = 0;
+        }
 
-      for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
-        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
-        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
+        BlockMetaData outBlockMetaData = 
outMetaData.getBlocks().get(outBlockId);
 
         for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
           if (!outFileColumnMapping.containsKey(j)) {
@@ -484,15 +673,25 @@ public class ParquetRewriterTest {
   }
 
   private void validateCreatedBy() throws Exception {
-    FileMetaData inFMD = getFileMetaData(inputFile.getFileName(), 
null).getFileMetaData();
-    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    Set<String> createdBySet = new HashSet<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
+      createdBySet.add(pmd.getFileMetaData().getCreatedBy());
+      
assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    }
 
-    assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
-    
assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    // Verify created_by from input files have been deduplicated
+    Object[] inputCreatedBys = createdBySet.toArray();
+    assertEquals(1, inputCreatedBys.length);
+
+    // Verify created_by has been set
+    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    String inputCreatedBy = (String) inputCreatedBys[0];
+    assertEquals(inputCreatedBy, outFMD.getCreatedBy());
 
+    // Verify original.created.by has been set
     String originalCreatedBy = 
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
-    assertNotNull(originalCreatedBy);
-    assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+    assertEquals(inputCreatedBy, originalCreatedBy);
   }
 
 }


Reply via email to