This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 686f07147 GH-3035: ParquetRewriter: Add a column renaming feature (#3036)
686f07147 is described below

commit 686f07147499f06eeb78c9643aba499e20002676
Author: Maksim Konstantinov <[email protected]>
AuthorDate: Wed Nov 13 07:09:08 2024 -0800

    GH-3035: ParquetRewriter: Add a column renaming feature (#3036)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 137 +++++++++--
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  81 +++++--
 .../hadoop/rewrite/ParquetRewriterTest.java        | 251 +++++++++++++++------
 3 files changed, 363 insertions(+), 106 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 2ff9c0ea3..9535b4335 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -109,7 +109,7 @@ import org.slf4j.LoggerFactory;
  * Please note the schema of all <code>inputFiles</code> must be the same, 
otherwise the rewrite will fail.
  * <p>
  * <h2>Applying column transformations</h2>
- * Some supported column transformations: pruning, masking, encrypting, 
changing a codec.
+ * Some supported column transformations: pruning, masking, renaming, 
encrypting, changing a codec.
  * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full 
list with description.
  * <p>
  * <h2><i>Joining</i> with extra files with a different schema</h2>
@@ -149,18 +149,23 @@ public class ParquetRewriter implements Closeable {
   private final IndexCache.CacheStrategy indexCacheStrategy;
   private final boolean overwriteInputWithJoinColumns;
   private final InternalFileEncryptor nullColumnEncryptor;
+  private final Map<String, String> renamedColumns;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     this.newCodecName = options.getNewCodecName();
     this.indexCacheStrategy = options.getIndexCacheStrategy();
     this.overwriteInputWithJoinColumns = 
options.getOverwriteInputWithJoinColumns();
+    this.renamedColumns = options.getRenameColumns();
     ParquetConfiguration conf = options.getParquetConfiguration();
-    OutputFile out = options.getParquetOutputFile();
-    inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
-    
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(), 
conf));
+    this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), 
conf));
+    
this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
 conf));
+    this.outSchema = pruneColumnsInSchema(getSchema(), 
options.getPruneColumns());
+    this.extraMetaData = getExtraMetadata(options);
     ensureSameSchema(inputFiles);
     ensureSameSchema(inputFilesToJoin);
     ensureRowCount();
+    ensureRenamingCorrectness(outSchema, renamedColumns);
+    OutputFile out = options.getParquetOutputFile();
     LOG.info(
         "Start rewriting {} input file(s) {} to {}",
         inputFiles.size() + inputFilesToJoin.size(),
@@ -168,9 +173,6 @@ public class ParquetRewriter implements Closeable {
             .collect(Collectors.toList()),
         out);
 
-    this.outSchema = pruneColumnsInSchema(getSchema(), 
options.getPruneColumns());
-    this.extraMetaData = getExtraMetadata(options);
-
     if (options.getMaskColumns() != null) {
       this.maskColumns = new HashMap<>();
       for (Map.Entry<String, MaskMode> col : 
options.getMaskColumns().entrySet()) {
@@ -184,9 +186,9 @@ public class ParquetRewriter implements Closeable {
     }
 
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
-    writer = new ParquetFileWriter(
+    this.writer = new ParquetFileWriter(
         out,
-        outSchema,
+        renamedColumns.isEmpty() ? outSchema : 
getSchemaWithRenamedColumns(this.outSchema),
         writerMode,
         DEFAULT_BLOCK_SIZE,
         MAX_PADDING_SIZE_DEFAULT,
@@ -200,7 +202,8 @@ public class ParquetRewriter implements Closeable {
       this.nullColumnEncryptor = null;
     } else {
       this.nullColumnEncryptor = new 
InternalFileEncryptor(options.getFileEncryptionProperties());
-      List<ColumnDescriptor> columns = outSchema.getColumns();
+      List<ColumnDescriptor> columns =
+          getSchemaWithRenamedColumns(this.outSchema).getColumns();
       for (int i = 0; i < columns.size(); i++) {
         writer.getEncryptor()
             .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
@@ -223,8 +226,8 @@ public class ParquetRewriter implements Closeable {
     this.writer = writer;
     this.outSchema = outSchema;
     this.newCodecName = codecName;
-    extraMetaData = new 
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
-    extraMetaData.put(
+    this.extraMetaData = new 
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
+    this.extraMetaData.put(
         ORIGINAL_CREATED_BY_KEY,
         originalCreatedBy != null
             ? originalCreatedBy
@@ -239,6 +242,7 @@ public class ParquetRewriter implements Closeable {
     this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
     this.overwriteInputWithJoinColumns = false;
     this.nullColumnEncryptor = null;
+    this.renamedColumns = new HashMap<>();
   }
 
   private MessageType getSchema() {
@@ -266,6 +270,27 @@ public class ParquetRewriter implements Closeable {
     }
   }
 
+  private MessageType getSchemaWithRenamedColumns(MessageType schema) {
+    List<Type> fields = schema.getFields().stream()
+        .map(type -> {
+          if (!renamedColumns.containsKey(type.getName())) {
+            return type;
+          } else if (type.isPrimitive()) {
+            return new PrimitiveType(
+                type.getRepetition(),
+                type.asPrimitiveType().getPrimitiveTypeName(),
+                renamedColumns.get(type.getName()));
+          } else {
+            return new GroupType(
+                type.getRepetition(),
+                renamedColumns.get(type.getName()),
+                type.asGroupType().getFields());
+          }
+        })
+        .collect(Collectors.toList());
+    return new MessageType(schema.getName(), fields);
+  }
+
   private Map<String, String> getExtraMetadata(RewriteOptions options) {
     List<TransParquetFileReader> allFiles;
     if (options.getIgnoreJoinFilesMetadata()) {
@@ -338,6 +363,21 @@ public class ParquetRewriter implements Closeable {
     }
   }
 
+  private void ensureRenamingCorrectness(MessageType schema, Map<String, 
String> renameMap) {
+    Set<String> columns = 
schema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
+    renameMap.forEach((src, dst) -> {
+      if (!columns.contains(src)) {
+        String msg = String.format("Column to rename '%s' is not found in 
input files schema", src);
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      } else if (columns.contains(dst)) {
+        String msg = String.format("Renamed column target name '%s' is already 
present in a schema", dst);
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+    });
+  }
+
   @Override
   public void close() throws IOException {
     writer.end(extraMetaData);
@@ -421,6 +461,27 @@ public class ParquetRewriter implements Closeable {
     if (readerToJoin != null) readerToJoin.close();
   }
 
+  private ColumnPath normalizeFieldsInPath(ColumnPath path) {
+    if (renamedColumns.isEmpty()) {
+      return path;
+    } else {
+      String[] pathArray = path.toArray();
+      pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]);
+      return ColumnPath.get(pathArray);
+    }
+  }
+
+  private PrimitiveType normalizeNameInType(PrimitiveType type) {
+    if (renamedColumns.isEmpty()) {
+      return type;
+    } else {
+      return new PrimitiveType(
+          type.getRepetition(),
+          type.asPrimitiveType().getPrimitiveTypeName(),
+          renamedColumns.getOrDefault(type.getName(), type.getName()));
+    }
+  }
+
   private void processBlock(
       TransParquetFileReader reader,
       int blockIdx,
@@ -431,7 +492,28 @@ public class ParquetRewriter implements Closeable {
     if (chunk.isEncrypted()) {
       throw new IOException("Column " + chunk.getPath().toDotString() + " is 
already encrypted");
     }
-    ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);
+
+    ColumnChunkMetaData chunkNormalized = chunk;
+    if (!renamedColumns.isEmpty()) {
+      // Keep an eye if this get stale because of ColumnChunkMetaData change
+      chunkNormalized = ColumnChunkMetaData.get(
+          normalizeFieldsInPath(chunk.getPath()),
+          normalizeNameInType(chunk.getPrimitiveType()),
+          chunk.getCodec(),
+          chunk.getEncodingStats(),
+          chunk.getEncodings(),
+          chunk.getStatistics(),
+          chunk.getFirstDataPageOffset(),
+          chunk.getDictionaryPageOffset(),
+          chunk.getValueCount(),
+          chunk.getTotalSize(),
+          chunk.getTotalUncompressedSize(),
+          chunk.getSizeStatistics());
+    }
+
+    ColumnDescriptor descriptorOriginal = 
outSchema.getColumns().get(outColumnIdx);
+    ColumnDescriptor descriptorRenamed =
+        getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
     BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
     String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
 
@@ -443,13 +525,21 @@ public class ParquetRewriter implements Closeable {
       // Mask column and compress it again.
       MaskMode maskMode = maskColumns.get(chunk.getPath());
       if (maskMode.equals(MaskMode.NULLIFY)) {
-        Type.Repetition repetition = 
descriptor.getPrimitiveType().getRepetition();
+        Type.Repetition repetition =
+            descriptorOriginal.getPrimitiveType().getRepetition();
         if (repetition.equals(Type.Repetition.REQUIRED)) {
-          throw new IOException(
-              "Required column [" + descriptor.getPrimitiveType().getName() + 
"] cannot be nullified");
+          throw new IOException("Required column ["
+              + descriptorOriginal.getPrimitiveType().getName() + "] cannot be 
nullified");
         }
         nullifyColumn(
-            reader, blockIdx, descriptor, chunk, writer, newCodecName, 
encryptColumn, originalCreatedBy);
+            reader,
+            blockIdx,
+            descriptorOriginal,
+            chunk,
+            writer,
+            newCodecName,
+            encryptColumn,
+            originalCreatedBy);
       } else {
         throw new UnsupportedOperationException("Only nullify is supported for 
now");
       }
@@ -462,7 +552,7 @@ public class ParquetRewriter implements Closeable {
       }
 
       // Translate compression and/or encryption
-      writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
+      writer.startColumn(descriptorRenamed, chunk.getValueCount(), 
newCodecName);
       processChunk(
           reader,
           blockMetaData.getRowCount(),
@@ -480,7 +570,8 @@ public class ParquetRewriter implements Closeable {
       BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
       ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
       OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
-      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, 
bloomFilter, columnIndex, offsetIndex);
+      writer.appendColumnChunk(
+          descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter, 
columnIndex, offsetIndex);
     }
   }
 
@@ -522,7 +613,7 @@ public class ParquetRewriter implements Closeable {
     }
 
     if (bloomFilter != null) {
-      writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
+      
writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(), 
bloomFilter);
     }
 
     reader.setStreamPosition(chunk.getStartingPos());
@@ -580,7 +671,7 @@ public class ParquetRewriter implements Closeable {
               dataPageAAD);
           statistics = convertStatistics(
               originalCreatedBy,
-              chunk.getPrimitiveType(),
+              normalizeNameInType(chunk.getPrimitiveType()),
               headerV1.getStatistics(),
               columnIndex,
               pageOrdinal,
@@ -648,7 +739,7 @@ public class ParquetRewriter implements Closeable {
               dataPageAAD);
           statistics = convertStatistics(
               originalCreatedBy,
-              chunk.getPrimitiveType(),
+              normalizeNameInType(chunk.getPrimitiveType()),
               headerV2.getStatistics(),
               columnIndex,
               pageOrdinal,
@@ -887,7 +978,7 @@ public class ParquetRewriter implements Closeable {
     CompressionCodecFactory.BytesInputCompressor compressor = 
codecFactory.getCompressor(newCodecName);
 
     // Create new schema that only has the current column
-    MessageType newSchema = newSchema(outSchema, descriptor);
+    MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, 
descriptor));
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
         compressor,
         newSchema,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a69403f46..f85b65ea3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -20,8 +20,11 @@ package org.apache.parquet.hadoop.rewrite;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -49,6 +52,7 @@ public class RewriteOptions {
   private final List<String> pruneColumns;
   private final CompressionCodecName newCodecName;
   private final Map<String, MaskMode> maskColumns;
+  private final Map<String, String> renameColumns;
   private final List<String> encryptColumns;
   private final FileEncryptionProperties fileEncryptionProperties;
   private final IndexCache.CacheStrategy indexCacheStrategy;
@@ -63,6 +67,7 @@ public class RewriteOptions {
       List<String> pruneColumns,
       CompressionCodecName newCodecName,
       Map<String, MaskMode> maskColumns,
+      Map<String, String> renameColumns,
       List<String> encryptColumns,
       FileEncryptionProperties fileEncryptionProperties,
       IndexCache.CacheStrategy indexCacheStrategy,
@@ -75,6 +80,7 @@ public class RewriteOptions {
     this.pruneColumns = pruneColumns;
     this.newCodecName = newCodecName;
     this.maskColumns = maskColumns;
+    this.renameColumns = renameColumns;
     this.encryptColumns = encryptColumns;
     this.fileEncryptionProperties = fileEncryptionProperties;
     this.indexCacheStrategy = indexCacheStrategy;
@@ -192,6 +198,10 @@ public class RewriteOptions {
     return maskColumns;
   }
 
+  public Map<String, String> getRenameColumns() {
+    return renameColumns;
+  }
+
   public List<String> getEncryptColumns() {
     return encryptColumns;
   }
@@ -221,6 +231,7 @@ public class RewriteOptions {
     private List<String> pruneColumns;
     private CompressionCodecName newCodecName;
     private Map<String, MaskMode> maskColumns;
+    private Map<String, String> renameColumns;
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;
     private IndexCache.CacheStrategy indexCacheStrategy = 
IndexCache.CacheStrategy.NONE;
@@ -432,6 +443,19 @@ public class RewriteOptions {
       return this;
     }
 
+    /**
+     * Set the columns to be renamed.
+     * <p>
+     * Note that nested columns can't be renamed, in case of GroupType column 
only top level column can be renamed.
+     *
+     * @param renameColumns map where keys are original names and values are 
new names
+     * @return self
+     */
+    public Builder renameColumns(Map<String, String> renameColumns) {
+      this.renameColumns = renameColumns;
+      return this;
+    }
+
     /**
      * Set the columns to encrypt.
      * <p>
@@ -551,6 +575,28 @@ public class RewriteOptions {
      * @return a RewriterOptions
      */
     public RewriteOptions build() {
+      checkPreconditions();
+      return new RewriteOptions(
+          conf,
+          inputFiles,
+          (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
+          outputFile,
+          pruneColumns,
+          newCodecName,
+          maskColumns,
+          renameColumns == null
+              ? new HashMap<>()
+              : renameColumns.entrySet().stream()
+                  .collect(Collectors.toMap(x -> x.getKey().trim(), x -> 
x.getValue()
+                      .trim())),
+          encryptColumns,
+          fileEncryptionProperties,
+          indexCacheStrategy,
+          overwriteInputWithJoinColumns,
+          ignoreJoinFilesMetadata);
+    }
+
+    private void checkPreconditions() {
       Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"Input file is required");
       Preconditions.checkArgument(outputFile != null, "Output file is 
required");
 
@@ -561,7 +607,6 @@ public class RewriteOptions {
                 !maskColumns.containsKey(pruneColumn), "Cannot prune and mask 
same column");
           }
         }
-
         if (encryptColumns != null) {
           for (String pruneColumn : pruneColumns) {
             Preconditions.checkArgument(
@@ -570,6 +615,26 @@ public class RewriteOptions {
         }
       }
 
+      if (renameColumns != null) {
+        Set<String> nullifiedColumns = maskColumns == null
+            ? new HashSet<>()
+            : maskColumns.entrySet().stream()
+                .filter(x -> x.getValue() == MaskMode.NULLIFY)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toSet());
+        renameColumns.forEach((colSrc, colDst) -> {
+          Preconditions.checkArgument(
+              colSrc != null && !colSrc.trim().isEmpty(), "Renamed column 
source name can't be empty");
+          Preconditions.checkArgument(
+              colDst != null && !colDst.trim().isEmpty(), "Renamed column 
target name can't be empty");
+          Preconditions.checkArgument(
+              !nullifiedColumns.contains(colSrc), "Cannot nullify and rename 
the same column");
+          Preconditions.checkArgument(
+              !colSrc.contains(".") && !colDst.contains("."),
+              "Renamed column can't be nested, in case of GroupType column 
only a top level column can be renamed");
+        });
+      }
+
       if (encryptColumns != null && !encryptColumns.isEmpty()) {
         Preconditions.checkArgument(
             fileEncryptionProperties != null,
@@ -581,20 +646,6 @@ public class RewriteOptions {
             encryptColumns != null && !encryptColumns.isEmpty(),
             "Encrypt columns is required when FileEncryptionProperties is 
set");
       }
-
-      return new RewriteOptions(
-          conf,
-          inputFiles,
-          (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
-          outputFile,
-          pruneColumns,
-          newCodecName,
-          maskColumns,
-          encryptColumns,
-          fileEncryptionProperties,
-          indexCacheStrategy,
-          overwriteInputWithJoinColumns,
-          ignoreJoinFilesMetadata);
     }
   }
 }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 34c90a464..c1da97c40 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.rewrite;
 
+import static java.util.Collections.emptyMap;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
@@ -181,10 +182,10 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
null, false);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
null, false, emptyMap());
 
     // Verify the page index
-    validatePageIndex(new HashSet<>(), false);
+    validatePageIndex(new HashSet<>(), false, emptyMap());
 
     // Verify original.created.by is preserved
     validateCreatedBy();
@@ -199,7 +200,7 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception 
{
-    ensureContainsGzipFile();
+    addGzipInputFile();
     List<Path> inputPaths = new ArrayList<Path>() {
       {
         add(new Path(inputFiles.get(0).getFileName()));
@@ -210,8 +211,8 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
-    ensureContainsGzipFile();
-    ensureContainsUncompressedFile();
+    addGzipInputFile();
+    addUncompressedInputFile();
     List<Path> inputPaths = new ArrayList<Path>() {
       {
         add(new Path(inputFiles.get(0).getFileName()));
@@ -252,10 +253,10 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), 
null, false);
+    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), 
null, false, emptyMap());
 
     // Verify the page index
-    validatePageIndex(ImmutableSet.of("Links.Forward"), false);
+    validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap());
 
     // Verify original.created.by is preserved
     validateCreatedBy();
@@ -264,7 +265,7 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
-    ensureContainsGzipFile();
+    addGzipInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -276,8 +277,8 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
-    ensureContainsGzipFile();
-    ensureContainsUncompressedFile();
+    addGzipInputFile();
+    addUncompressedInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -327,7 +328,8 @@ public class ParquetRewriterTest {
         fileDecryptionProperties);
 
     // Verify the data are not changed for the columns not pruned
-    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties, false);
+    validateColumnData(
+        new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties, false, emptyMap());
 
     // Verify column encryption
     ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
@@ -349,7 +351,7 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
-    ensureContainsGzipFile();
+    addGzipInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -361,8 +363,8 @@ public class ParquetRewriterTest {
 
   @Test
   public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
-    ensureContainsGzipFile();
-    ensureContainsUncompressedFile();
+    addGzipInputFile();
+    addUncompressedInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -488,10 +490,10 @@ public class ParquetRewriterTest {
 
     // Verify the data are not changed for non-encrypted and non-masked 
columns.
     // Also make sure the masked column is nullified.
-    validateColumnData(Collections.emptySet(), maskColumns.keySet(), 
fileDecryptionProperties, false);
+    validateColumnData(Collections.emptySet(), maskColumns.keySet(), 
fileDecryptionProperties, false, emptyMap());
 
     // Verify the page index
-    validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);
+    validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false, 
emptyMap());
 
     // Verify the column is encrypted
     ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
@@ -511,7 +513,7 @@ public class ParquetRewriterTest {
 
   @Test
   public void testNullifyEncryptSingleFile() throws Exception {
-    ensureContainsGzipFile();
+    addGzipInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -523,8 +525,8 @@ public class ParquetRewriterTest {
 
   @Test
   public void testNullifyEncryptTwoFiles() throws Exception {
-    ensureContainsGzipFile();
-    ensureContainsUncompressedFile();
+    addGzipInputFile();
+    addUncompressedInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -537,8 +539,8 @@ public class ParquetRewriterTest {
 
   @Test
   public void testMergeTwoFilesOnly() throws Exception {
-    ensureContainsGzipFile();
-    ensureContainsUncompressedFile();
+    addGzipInputFile();
+    addUncompressedInputFile();
 
     // Only merge two files but do not change anything.
     List<Path> inputPaths = new ArrayList<>();
@@ -571,27 +573,103 @@ public class ParquetRewriterTest {
         null);
 
     // Verify the merged data are not changed
-    validateColumnData(Collections.emptySet(), Collections.emptySet(), null, 
false);
+    validateColumnData(Collections.emptySet(), Collections.emptySet(), null, 
false, emptyMap());
 
     // Verify the page index
-    validatePageIndex(new HashSet<>(), false);
+    validatePageIndex(new HashSet<>(), false, emptyMap());
 
     // Verify original.created.by is preserved
     validateCreatedBy();
     validateRowGroupRowCount();
   }
 
+  @Test
+  public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
+    addGzipInputFile();
+    addUncompressedInputFile();
+
+    Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
+    List<String> pruneColumns = ImmutableList.of("Gender");
+    String[] encryptColumns = {"DocId"};
+    FileEncryptionProperties fileEncryptionProperties =
+        EncDecProperties.getFileEncryptionProperties(encryptColumns, 
ParquetCipher.AES_GCM_CTR_V1, false);
+    List<Path> inputPaths =
+        inputFiles.stream().map(x -> new 
Path(x.getFileName())).collect(Collectors.toList());
+    RewriteOptions.Builder builder = createBuilder(inputPaths);
+    RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+        .renameColumns(ImmutableMap.of("Name", "NameRenamed"))
+        .prune(pruneColumns)
+        .transform(CompressionCodecName.SNAPPY)
+        .encrypt(Arrays.asList(encryptColumns))
+        .encryptionProperties(fileEncryptionProperties)
+        .build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
+
+    // Verify the schema is not changed
+    ParquetMetadata pmd =
+        ParquetFileReader.readFooter(conf, new Path(outputFile), 
ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    MessageType expectSchema = createSchemaWithRenamed();
+    assertEquals(expectSchema, schema);
+
+    verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY), 
fileDecryptionProperties); // Verify codec
+    // Verify the merged data are not changed
+    validateColumnData(
+        new HashSet<>(pruneColumns), Collections.emptySet(), 
fileDecryptionProperties, false, renameColumns);
+    validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); // 
Verify the page index
+    validateCreatedBy(); // Verify original.created.by is preserved
+    validateRowGroupRowCount();
+
+    ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
+    assertFalse(metaData.getBlocks().isEmpty());
+    Set<String> encryptedColumns = new 
HashSet<>(Arrays.asList(encryptColumns));
+    for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+      List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (encryptedColumns.contains(column.getPath().toDotString())) {
+          assertTrue(column.isEncrypted());
+        } else {
+          assertFalse(column.isEncrypted());
+        }
+      }
+    }
+  }
+
   @Test(expected = InvalidSchemaException.class)
   public void testMergeTwoFilesWithDifferentSchema() throws Exception {
-    testMergeTwoFilesWithDifferentSchemaSetup(true);
+    testMergeTwoFilesWithDifferentSchemaSetup(true, null, null);
   }
 
   @Test(expected = InvalidSchemaException.class)
   public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
-    testMergeTwoFilesWithDifferentSchemaSetup(false);
+    testMergeTwoFilesWithDifferentSchemaSetup(false, null, null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws 
Exception {
+    testMergeTwoFilesWithDifferentSchemaSetup(
+        null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"), 
null);
   }
 
-  public void testMergeTwoFilesWithDifferentSchemaSetup(boolean 
wrongSchemaInInputFile) throws Exception {
+  @Test(expected = IllegalArgumentException.class)
+  public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception 
{
+    testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name", 
"DocId"), null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception {
+    testMergeTwoFilesWithDifferentSchemaSetup(
+        null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name", 
MaskMode.NULLIFY));
+  }
+
+  public void testMergeTwoFilesWithDifferentSchemaSetup(
+      Boolean wrongSchemaInInputFile, Map<String, String> renameColumns, 
Map<String, MaskMode> maskColumns)
+      throws Exception {
     MessageType schema1 = new MessageType(
         "schema",
         new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -620,27 +698,32 @@ public class ParquetRewriterTest {
         .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
         .withWriterVersion(writerVersion)
         .build());
-    if (wrongSchemaInInputFile) {
-      inputFiles.add(new TestFileBuilder(conf, schema2)
-          .withNumRecord(numRecord)
-          .withCodec("UNCOMPRESSED")
-          .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-          .withWriterVersion(writerVersion)
-          .build());
-    } else {
-      inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
-          .withNumRecord(numRecord)
-          .withCodec("UNCOMPRESSED")
-          .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-          .withWriterVersion(writerVersion)
-          .build());
+    if (wrongSchemaInInputFile != null) {
+      if (wrongSchemaInInputFile) {
+        inputFiles.add(new TestFileBuilder(conf, schema2)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .withWriterVersion(writerVersion)
+            .build());
+      } else {
+        inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .withWriterVersion(writerVersion)
+            .build());
+      }
     }
 
     RewriteOptions.Builder builder = createBuilder(
         inputFiles.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
         inputFilesToJoin.stream().map(x -> new Path(x.getFileName())).collect(Collectors.toList()),
         false);
-    RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy).build();
+    RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+        .renameColumns(renameColumns)
+        .mask(maskColumns)
+        .build();
 
     // This should throw an exception because the schemas are different
     rewriter = new ParquetRewriter(options);
@@ -648,7 +731,7 @@ public class ParquetRewriterTest {
 
   @Test
   public void testRewriteFileWithMultipleBlocks() throws Exception {
-    ensureContainsGzipFile();
+    addGzipInputFile();
 
     List<Path> inputPaths = new ArrayList<Path>() {
       {
@@ -823,12 +906,13 @@ public class ParquetRewriterTest {
         new HashSet<>(pruneColumns),
         maskColumns.keySet(),
         fileDecryptionProperties,
-        joinColumnsOverwrite); // Verify data
+        joinColumnsOverwrite,
+        emptyMap()); // Verify data
     validateSchemaWithGenderColumnPruned(true); // Verify schema
     validateCreatedBy(); // Verify original.created.by
     assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom filters
     verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD), fileDecryptionProperties); // Verify codec
-    validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite);
+    validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite, emptyMap());
   }
 
   private void testOneInputFileManyInputFilesToJoinSetup() throws IOException {
@@ -884,11 +968,26 @@ public class ParquetRewriterTest {
             new PrimitiveType(REPEATED, BINARY, "Forward")));
   }
 
+  private MessageType createSchemaWithRenamed() {
+    return new MessageType(
+        "schema",
+        new PrimitiveType(OPTIONAL, INT64, "DocId"),
+        new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
+        new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+        new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
+        new GroupType(
+            OPTIONAL,
+            "Links",
+            new PrimitiveType(REPEATED, BINARY, "Backward"),
+            new PrimitiveType(REPEATED, BINARY, "Forward")));
+  }
+
   private void validateColumnData(
       Set<String> prunePaths,
       Set<String> nullifiedPaths,
       FileDecryptionProperties fileDecryptionProperties,
-      Boolean joinColumnsOverwrite)
+      Boolean joinColumnsOverwrite,
+      Map<String, String> renameColumns)
       throws IOException {
     ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
         .withConf(conf)
@@ -901,7 +1000,7 @@ public class ParquetRewriterTest {
     List<SimpleGroup> filesJoined = inputFilesToJoin.stream()
         .flatMap(x -> Arrays.stream(x.getFileContent()))
         .collect(Collectors.toList());
-    BiFunction<String, Integer, Group> groups = (name, rowIdx) -> {
+    BiFunction<String, Integer, Group> groupsExpected = (name, rowIdx) -> {
       if (!filesMain.get(0).getType().containsField(name)
           || joinColumnsOverwrite
               && !filesJoined.isEmpty()
@@ -915,50 +1014,53 @@ public class ParquetRewriterTest {
     int totalRows =
         inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum();
     for (int i = 0; i < totalRows; i++) {
-      Group group = reader.read();
-      assertNotNull(group);
+      Group groupActual = reader.read();
+      assertNotNull(groupActual);
 
       if (!prunePaths.contains("DocId")) {
         if (nullifiedPaths.contains("DocId")) {
-          assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
+          assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0));
         } else {
           assertEquals(
-              group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0));
+              groupActual.getLong("DocId", 0),
+              groupsExpected.apply("DocId", i).getLong("DocId", 0));
         }
       }
 
       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
+        String colName = renameColumns.getOrDefault("Name", "Name");
         assertArrayEquals(
-            group.getBinary("Name", 0).getBytes(),
-            groups.apply("Name", i).getBinary("Name", 0).getBytes());
+            groupActual.getBinary(colName, 0).getBytes(),
+            groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes());
       }
 
       if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) {
         assertArrayEquals(
-            group.getBinary("Gender", 0).getBytes(),
-            groups.apply("Gender", i).getBinary("Gender", 0).getBytes());
+            groupActual.getBinary("Gender", 0).getBytes(),
+            groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes());
       }
 
       if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
         assertEquals(
-            group.getFloat("FloatFraction", 0),
-            groups.apply("FloatFraction", i).getFloat("FloatFraction", 0),
+            groupActual.getFloat("FloatFraction", 0),
+            groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0),
             0);
       }
 
       if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
         assertEquals(
-            group.getDouble("DoubleFraction", 0),
-            groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
+            groupActual.getDouble("DoubleFraction", 0),
+            groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
             0);
       }
 
-      Group subGroup = group.getGroup("Links", 0);
+      Group subGroup = groupActual.getGroup("Links", 0);
 
       if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(
             subGroup.getBinary("Backward", 0).getBytes(),
-            groups.apply("Links", i)
+            groupsExpected
+                .apply("Links", i)
                 .getGroup("Links", 0)
                 .getBinary("Backward", 0)
                 .getBytes());
@@ -970,7 +1072,8 @@ public class ParquetRewriterTest {
         } else {
           assertArrayEquals(
               subGroup.getBinary("Forward", 0).getBytes(),
-              groups.apply("Links", i)
+              groupsExpected
+                  .apply("Links", i)
                   .getGroup("Links", 0)
                   .getBinary("Forward", 0)
                   .getBytes());
@@ -1014,13 +1117,22 @@ public class ParquetRewriterTest {
     R apply(T t) throws IOException;
   }
 
+  private ColumnPath normalizeFieldsInPath(ColumnPath path, Map<String, String> renameColumns) {
+    String[] pathArray = path.toArray();
+    if (renameColumns != null) {
+      pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]);
+    }
+    return ColumnPath.get(pathArray);
+  }
+
   /**
    * Verify the page index is correct.
    *
    * @param exclude the columns to exclude from comparison, for example because they were nullified.
    * @param joinColumnsOverwrite whether a join columns overwrote existing overlapping columns.
    */
-  private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite) throws Exception {
+  private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite, Map<String, String> renameColumns)
+      throws Exception {
     class BlockMeta {
       final TransParquetFileReader reader;
       final BlockMetaData blockMeta;
@@ -1058,6 +1170,8 @@ public class ParquetRewriterTest {
     List<BlockMeta> inBlocksJoined = blockMetaExtractor.apply(
         inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
     List<BlockMeta> outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile));
+    Map<String, String> renameColumnsInverted =
+        renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
     for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) {
       BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta;
       TransParquetFileReader outReader = outBlocks.get(blockIdx).reader;
@@ -1066,17 +1180,18 @@ public class ParquetRewriterTest {
         TransParquetFileReader inReader;
         BlockMetaData inBlockMeta;
         ColumnChunkMetaData inChunk;
-        if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())
+        ColumnPath colPath = normalizeFieldsInPath(outChunk.getPath(), renameColumnsInverted);
+        if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath)
             || joinColumnsOverwrite
                 && !inBlocksJoined.isEmpty()
-                && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) {
+                && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) {
           inReader = inBlocksJoined.get(blockIdx).reader;
           inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta;
-          inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+          inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath);
         } else {
           inReader = inBlocksMain.get(blockIdx).reader;
           inBlockMeta = inBlocksMain.get(blockIdx).blockMeta;
-          inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+          inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath);
         }
 
         ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
@@ -1284,13 +1399,13 @@ public class ParquetRewriterTest {
     assertEquals(expectSchema, actualSchema);
   }
 
-  private void ensureContainsGzipFile() {
+  private void addGzipInputFile() {
     if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
       inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
     }
   }
 
-  private void ensureContainsUncompressedFile() {
+  private void addUncompressedInputFile() {
     if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
       inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
     }


Reply via email to