This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new d6417dfad PARQUET-2227: Refactor several file rewriters to use a new 
unified ParquetRewriter implementation (#1014)
d6417dfad is described below

commit d6417dfad59c1423e358a5e859c332abd1244d2f
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jan 30 04:25:17 2023 +0800

    PARQUET-2227: Refactor several file rewriters to use a new unified 
ParquetRewriter implementation (#1014)
    
    - A new ParquetRewriter is introduced to unify rewriting logic.
    - RewriteOptions is defined to provide essential settings.
    - CompressionConverter, ColumnPruner, ColumnMasker, and ColumnEncryptor
      have been refactored.
    - Check conflicts in the RewriteOptions.
    - Rename EncryptorRunTime to ColumnChunkEncryptorRunTime.
    - Avoid redundant check in the ColumnChunkEncryptorRunTime.
    - Simplify MaskMode enum.
    - add mixed test cases for rewriter
    - add test case with encryption/pruning/transcodec
    - fix error message
    - rename createdBy to originalCreatedBy, add multiple inputs to 
RewriteOptions
    - rewriter keeps old writer version into original.created.by
---
 .../apache/parquet/hadoop/rewrite/MaskMode.java    |  38 ++
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 754 +++++++++++++++++++++
 .../parquet/hadoop/rewrite/RewriteOptions.java     | 184 +++++
 .../parquet/hadoop/util/ColumnEncryptor.java       | 255 +------
 .../apache/parquet/hadoop/util/ColumnMasker.java   | 174 +----
 .../apache/parquet/hadoop/util/ColumnPruner.java   |  98 +--
 .../parquet/hadoop/util/CompressionConverter.java  | 209 +-----
 .../hadoop/rewrite/ParquetRewriterTest.java        | 428 ++++++++++++
 ...eterTest.java => CompressionConverterTest.java} |   2 +-
 9 files changed, 1442 insertions(+), 700 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java
new file mode 100644
index 000000000..4e1fda0de
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.parquet.Preconditions;
+
+public enum MaskMode {
+  NULLIFY("nullify"),
+  HASH("hash"),
+  REDACT("redact");
+
+  private String mode;
+
+  MaskMode(String text) {
+    Preconditions.checkArgument(text != null, "Text of mask mode is required");
+    this.mode = text;
+  }
+
+  public String getMode() {
+    return this.mode;
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
new file mode 100644
index 000000000..e4870af98
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  // Key to store original writer version in the file key-value metadata
+  public static final String ORIGINAL_CREATED_BY_KEY = "original.created.by";
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String originalCreatedBy;
+  private CompressionCodecName newCodecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+  private Map<String, String> extraMetaData = new HashMap<>();
+
  /**
   * Opens the single input file named by {@code options}, captures its footer
   * metadata, applies column pruning to the in-memory schema, records the
   * mask/encrypt settings, and starts the output {@link ParquetFileWriter}.
   *
   * @param options rewrite settings (input/output paths, codec, prune/mask/encrypt columns)
   * @throws IOException if the input footer cannot be read or the output cannot be created
   * @throws IllegalArgumentException if more than one input file is supplied
   */
  public ParquetRewriter(RewriteOptions options) throws IOException {
    Preconditions.checkArgument(options.getInputFiles().size() == 1, "Only support one input file");
    Path inPath = options.getInputFiles().get(0);
    Path outPath = options.getOutputFile();
    Configuration conf = options.getConf();

    newCodecName = options.getNewCodecName();
    pruneColumns = options.getPruneColumns();

    // Get file metadata and full schema from the input file.
    // The original writer version is preserved under ORIGINAL_CREATED_BY_KEY.
    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
    schema = meta.getFileMetaData().getSchema();
    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);

    // Prune columns if specified. Unknown column names only produce a warning,
    // they do not fail the rewrite.
    if (pruneColumns != null && !pruneColumns.isEmpty()) {
      List<String> paths = new ArrayList<>();
      getPaths(schema, paths, null);
      for (String col : pruneColumns) {
        if (!paths.contains(col)) {
          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
        }
      }

      Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
      schema = pruneColumnsInSchema(schema, prunePaths);
    }

    // Convert dotted column names to ColumnPath keys for quick lookup during rewrite.
    if (options.getMaskColumns() != null) {
      this.maskColumns = new HashMap<>();
      for (Map.Entry<String, MaskMode> col : options.getMaskColumns().entrySet()) {
        maskColumns.put(ColumnPath.fromDotString(col.getKey()), col.getValue());
      }
    }

    // Encryption is active only when both the column list AND the file
    // encryption properties are provided.
    if (options.getEncryptColumns() != null && options.getFileEncryptionProperties() != null) {
      this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
      this.encryptMode = true;
    }

    reader = new TransParquetFileReader(
            HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());

    // CREATE mode: fail if the output file already exists.
    ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
    writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
            DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
            DEFAULT_STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
            options.getFileEncryptionProperties());
    writer.start();
  }
+
+  // Ctor for legacy CompressionConverter and ColumnMasker
+  public ParquetRewriter(TransParquetFileReader reader,
+                         ParquetFileWriter writer,
+                         ParquetMetadata meta,
+                         MessageType schema,
+                         String originalCreatedBy,
+                         CompressionCodecName codecName,
+                         List<String> maskColumns,
+                         MaskMode maskMode) {
+    this.reader = reader;
+    this.writer = writer;
+    this.meta = meta;
+    this.schema = schema;
+    this.newCodecName = codecName;
+    originalCreatedBy = originalCreatedBy == null ? 
meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
+    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
+    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
+    if (maskColumns != null && maskMode != null) {
+      this.maskColumns = new HashMap<>();
+      for (String col : maskColumns) {
+        this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
+      }
+    }
+  }
+
  /**
   * Finalizes the output file: writes the footer including the merged
   * key-value metadata (with the original.created.by entry) collected earlier.
   * Note: this does not close the input {@code reader}.
   */
  @Override
  public void close() throws IOException {
    writer.end(extraMetaData);
  }
+
  /**
   * Rewrites every row group of the input into the output writer, dispatching
   * each column chunk to one of three paths: nullify (mask), re-encode
   * (transcode and/or encrypt), or raw binary copy when nothing changed.
   *
   * @throws IOException if a column is already encrypted, a required column is
   *                     masked, or any read/write fails
   */
  public void processBlocks() throws IOException {
    PageReadStore store = reader.readNextRowGroup();
    // NOTE(review): crStore wraps the FIRST row group's PageReadStore but is
    // reused for every subsequent row group below — confirm that
    // getColumnReader()/getTotalValueCount() remain valid across groups.
    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
    // Lookup from column path to descriptor for the (possibly pruned) schema.
    Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
            Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));

    int blockId = 0;
    while (store != null) {
      writer.startBlock(store.getRowCount());

      BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();

      // columnId counts only columns that survive pruning; i walks all chunks.
      for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
        ColumnChunkMetaData chunk = columnsInOrder.get(i);
        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());

        // This column has been pruned.
        if (descriptor == null) {
          continue;
        }

        // If a column is encrypted, we simply throw exception.
        // Later we can add a feature to trans-encrypt it with different keys
        if (chunk.isEncrypted()) {
          throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
        }

        reader.setStreamPosition(chunk.getStartingPos());
        // Keep the chunk's original codec unless a new one was requested.
        CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
        ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
        boolean encryptColumn = false;
        if (encryptMode) {
          columnChunkEncryptorRunTime =
                  new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
          encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
        }

        if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
          // Mask column and compress it again.
          MaskMode maskMode = maskColumns.get(chunk.getPath());
          if (maskMode.equals(MaskMode.NULLIFY)) {
            // Only optional/repeated columns can be nullified.
            Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
            if (repetition.equals(Type.Repetition.REQUIRED)) {
              throw new IOException(
                      "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
            }
            nullifyColumn(
                    descriptor,
                    chunk,
                    crStore,
                    writer,
                    schema,
                    newCodecName,
                    columnChunkEncryptorRunTime,
                    encryptColumn);
          } else {
            throw new UnsupportedOperationException("Only nullify is supported for now");
          }
        } else if (encryptMode || this.newCodecName != null) {
          // Translate compression and/or encryption
          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
          processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
          writer.endColumn();
        } else {
          // Nothing changed, simply copy the binary data.
          BloomFilter bloomFilter = reader.readBloomFilter(chunk);
          ColumnIndex columnIndex = reader.readColumnIndex(chunk);
          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
          writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
        }

        columnId++;
      }

      writer.endBlock();
      store = reader.readNextRowGroup();
      blockId++;
    }
  }
+
+  private void processChunk(ColumnChunkMetaData chunk,
+                            CompressionCodecName newCodecName,
+                            ColumnChunkEncryptorRunTime 
columnChunkEncryptorRunTime,
+                            boolean encryptColumn) throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+    CompressionCodecFactory.BytesInputCompressor compressor = null;
+    if (!newCodecName.equals(chunk.getCodec())) {
+      // Re-compress only if a different codec has been specified
+      decompressor = codecFactory.getDecompressor(chunk.getCodec());
+      compressor = codecFactory.getCompressor(newCodecName);
+    }
+
+    // EncryptorRunTime is only provided when encryption is required
+    BlockCipher.Encryptor metaEncryptor = null;
+    BlockCipher.Encryptor dataEncryptor = null;
+    byte[] dictPageAAD = null;
+    byte[] dataPageAAD = null;
+    byte[] dictPageHeaderAAD = null;
+    byte[] dataPageHeaderAAD = null;
+    if (columnChunkEncryptorRunTime != null) {
+      metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
+      dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
+      dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
+      dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
+      dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
+      dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
+    }
+
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageOrdinal = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column 
chunk");
+          }
+          //No quickUpdatePageAAD needed for dictionary page
+          DictionaryPageHeader dictPageHeader = 
pageHeader.dictionary_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dictPageAAD);
+          writer.writeDictionaryPage(new 
DictionaryPage(BytesInput.from(pageLoad),
+                          pageHeader.getUncompressed_page_size(),
+                          dictPageHeader.getNum_values(),
+                          converter.getEncoding(dictPageHeader.getEncoding())),
+                  metaEncryptor,
+                  dictPageHeaderAAD);
+          break;
+        case DATA_PAGE:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = processPageLoad(reader,
+                  true,
+                  compressor,
+                  decompressor,
+                  pageHeader.getCompressed_page_size(),
+                  pageHeader.getUncompressed_page_size(),
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  originalCreatedBy, chunk.getPrimitiveType(), 
headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(
+                    pageOrdinal, totalChunkValues) - 
offsetIndex.getFirstRowIndex(pageOrdinal);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    toIntWithCheck(rowCount),
+                    
converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    
converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+                    pageHeader.getUncompressed_page_size(),
+                    BytesInput.from(pageLoad),
+                    statistics,
+                    
converter.getEncoding(headerV1.getRepetition_level_encoding()),
+                    
converter.getEncoding(headerV1.getDefinition_level_encoding()),
+                    converter.getEncoding(headerV1.getEncoding()),
+                    metaEncryptor,
+                    dataPageHeaderAAD);
+          }
+          pageOrdinal++;
+          break;
+        case DATA_PAGE_V2:
+          if (encryptColumn) {
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+            AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+          }
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - 
rlLength - dlLength;
+          pageLoad = processPageLoad(
+                  reader,
+                  headerV2.is_compressed,
+                  compressor,
+                  decompressor,
+                  payLoadLength,
+                  rawDataLength,
+                  encryptColumn,
+                  dataEncryptor,
+                  dataPageAAD);
+          statistics = convertStatistics(
+                  originalCreatedBy, chunk.getPrimitiveType(), 
headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+                  headerV2.getNum_nulls(),
+                  headerV2.getNum_values(),
+                  rlLevels,
+                  dlLevels,
+                  converter.getEncoding(headerV2.getEncoding()),
+                  BytesInput.from(pageLoad),
+                  rawDataLength,
+                  statistics);
+          pageOrdinal++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
  /**
   * Rebuilds page-level statistics, preferring the column index (null counts
   * plus min/max for non-null pages) over the statistics embedded in the page
   * header; returns null when neither source is available.
   *
   * @param createdBy created-by string of the original file, used to interpret legacy stats
   * @param pageStatistics statistics from the page header, may be null
   * @param columnIndex column index of the chunk, may be null
   * @param pageIndex ordinal of the page within the chunk
   * @throws IOException if the column index is present but corrupted
   */
  private Statistics convertStatistics(String createdBy,
                                       PrimitiveType type,
                                       org.apache.parquet.format.Statistics pageStatistics,
                                       ColumnIndex columnIndex,
                                       int pageIndex,
                                       ParquetMetadataConverter converter) throws IOException {
    if (columnIndex != null) {
      if (columnIndex.getNullPages() == null) {
        throw new IOException("columnIndex has null variable 'nullPages' which indicates corrupted data for type: " +
                type.getName());
      }
      // NOTE(review): '>' admits pageIndex == size, which would throw an
      // IndexOutOfBoundsException on the get() calls below — confirm whether
      // this check should be '>='.
      if (pageIndex > columnIndex.getNullPages().size()) {
        throw new IOException("There are more pages " + pageIndex + " found in the column than in the columnIndex " +
                columnIndex.getNullPages().size());
      }
      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
              org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));

      // Min/max only exist for pages that contain at least one non-null value.
      if (!columnIndex.getNullPages().get(pageIndex)) {
        statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
        statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
      }
      return statsBuilder.build();
    } else if (pageStatistics != null) {
      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
    } else {
      return null;
    }
  }
+
+  private byte[] processPageLoad(TransParquetFileReader reader,
+                                 boolean isCompressed,
+                                 CompressionCodecFactory.BytesInputCompressor 
compressor,
+                                 
CompressionCodecFactory.BytesInputDecompressor decompressor,
+                                 int payloadLength,
+                                 int rawDataLength,
+                                 boolean encrypt,
+                                 BlockCipher.Encryptor dataEncryptor,
+                                 byte[] AAD) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+
+    // recompress page load
+    if (compressor != null) {
+      if (isCompressed) {
+        data = decompressor.decompress(data, rawDataLength);
+      }
+      data = compressor.compress(data);
+    }
+
+    if (!encrypt) {
+      return data.toByteArray();
+    }
+
+    // encrypt page load
+    return dataEncryptor.encrypt(data.toByteArray(), AAD);
+  }
+
+  public BytesInput readBlock(int length, TransParquetFileReader reader) 
throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader 
reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int) size != size) {
+      throw new ParquetEncodingException("size is bigger than " + 
Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int) size;
+  }
+
+  // We have to rewrite getPaths because MessageType only get level 0 paths
+  private void getPaths(GroupType schema, List<String> paths, String parent) {
+    List<Type> fields = schema.getFields();
+    String prefix = (parent == null) ? "" : parent + ".";
+    for (Type field : fields) {
+      paths.add(prefix + field.getName());
+      if (field instanceof GroupType) {
+        getPaths(field.asGroupType(), paths, prefix + field.getName());
+      }
+    }
+  }
+
+  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> 
prunePaths) {
+    List<Type> fields = schema.getFields();
+    List<String> currentPath = new ArrayList<>();
+    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, 
prunePaths);
+    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
+    return newSchema;
+  }
+
+  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> 
currentPath, Set<ColumnPath> prunePaths) {
+    List<Type> prunedFields = new ArrayList<>();
+    for (Type childField : fields) {
+      Type prunedChildField = pruneColumnsInField(childField, currentPath, 
prunePaths);
+      if (prunedChildField != null) {
+        prunedFields.add(prunedChildField);
+      }
+    }
+    return prunedFields;
+  }
+
+  private Type pruneColumnsInField(Type field, List<String> currentPath, 
Set<ColumnPath> prunePaths) {
+    String fieldName = field.getName();
+    currentPath.add(fieldName);
+    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
+    Type prunedField = null;
+    if (!prunePaths.contains(path)) {
+      if (field.isPrimitive()) {
+        prunedField = field;
+      } else {
+        List<Type> childFields = ((GroupType) field).getFields();
+        List<Type> prunedFields = pruneColumnsInFields(childFields, 
currentPath, prunePaths);
+        if (prunedFields.size() > 0) {
+          prunedField = ((GroupType) field).withNewFields(prunedFields);
+        }
+      }
+    }
+
+    currentPath.remove(currentPath.size() - 1);
+    return prunedField;
+  }
+
+  private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+    Set<ColumnPath> prunePaths = new HashSet<>();
+    for (String col : cols) {
+      prunePaths.add(ColumnPath.fromDotString(col));
+    }
+    return prunePaths;
+  }
+
  /**
   * Rewrites one column chunk with every value replaced by null, re-encoding it
   * with {@code newCodecName}. A single-column sub-schema is built so a fresh
   * write store can produce the chunk, which is then flushed into the writer.
   *
   * @throws IOException if a definition level of 0 is seen for the column
   * @throws RuntimeException if encryption of the nullified column is requested (unsupported)
   */
  private void nullifyColumn(ColumnDescriptor descriptor,
                             ColumnChunkMetaData chunk,
                             ColumnReadStoreImpl crStore,
                             ParquetFileWriter writer,
                             MessageType schema,
                             CompressionCodecName newCodecName,
                             ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
                             boolean encryptColumn) throws IOException {
    // TODO: support nullifying and encrypting same column.
    if (columnChunkEncryptorRunTime != null) {
      throw new RuntimeException("Nullifying and encrypting column is not implemented yet");
    }
    long totalChunkValues = chunk.getValueCount();
    int dMax = descriptor.getMaxDefinitionLevel();
    ColumnReader cReader = crStore.getColumnReader(descriptor);

    // Match the page version of the source chunk so readers see consistent encodings.
    ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
            ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0;
    ParquetProperties props = ParquetProperties.builder()
            .withWriterVersion(writerVersion)
            .build();
    CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(newCodecName);

    // Create new schema that only has the current column
    MessageType newSchema = newSchema(schema, descriptor);
    ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
            compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);

    // NOTE(review): the loop reads current rep/def levels but never advances the
    // reader (no consume()-style call is visible here) — confirm the same
    // levels are not re-read for every iteration.
    for (int i = 0; i < totalChunkValues; i++) {
      int rlvl = cReader.getCurrentRepetitionLevel();
      int dlvl = cReader.getCurrentDefinitionLevel();
      if (dlvl == dMax) {
        // since we checked either optional or repeated, dlvl should be > 0
        if (dlvl == 0) {
          throw new IOException("definition level is detected to be 0 for column " +
                  chunk.getPath().toDotString() + " to be nullified");
        }
        // we just write one null for the whole list at the top level,
        // instead of nullify the elements in the list one by one
        if (rlvl == 0) {
          cWriter.writeNull(rlvl, dlvl - 1);
        }
      } else {
        cWriter.writeNull(rlvl, dlvl);
      }
      cStore.endRecord();
    }

    // Flush the rewritten pages into the target file writer before closing.
    cStore.flush();
    cPageStore.flushToFileWriter(writer);

    cStore.close();
    cWriter.close();
  }
+
+  private MessageType newSchema(MessageType schema, ColumnDescriptor 
descriptor) {
+    String[] path = descriptor.getPath();
+    Type type = schema.getType(path);
+    if (path.length == 1) {
+      return new MessageType(schema.getName(), type);
+    }
+
+    for (Type field : schema.getFields()) {
+      if (!field.isPrimitive()) {
+        Type newType = extractField(field.asGroupType(), type);
+        if (newType != null) {
+          return new MessageType(schema.getName(), newType);
+        }
+      }
+    }
+
+    // We should never hit this because 'type' is returned by schema.getType().
+    throw new RuntimeException("No field is found");
+  }
+
+  private Type extractField(GroupType candidate, Type targetField) {
+    if (targetField.equals(candidate)) {
+      return targetField;
+    }
+
+    // In case 'type' is a descendants of candidate
+    for (Type field : candidate.asGroupType().getFields()) {
+      if (field.isPrimitive()) {
+        if (field.equals(targetField)) {
+          return new GroupType(candidate.getRepetition(), candidate.getName(), 
targetField);
+        }
+      } else {
+        Type tempField = extractField(field.asGroupType(), targetField);
+        if (tempField != null) {
+          return tempField;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void end() {
+    }
+
+    @Override
+    public Converter getConverter(int fieldIndex) {
+      return new DummyConverter();
+    }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override
+    public GroupConverter asGroupConverter() {
+      return new DummyGroupConverter();
+    }
+  }
+
+  private static class ColumnChunkEncryptorRunTime {
+    private final InternalColumnEncryptionSetup colEncrSetup;
+    private final BlockCipher.Encryptor dataEncryptor;
+    private final BlockCipher.Encryptor metaDataEncryptor;
+    private final byte[] fileAAD;
+
+    private byte[] dataPageHeaderAAD;
+    private byte[] dataPageAAD;
+    private byte[] dictPageHeaderAAD;
+    private byte[] dictPageAAD;
+
+    public ColumnChunkEncryptorRunTime(InternalFileEncryptor fileEncryptor,
+                                       ColumnChunkMetaData chunk,
+                                       int blockId,
+                                       int columnId) throws IOException {
+      Preconditions.checkArgument(fileEncryptor != null,
+              "FileEncryptor is required to create 
ColumnChunkEncryptorRunTime");
+
+      this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, 
columnId);
+      this.dataEncryptor = colEncrSetup.getDataEncryptor();
+      this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+      this.fileAAD = fileEncryptor.getFileAAD();
+      if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
+        this.dataPageHeaderAAD = createAAD(ModuleType.DataPageHeader, blockId, 
columnId);
+        this.dataPageAAD = createAAD(ModuleType.DataPage, blockId, columnId);
+        this.dictPageHeaderAAD = createAAD(ModuleType.DictionaryPageHeader, 
blockId, columnId);
+        this.dictPageAAD = createAAD(ModuleType.DictionaryPage, blockId, 
columnId);
+      } else {
+        this.dataPageHeaderAAD = null;
+        this.dataPageAAD = null;
+        this.dictPageHeaderAAD = null;
+        this.dictPageAAD = null;
+      }
+    }
+
+    private byte[] createAAD(ModuleType moduleType, int blockId, int columnId) 
{
+      return AesCipher.createModuleAAD(fileAAD, moduleType, blockId, columnId, 
0);
+    }
+
+    public BlockCipher.Encryptor getDataEncryptor() {
+      return this.dataEncryptor;
+    }
+
+    public BlockCipher.Encryptor getMetaDataEncryptor() {
+      return this.metaDataEncryptor;
+    }
+
+    public byte[] getDataPageHeaderAAD() {
+      return this.dataPageHeaderAAD;
+    }
+
+    public byte[] getDataPageAAD() {
+      return this.dataPageAAD;
+    }
+
+    public byte[] getDictPageHeaderAAD() {
+      return this.dictPageHeaderAAD;
+    }
+
+    public byte[] getDictPageAAD() {
+      return this.dictPageAAD;
+    }
+  }
+
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
new file mode 100644
index 000000000..a11c5a61f
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+
+// A set of options to create a ParquetRewriter.
+public class RewriteOptions {
+
+  final Configuration conf;
+  final List<Path> inputFiles;
+  final Path outputFile;
+  final List<String> pruneColumns;
+  final CompressionCodecName newCodecName;
+  final Map<String, MaskMode> maskColumns;
+  final List<String> encryptColumns;
+  final FileEncryptionProperties fileEncryptionProperties;
+
+  private RewriteOptions(Configuration conf,
+                         List<Path> inputFiles,
+                         Path outputFile,
+                         List<String> pruneColumns,
+                         CompressionCodecName newCodecName,
+                         Map<String, MaskMode> maskColumns,
+                         List<String> encryptColumns,
+                         FileEncryptionProperties fileEncryptionProperties) {
+    this.conf = conf;
+    this.inputFiles = inputFiles;
+    this.outputFile = outputFile;
+    this.pruneColumns = pruneColumns;
+    this.newCodecName = newCodecName;
+    this.maskColumns = maskColumns;
+    this.encryptColumns = encryptColumns;
+    this.fileEncryptionProperties = fileEncryptionProperties;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public List<Path> getInputFiles() {
+    return inputFiles;
+  }
+
+  public Path getOutputFile() {
+    return outputFile;
+  }
+
+  public List<String> getPruneColumns() {
+    return pruneColumns;
+  }
+
+  public CompressionCodecName getNewCodecName() {
+    return newCodecName;
+  }
+
+  public Map<String, MaskMode> getMaskColumns() {
+    return maskColumns;
+  }
+
+  public List<String> getEncryptColumns() {
+    return encryptColumns;
+  }
+
+  public FileEncryptionProperties getFileEncryptionProperties() {
+    return fileEncryptionProperties;
+  }
+
+  // Builder to create a RewriterOptions.
+  public static class Builder {
+    private Configuration conf;
+    private List<Path> inputFiles;
+    private Path outputFile;
+    private List<String> pruneColumns;
+    private CompressionCodecName newCodecName;
+    private Map<String, MaskMode> maskColumns;
+    private List<String> encryptColumns;
+    private FileEncryptionProperties fileEncryptionProperties;
+
+    public Builder(Configuration conf, Path inputFile, Path outputFile) {
+      this.conf = conf;
+      this.inputFiles = Arrays.asList(inputFile);
+      this.outputFile = outputFile;
+    }
+
+    public Builder prune(List<String> columns) {
+      this.pruneColumns = columns;
+      return this;
+    }
+
+    public Builder transform(CompressionCodecName newCodecName) {
+      this.newCodecName = newCodecName;
+      return this;
+    }
+
+    public Builder mask(Map<String, MaskMode> maskColumns) {
+      this.maskColumns = maskColumns;
+      return this;
+    }
+
+    public Builder encrypt(List<String> encryptColumns) {
+      this.encryptColumns = encryptColumns;
+      return this;
+    }
+
+    public Builder encryptionProperties(FileEncryptionProperties 
fileEncryptionProperties) {
+      this.fileEncryptionProperties = fileEncryptionProperties;
+      return this;
+    }
+
+    public RewriteOptions build() {
+      Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), 
"Input file is required");
+      Preconditions.checkArgument(outputFile != null, "Output file is 
required");
+
+      if (pruneColumns != null) {
+        if (maskColumns != null) {
+          for (String pruneColumn : pruneColumns) {
+            Preconditions.checkArgument(!maskColumns.containsKey(pruneColumn),
+                    "Cannot prune and mask same column");
+          }
+        }
+
+        if (encryptColumns != null) {
+          for (String pruneColumn : pruneColumns) {
+            Preconditions.checkArgument(!encryptColumns.contains(pruneColumn),
+                    "Cannot prune and encrypt same column");
+          }
+        }
+      }
+
+      // TODO: support masking and encrypting same columns
+      if (maskColumns != null && encryptColumns != null) {
+        for (String encryptColumn : encryptColumns) {
+          Preconditions.checkArgument(!maskColumns.containsKey(encryptColumn),
+                  "Cannot encrypt and mask same column");
+        }
+      }
+
+      if (encryptColumns != null && !encryptColumns.isEmpty()) {
+        Preconditions.checkArgument(fileEncryptionProperties != null,
+                "FileEncryptionProperties is required when encrypting 
columns");
+      }
+
+      if (fileEncryptionProperties != null) {
+        Preconditions.checkArgument(encryptColumns != null && 
!encryptColumns.isEmpty(),
+                "Encrypt columns is required when FileEncryptionProperties is 
set");
+      }
+
+      return new RewriteOptions(conf,
+              inputFiles,
+              outputFile,
+              pruneColumns,
+              newCodecName,
+              maskColumns,
+              encryptColumns,
+              fileEncryptionProperties);
+    }
+  }
+
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
index 32ac49a22..48381fdbb 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
@@ -20,44 +20,17 @@ package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.crypto.AesCipher;
 import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
-import org.apache.parquet.crypto.InternalFileEncryptor;
-import org.apache.parquet.format.BlockCipher;
-import org.apache.parquet.format.DataPageHeader;
-import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
-import org.apache.parquet.format.PageHeader;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
 import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-
-import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
-import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
-import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
 /**
  * This class is for fast rewriting existing file with column encryption
@@ -67,74 +40,8 @@ import static 
org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
  *
  * For columns not to be encrypted, the whole column chunk will be appended 
directly to writer.
  */
+@Deprecated
 public class ColumnEncryptor {
-  private static class EncryptorRunTime {
-    private final InternalColumnEncryptionSetup colEncrSetup;
-    private final BlockCipher.Encryptor dataEncryptor;
-    private final BlockCipher.Encryptor metaDataEncryptor;
-    private final byte[] fileAAD ;
-
-    private byte[] dataPageHeaderAAD;
-    private byte[] dataPageAAD;
-    private byte[] dictPageHeaderAAD;
-    private byte[] dictPageAAD;
-
-    public EncryptorRunTime(InternalFileEncryptor fileEncryptor, 
ColumnChunkMetaData chunk,
-                            int blockId, int columnId) throws IOException  {
-      if (fileEncryptor == null) {
-        this.colEncrSetup = null;
-        this.dataEncryptor =  null;
-        this.metaDataEncryptor =  null;
-
-        this.fileAAD =  null;
-        this.dataPageHeaderAAD =  null;
-        this.dataPageAAD =  null;
-        this.dictPageHeaderAAD =  null;
-        this.dictPageAAD =  null;
-      } else {
-        this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), 
true, columnId);
-        this.dataEncryptor = colEncrSetup.getDataEncryptor();
-        this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
-
-        this.fileAAD = fileEncryptor.getFileAAD();
-        this.dataPageHeaderAAD = createAAD(colEncrSetup, 
ModuleType.DataPageHeader, blockId, columnId);
-        this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage, 
blockId, columnId);
-        this.dictPageHeaderAAD = createAAD(colEncrSetup, 
ModuleType.DictionaryPageHeader, blockId, columnId);
-        this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage, 
blockId, columnId);
-      }
-    }
-
-    private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup, 
ModuleType moduleType, int blockId, int columnId) {
-      if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
-        return AesCipher.createModuleAAD(fileAAD, moduleType, blockId, 
columnId, 0);
-      }
-      return null;
-    }
-
-    public BlockCipher.Encryptor getDataEncryptor() {
-      return this.dataEncryptor;
-    }
-
-    public BlockCipher.Encryptor getMetaDataEncryptor() {
-      return this.metaDataEncryptor;
-    }
-
-    public byte[] getDataPageHeaderAAD() {
-      return this.dataPageHeaderAAD;
-    }
-
-    public byte[] getDataPageAAD() {
-      return this.dataPageAAD;
-    }
-
-    public byte[] getDictPageHeaderAAD() {
-      return this.dictPageHeaderAAD;
-    }
-
-    public byte[] getDictPageAAD() {
-      return this.dictPageAAD;
-    }
-  }
 
   private Configuration conf;
 
@@ -154,157 +61,11 @@ public class ColumnEncryptor {
   public void encryptColumns(String inputFile, String outputFile, List<String> 
paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
     Path inPath = new Path(inputFile);
     Path outPath = new Path(outputFile);
-
-    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, 
NO_FILTER);
-    MessageType schema = metaData.getFileMetaData().getSchema();
-
-    ParquetFileWriter writer = new 
ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, 
ParquetFileWriter.Mode.OVERWRITE,
-      DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, 
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
-      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, 
fileEncryptionProperties);
-    writer.start();
-
-    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), 
HadoopReadOptions.builder(conf).build())) {
-      processBlocks(reader, writer, metaData, schema, paths);
-    }
-    writer.end(metaData.getFileMetaData().getKeyValueMetaData());
-  }
-
-  private void processBlocks(TransParquetFileReader reader, ParquetFileWriter 
writer, ParquetMetadata meta,
-                            MessageType schema, List<String> encryptPaths) 
throws IOException {
-    Set<ColumnPath> encryptColumnsPath = convertToColumnPaths(encryptPaths);
-    int blockId = 0;
-    PageReadStore store = reader.readNextRowGroup();
-
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-
-      List<ColumnChunkMetaData> columnsInOrder = 
meta.getBlocks().get(blockId).getColumns();
-      Map<ColumnPath, ColumnDescriptor> descriptorsMap = 
schema.getColumns().stream().collect(
-        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
-
-      for (int i = 0; i < columnsInOrder.size(); i += 1) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        // If a column is encrypted, we simply throw exception.
-        // Later we can add a feature to trans-encrypt it with different keys
-        if (chunk.isEncrypted()) {
-          throw new IOException("Column " + chunk.getPath().toDotString() + " 
is already encrypted");
-        }
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-        processChunk(descriptor, chunk, reader, writer, encryptColumnsPath, 
blockId, i, meta.getFileMetaData().getCreatedBy());
-      }
-
-      writer.endBlock();
-      store = reader.readNextRowGroup();
-      blockId++;
-    }
-  }
-
-  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData 
chunk, TransParquetFileReader reader, ParquetFileWriter writer,
-                            Set<ColumnPath> encryptPaths, int blockId, int 
columnId, String createdBy) throws IOException {
-    reader.setStreamPosition(chunk.getStartingPos());
-    writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
-    processPages(reader, chunk, writer, createdBy, blockId, columnId, 
encryptPaths.contains(chunk.getPath()));
-    writer.endColumn();
-  }
-
-  private void processPages(TransParquetFileReader reader, ColumnChunkMetaData 
chunk, ParquetFileWriter writer,
-                            String createdBy, int blockId, int columnId, 
boolean encrypt) throws IOException {
-    int pageOrdinal = 0;
-    EncryptorRunTime encryptorRunTime = new 
EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
-    DictionaryPage dictionaryPage = null;
-    long readValues = 0;
-    ParquetMetadataConverter converter = new ParquetMetadataConverter();
-    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-    reader.setStreamPosition(chunk.getStartingPos());
-    long totalChunkValues = chunk.getValueCount();
-    while (readValues < totalChunkValues) {
-      PageHeader pageHeader = reader.readPageHeader();
-      byte[] pageLoad;
-      switch (pageHeader.type) {
-        case DICTIONARY_PAGE:
-          if (dictionaryPage != null) {
-            throw new IOException("has more than one dictionary page in column 
chunk");
-          }
-          //No quickUpdatePageAAD needed for dictionary page
-          DictionaryPageHeader dictPageHeader = 
pageHeader.dictionary_page_header;
-          pageLoad = processPayload(reader, 
pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), 
encryptorRunTime.getDictPageAAD(), encrypt);
-          writer.writeDictionaryPage(new 
DictionaryPage(BytesInput.from(pageLoad),
-                                        pageHeader.getUncompressed_page_size(),
-                                        dictPageHeader.getNum_values(),
-                                        
converter.getEncoding(dictPageHeader.getEncoding())),
-            encryptorRunTime.getMetaDataEncryptor(), 
encryptorRunTime.getDictPageHeaderAAD());
-          break;
-        case DATA_PAGE:
-          if (encrypt) {
-            
AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), 
pageOrdinal);
-            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), 
pageOrdinal);
-          }
-          DataPageHeader headerV1 = pageHeader.data_page_header;
-          pageLoad = processPayload(reader, 
pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), 
encryptorRunTime.getDataPageAAD(), encrypt);
-          readValues += headerV1.getNum_values();
-          if (offsetIndex != null) {
-            long rowCount = 1 + offsetIndex.getLastRowIndex(pageOrdinal, 
totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
-            writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
-              pageHeader.getUncompressed_page_size(),
-              BytesInput.from(pageLoad),
-              converter.fromParquetStatistics(createdBy, 
headerV1.getStatistics(), chunk.getPrimitiveType()),
-              rowCount,
-              converter.getEncoding(headerV1.getRepetition_level_encoding()),
-              converter.getEncoding(headerV1.getDefinition_level_encoding()),
-              converter.getEncoding(headerV1.getEncoding()),
-              encryptorRunTime.getMetaDataEncryptor(),
-              encryptorRunTime.getDataPageHeaderAAD());
-          } else {
-            writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
-              pageHeader.getUncompressed_page_size(),
-              BytesInput.from(pageLoad),
-              converter.fromParquetStatistics(createdBy, 
headerV1.getStatistics(), chunk.getPrimitiveType()),
-              converter.getEncoding(headerV1.getRepetition_level_encoding()),
-              converter.getEncoding(headerV1.getDefinition_level_encoding()),
-              converter.getEncoding(headerV1.getEncoding()),
-              encryptorRunTime.getMetaDataEncryptor(),
-              encryptorRunTime.getDataPageHeaderAAD());
-          }
-          pageOrdinal++;
-          break;
-        case DATA_PAGE_V2:
-          if (encrypt) {
-            
AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), 
pageOrdinal);
-            AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), 
pageOrdinal);
-          }
-          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
-          int rlLength = headerV2.getRepetition_levels_byte_length();
-          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
-          int dlLength = headerV2.getDefinition_levels_byte_length();
-          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
-          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
-          int rawDataLength = pageHeader.getUncompressed_page_size() - 
rlLength - dlLength;
-          pageLoad = processPayload(reader, payLoadLength, 
encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(), 
encrypt);
-          readValues += headerV2.getNum_values();
-          writer.writeDataPageV2(headerV2.getNum_rows(),
-            headerV2.getNum_nulls(),
-            headerV2.getNum_values(),
-            rlLevels,
-            dlLevels,
-            converter.getEncoding(headerV2.getEncoding()),
-            BytesInput.from(pageLoad),
-            rawDataLength,
-            converter.fromParquetStatistics(createdBy, 
headerV2.getStatistics(), chunk.getPrimitiveType()));
-          pageOrdinal++;
-          break;
-        default:
-        break;
-      }
-    }
-  }
-
-  private byte[] processPayload(TransParquetFileReader reader, int 
payloadLength, BlockCipher.Encryptor dataEncryptor,
-                                byte[] AAD, boolean encrypt) throws 
IOException {
-    byte[] data = readBlock(payloadLength, reader);
-    if (!encrypt) {
-      return data;
-    }
-    return dataEncryptor.encrypt(data, AAD);
+    RewriteOptions options = new RewriteOptions.Builder(conf, inPath, outPath).
+            
encrypt(paths).encryptionProperties(fileEncryptionProperties).build();
+    ParquetRewriter rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
   }
 
   public byte[] readBlock(int length, TransParquetFileReader reader) throws 
IOException {
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
index 74bc534f8..d3e29292a 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -18,39 +18,19 @@
  */
 package org.apache.parquet.hadoop.util;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
 import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
+@Deprecated
 public class ColumnMasker {
   /**
    *
@@ -64,137 +44,22 @@ public class ColumnMasker {
    */
   public void processBlocks(TransParquetFileReader reader, ParquetFileWriter 
writer, ParquetMetadata meta,
                             MessageType schema, List<String> paths, MaskMode 
maskMode) throws IOException {
-    Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
-    int blockIndex = 0;
-    PageReadStore store = reader.readNextRowGroup();
-
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-      List<ColumnChunkMetaData> columnsInOrder = 
meta.getBlocks().get(blockIndex).getColumns();
-      Map<ColumnPath, ColumnDescriptor> descriptorsMap = 
schema.getColumns().stream().collect(
-        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
-      ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new 
DummyGroupConverter(), schema,
-        meta.getFileMetaData().getCreatedBy());
-
-      for (int i = 0; i < columnsInOrder.size(); i += 1) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-        processChunk(descriptor, chunk, crStore, reader, writer, schema, 
nullifyColumns, maskMode);
-      }
-
-      writer.endBlock();
-      store = reader.readNextRowGroup();
-      blockIndex++;
-    }
-  }
-
-  private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData 
chunk, ColumnReadStoreImpl crStore,
-                            TransParquetFileReader reader, ParquetFileWriter 
writer, MessageType schema,
-                            Set<ColumnPath> paths, MaskMode maskMode) throws 
IOException {
-    reader.setStreamPosition(chunk.getStartingPos());
-
-    if (paths.contains(chunk.getPath())) {
-      if (maskMode.equals(MaskMode.NULLIFY)) {
-        Type.Repetition repetition = 
descriptor.getPrimitiveType().getRepetition();
-        if (repetition.equals(Type.Repetition.REQUIRED)) {
-          throw new IOException("Required column [" + 
descriptor.getPrimitiveType().getName() + "] cannot be nullified");
-        }
-        nullifyColumn(descriptor, chunk, crStore, writer, schema);
-      } else {
-        throw new UnsupportedOperationException("Only nullify is supported for 
now");
-      }
-    } else {
-      BloomFilter bloomFilter = reader.readBloomFilter(chunk);
-      ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-      OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-      writer.appendColumnChunk(descriptor, reader.getStream(), chunk, 
bloomFilter, columnIndex, offsetIndex);
-    }
+    ParquetRewriter rewriter = new ParquetRewriter(
+            reader, writer, meta, schema, null, null,  paths, 
convertMaskMode(maskMode));
+    rewriter.processBlocks();
   }
 
-  private void nullifyColumn(ColumnDescriptor descriptor, ColumnChunkMetaData 
chunk, ColumnReadStoreImpl crStore,
-                             ParquetFileWriter writer, MessageType schema) 
throws IOException {
-    long totalChunkValues = chunk.getValueCount();
-    int dMax = descriptor.getMaxDefinitionLevel();
-    ColumnReader cReader = crStore.getColumnReader(descriptor);
-
-    WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ? 
WriterVersion.PARQUET_2_0 : WriterVersion.PARQUET_1_0;
-    ParquetProperties props = ParquetProperties.builder()
-      .withWriterVersion(writerVersion)
-      .build();
-    CodecFactory codecFactory = new CodecFactory(new Configuration(), 
props.getPageSizeThreshold());
-    CodecFactory.BytesCompressor compressor =  
codecFactory.getCompressor(chunk.getCodec());
-
-    // Create new schema that only has the current column
-    MessageType newSchema = newSchema(schema, descriptor);
-    ColumnChunkPageWriteStore cPageStore = new 
ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), 
props.getColumnIndexTruncateLength());
-    ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
-    ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
-
-    for (int i = 0; i < totalChunkValues; i++) {
-      int rlvl = cReader.getCurrentRepetitionLevel();
-      int dlvl = cReader.getCurrentDefinitionLevel();
-      if (dlvl == dMax) {
-        // since we checked ether optional or repeated, dlvl should be > 0
-        if (dlvl == 0) {
-          throw new IOException("definition level is detected to be 0 for 
column " + chunk.getPath().toDotString() + " to be nullified");
-        }
-        // we just write one null for the whole list at the top level, instead 
of nullify the elements in the list one by one
-        if (rlvl == 0) {
-          cWriter.writeNull(rlvl, dlvl - 1);
-        }
-      } else {
-        cWriter.writeNull(rlvl, dlvl);
-      }
-      cStore.endRecord();
+  org.apache.parquet.hadoop.rewrite.MaskMode convertMaskMode(MaskMode 
maskMode) {
+    switch (maskMode) {
+      case NULLIFY:
+        return org.apache.parquet.hadoop.rewrite.MaskMode.NULLIFY;
+      case HASH:
+        return org.apache.parquet.hadoop.rewrite.MaskMode.HASH;
+      case REDACT:
+        return org.apache.parquet.hadoop.rewrite.MaskMode.REDACT;
+      default:
+        return null;
     }
-
-    cStore.flush();
-    cPageStore.flushToFileWriter(writer);
-
-    cStore.close();
-    cWriter.close();
-  }
-
-  private MessageType newSchema(MessageType schema, ColumnDescriptor 
descriptor) {
-    String[] path = descriptor.getPath();
-    Type type = schema.getType(path);
-    if (path.length == 1) {
-      return new MessageType(schema.getName(), type);
-    }
-
-    for (Type field : schema.getFields()) {
-      if (!field.isPrimitive()) {
-        Type newType = extractField(field.asGroupType(), type);
-        if (newType != null) {
-          return new MessageType(schema.getName(), newType);
-        }
-      }
-    }
-
-    // We should never hit this because 'type' is returned by schema.getType().
-    throw new RuntimeException("No field is found");
-  }
-
-  private Type extractField(GroupType candidate, Type targetField) {
-    if (targetField.equals(candidate)) {
-      return targetField;
-    }
-
-    // In case 'type' is a descendants of candidate
-    for (Type field : candidate.asGroupType().getFields()) {
-      if (field.isPrimitive()) {
-        if (field.equals(targetField)) {
-          return new GroupType(candidate.getRepetition(), candidate.getName(), 
targetField);
-        }
-      } else {
-        Type tempField = extractField(field.asGroupType(), targetField);
-        if (tempField != null) {
-          return tempField;
-        }
-      }
-    }
-
-    return null;
   }
 
   public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
@@ -230,13 +95,4 @@ public class ColumnMasker {
     }
   }
 
-  private static final class DummyGroupConverter extends GroupConverter {
-    @Override public void start() {}
-    @Override public void end() {}
-    @Override public Converter getConverter(int fieldIndex) { return new 
DummyConverter(); }
-  }
-
-  private static final class DummyConverter extends PrimitiveConverter {
-    @Override public GroupConverter asGroupConverter() { return new 
DummyGroupConverter(); }
-  }
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
index 7ca958872..e36d215ff 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
@@ -20,107 +20,23 @@ package org.apache.parquet.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
+@Deprecated
 public class ColumnPruner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ColumnPruner.class);
 
   public void pruneColumns(Configuration conf, Path inputFile, Path 
outputFile, List<String> cols) throws IOException {
-    Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
-    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile, 
ParquetMetadataConverter.NO_FILTER);
-    FileMetaData metaData = pmd.getFileMetaData();
-    MessageType schema = metaData.getSchema();
-    List<String> paths = new ArrayList<>();
-    getPaths(schema, paths, null);
-
-    for (String col : cols) {
-      if (!paths.contains(col)) {
-        LOG.warn("Input column name {} doesn't show up in the schema of file 
{}", col, inputFile.getName());
-      }
-    }
-
-    ParquetFileWriter writer = new ParquetFileWriter(conf,
-      pruneColumnsInSchema(schema, prunePaths), outputFile, 
ParquetFileWriter.Mode.CREATE);
-
-    writer.start();
-    writer.appendFile(HadoopInputFile.fromPath(inputFile, conf));
-    writer.end(metaData.getKeyValueMetaData());
-  }
-
-  // We have to rewrite getPaths because MessageType only get level 0 paths
-  private void getPaths(GroupType schema, List<String> paths, String parent) {
-    List<Type> fields = schema.getFields();
-    String prefix = (parent == null) ? "" : parent + ".";
-    for (Type field : fields) {
-      paths.add(prefix + field.getName());
-      if (field instanceof GroupType) {
-        getPaths(field.asGroupType(), paths, prefix + field.getName());
-      }
-    }
-  }
-
-  private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath> 
prunePaths) {
-    List<Type> fields = schema.getFields();
-    List<String> currentPath = new ArrayList<>();
-    List<Type> prunedFields = pruneColumnsInFields(fields, currentPath, 
prunePaths);
-    MessageType newSchema = new MessageType(schema.getName(), prunedFields);
-    return newSchema;
-  }
-
-  private List<Type> pruneColumnsInFields(List<Type> fields, List<String> 
currentPath, Set<ColumnPath> prunePaths) {
-    List<Type> prunedFields = new ArrayList<>();
-    for (Type childField : fields) {
-      Type prunedChildField = pruneColumnsInField(childField, currentPath, 
prunePaths);
-      if (prunedChildField != null) {
-        prunedFields.add(prunedChildField);
-      }
-    }
-    return prunedFields;
-  }
-
-  private Type pruneColumnsInField(Type field, List<String> currentPath, 
Set<ColumnPath> prunePaths) {
-    String fieldName = field.getName();
-    currentPath.add(fieldName);
-    ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
-    Type prunedField = null;
-    if (!prunePaths.contains(path)) {
-      if (field.isPrimitive()) {
-        prunedField = field;
-      } else {
-        List<Type> childFields = ((GroupType) field).getFields();
-        List<Type> prunedFields = pruneColumnsInFields(childFields, 
currentPath, prunePaths);
-        if (prunedFields.size() > 0) {
-          prunedField = ((GroupType) field).withNewFields(prunedFields);
-        }
-      }
-    }
-
-    currentPath.remove(fieldName);
-    return prunedField;
-  }
-
-  private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
-    Set<ColumnPath> prunePaths = new HashSet<>();
-    for (String col : cols) {
-      prunePaths.add(ColumnPath.fromDotString(col));
-    }
-    return prunePaths;
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputFile, 
outputFile).prune(cols).build();
+    ParquetRewriter rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
   }
 }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
index c77674d8c..161b89f2a 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -20,231 +20,36 @@ package org.apache.parquet.hadoop.util;
 
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.compression.CompressionCodecFactory;
-import org.apache.parquet.format.DataPageHeader;
-import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
 import org.apache.parquet.io.InputFile;
-import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.SeekableInputStream;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 
+@Deprecated
 public class CompressionConverter {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(CompressionConverter.class);
-
-  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
-  private byte[] pageBuffer;
-
-  public CompressionConverter() {
-    this.pageBuffer = new byte[pageBufferSize];
-  }
+  private ParquetRewriter rewriter;
 
   public void processBlocks(TransParquetFileReader reader, ParquetFileWriter 
writer, ParquetMetadata meta, MessageType schema,
                             String createdBy, CompressionCodecName codecName) 
throws IOException {
-    int blockIndex = 0;
-    PageReadStore store = reader.readNextRowGroup();
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
-      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
-      Map<ColumnPath, ColumnDescriptor> descriptorsMap = 
schema.getColumns().stream().collect(
-        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
-      for (int i = 0; i < columnsInOrder.size(); i += 1) {
-        ColumnChunkMetaData chunk = columnsInOrder.get(i);
-        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new 
DummyGroupConverter(), schema, createdBy);
-        ColumnDescriptor columnDescriptor = 
descriptorsMap.get(chunk.getPath());
-        writer.startColumn(columnDescriptor, 
crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
-        processChunk(reader, writer, chunk, createdBy, codecName);
-        writer.endColumn();
-      }
-      writer.endBlock();
-      store = reader.readNextRowGroup();
-      blockIndex++;
-    }
-  }
-
-  private void processChunk(TransParquetFileReader reader, ParquetFileWriter 
writer, ColumnChunkMetaData chunk,
-                            String createdBy, CompressionCodecName codecName) 
throws IOException {
-    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
-    CompressionCodecFactory.BytesInputDecompressor decompressor = 
codecFactory.getDecompressor(chunk.getCodec());
-    CompressionCodecFactory.BytesInputCompressor compressor = 
codecFactory.getCompressor(codecName);
-    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
-    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-
-    reader.setStreamPosition(chunk.getStartingPos());
-    DictionaryPage dictionaryPage = null;
-    long readValues = 0;
-    Statistics statistics = null;
-    ParquetMetadataConverter converter = new ParquetMetadataConverter();
-    int pageIndex = 0;
-    long totalChunkValues = chunk.getValueCount();
-    while (readValues < totalChunkValues) {
-      PageHeader pageHeader = reader.readPageHeader();
-      int compressedPageSize = pageHeader.getCompressed_page_size();
-      byte[] pageLoad;
-      switch (pageHeader.type) {
-        case DICTIONARY_PAGE:
-          if (dictionaryPage != null) {
-            throw new IOException("has more than one dictionary page in column 
chunk");
-          }
-          DictionaryPageHeader dictPageHeader = 
pageHeader.dictionary_page_header;
-          pageLoad = translatePageLoad(reader, true, compressor, decompressor, 
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
-          writer.writeDictionaryPage(new 
DictionaryPage(BytesInput.from(pageLoad),
-                                                   
pageHeader.getUncompressed_page_size(),
-                                                   
dictPageHeader.getNum_values(),
-                                                   
converter.getEncoding(dictPageHeader.getEncoding())));
-          break;
-        case DATA_PAGE:
-          DataPageHeader headerV1 = pageHeader.data_page_header;
-          pageLoad = translatePageLoad(reader, true, compressor, decompressor, 
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
-          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), 
headerV1.getStatistics(), columnIndex, pageIndex, converter);
-          readValues += headerV1.getNum_values();
-          if (offsetIndex != null) {
-            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, 
totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
-            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
-              pageHeader.getUncompressed_page_size(),
-              BytesInput.from(pageLoad),
-              statistics,
-              toIntWithCheck(rowCount),
-              converter.getEncoding(headerV1.getRepetition_level_encoding()),
-              converter.getEncoding(headerV1.getDefinition_level_encoding()),
-              converter.getEncoding(headerV1.getEncoding()));
-          } else {
-            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
-              pageHeader.getUncompressed_page_size(),
-              BytesInput.from(pageLoad),
-              statistics,
-              converter.getEncoding(headerV1.getRepetition_level_encoding()),
-              converter.getEncoding(headerV1.getDefinition_level_encoding()),
-              converter.getEncoding(headerV1.getEncoding()));
-          }
-          pageIndex++;
-          break;
-        case DATA_PAGE_V2:
-          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
-          int rlLength = headerV2.getRepetition_levels_byte_length();
-          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
-          int dlLength = headerV2.getDefinition_levels_byte_length();
-          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
-          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
-          int rawDataLength = pageHeader.getUncompressed_page_size() - 
rlLength - dlLength;
-          pageLoad = translatePageLoad(reader, headerV2.is_compressed, 
compressor, decompressor, payLoadLength, rawDataLength);
-          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), 
headerV2.getStatistics(), columnIndex, pageIndex, converter);
-          readValues += headerV2.getNum_values();
-          writer.writeDataPageV2(headerV2.getNum_rows(),
-            headerV2.getNum_nulls(),
-            headerV2.getNum_values(),
-            rlLevels,
-            dlLevels,
-            converter.getEncoding(headerV2.getEncoding()),
-            BytesInput.from(pageLoad),
-            rawDataLength,
-            statistics);
-          pageIndex++;
-          break;
-        default:
-          LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(), compressedPageSize);
-          break;
-      }
-    }
-  }
-
-  private Statistics convertStatistics(String createdBy, PrimitiveType type, 
org.apache.parquet.format.Statistics pageStatistics,
-                                       ColumnIndex columnIndex, int pageIndex, 
ParquetMetadataConverter converter) throws IOException {
-    if (columnIndex != null) {
-      if (columnIndex.getNullPages() == null) {
-        throw new IOException("columnIndex has null variable 'nullPages' which 
indicates corrupted data for type: " +  type.getName());
-      }
-      if (pageIndex > columnIndex.getNullPages().size()) {
-        throw new IOException("There are more pages " + pageIndex + " found in 
the column than in the columnIndex " + columnIndex.getNullPages().size());
-      }
-      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = 
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
-      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
-
-      if (!columnIndex.getNullPages().get(pageIndex)) {
-        
statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
-        
statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
-      }
-      return statsBuilder.build();
-    } else if (pageStatistics != null) {
-      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
-    } else {
-      return null;
-    }
-  }
-
-  private byte[] translatePageLoad(TransParquetFileReader reader, boolean 
isCompressed, CompressionCodecFactory.BytesInputCompressor compressor,
-                                   
CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, 
int rawDataLength) throws IOException {
-    BytesInput data = readBlock(payloadLength, reader);
-    if (isCompressed) {
-      data = decompressor.decompress(data, rawDataLength);
-    }
-    BytesInput newCompressedData = compressor.compress(data);
-    return newCompressedData.toByteArray();
+    rewriter = new ParquetRewriter(reader, writer, meta, schema, createdBy, 
codecName, null, null);
+    rewriter.processBlocks();
   }
 
   public BytesInput readBlock(int length, TransParquetFileReader reader) 
throws IOException {
-    byte[] data;
-    if (length > pageBufferSize) {
-      data = new byte[length];
-    } else {
-      data = pageBuffer;
-    }
-    reader.blockRead(data, 0, length);
-    return BytesInput.from(data, 0, length);
+    return rewriter.readBlock(length, reader);
   }
 
   public BytesInput readBlockAllocate(int length, TransParquetFileReader 
reader) throws IOException {
-    byte[] data = new byte[length];
-    reader.blockRead(data, 0, length);
-    return BytesInput.from(data, 0, length);
-  }
-
-  private int toIntWithCheck(long size) {
-    if ((int)size != size) {
-      throw new ParquetEncodingException("size is bigger than " + 
Integer.MAX_VALUE + " bytes: " + size);
-    }
-    return (int)size;
-  }
-
-  private static final class DummyGroupConverter extends GroupConverter {
-    @Override public void start() {}
-    @Override public void end() {}
-    @Override public Converter getConverter(int fieldIndex) { return new 
DummyConverter(); }
-  }
-
-  private static final class DummyConverter extends PrimitiveConverter {
-    @Override public GroupConverter asGroupConverter() { return new 
DummyGroupConverter(); }
+    return rewriter.readBlockAllocate(length, reader);
   }
 
   public static final class TransParquetFileReader extends ParquetFileReader {
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
new file mode 100644
index 000000000..a6881577c
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.EncDecProperties;
+import org.apache.parquet.hadoop.util.EncryptionTestFile;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ParquetRewriterTest {
+
+  private final int numRecord = 100000;
+  private Configuration conf = new Configuration();
+  private EncryptionTestFile inputFile = null;
+  private String outputFile = null;
+  private ParquetRewriter rewriter = null;
+
+  @Test
+  public void testPruneSingleColumnAndTranslateCodec() throws Exception {
+    testSetup("GZIP");
+
+    Path inputPath = new Path(inputFile.getFileName());
+    Path outputPath = new Path(outputFile);
+    List<String> pruneColumns = Arrays.asList("Gender");
+    CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+    RewriteOptions options = 
builder.prune(pruneColumns).transform(newCodec).build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed for the columns not pruned
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 3);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "Links");
+    List<Type> subFields = fields.get(2).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+
+    // Verify codec has been translated
+    verifyCodec(outputFile, CompressionCodecName.ZSTD, null);
+
+    // Verify the data are not changed for the columns not pruned
+    validateColumnData(outputFile, inputFile.getFileContent(), new 
HashSet<>(pruneColumns), Collections.emptySet(), null);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(0, 0);
+      put(1, 1);
+      put(2, 3);
+      put(3, 4);
+    }});
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
+  @Test
+  public void testPruneNullifyAndTranslateCodec() throws Exception {
+    testSetup("UNCOMPRESSED");
+
+    Path inputPath = new Path(inputFile.getFileName());
+    Path outputPath = new Path(outputFile);
+    List<String> pruneColumns = Arrays.asList("Gender");
+    Map<String, MaskMode> maskColumns = new HashMap<>();
+    maskColumns.put("Links.Forward", MaskMode.NULLIFY);
+    CompressionCodecName newCodec = CompressionCodecName.GZIP;
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+    RewriteOptions options = 
builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed for the columns not pruned
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 3);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "Links");
+    List<Type> subFields = fields.get(2).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+
+    // Verify codec has been translated
+    verifyCodec(outputFile, newCodec, null);
+
+    // Verify the data are not changed for the columns not pruned
+    validateColumnData(outputFile, inputFile.getFileContent(), new 
HashSet<>(pruneColumns), maskColumns.keySet(), null);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(0, 0);
+      put(1, 1);
+      put(2, 3);
+    }});
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
+  @Test
+  public void testPruneEncryptAndTranslateCodec() throws Exception {
+    testSetup("GZIP");
+
+    Path inputPath = new Path(inputFile.getFileName());
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, 
inputPath, outputPath);
+
+    // Prune
+    List<String> pruneColumns = Arrays.asList("Gender");
+    builder.prune(pruneColumns);
+
+    // Translate codec
+    CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+    builder.transform(newCodec);
+
+    // Encrypt
+    String[] encryptColumns = {"DocId"};
+    FileEncryptionProperties fileEncryptionProperties =
+            EncDecProperties.getFileEncryptionProperties(encryptColumns, 
ParquetCipher.AES_GCM_CTR_V1, false);
+    
builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties);
+
+    RewriteOptions options = builder.build();
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed for the columns not pruned
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new 
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    List<Type> fields = schema.getFields();
+    assertEquals(fields.size(), 3);
+    assertEquals(fields.get(0).getName(), "DocId");
+    assertEquals(fields.get(1).getName(), "Name");
+    assertEquals(fields.get(2).getName(), "Links");
+    List<Type> subFields = fields.get(2).asGroupType().getFields();
+    assertEquals(subFields.size(), 2);
+    assertEquals(subFields.get(0).getName(), "Backward");
+    assertEquals(subFields.get(1).getName(), "Forward");
+
+    // Verify codec has been translated
+    FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
+    verifyCodec(outputFile, newCodec, fileDecryptionProperties);
+
+    // Verify the data are not changed for the columns not pruned
+    validateColumnData(outputFile,
+            inputFile.getFileContent(), new HashSet<>(pruneColumns), 
Collections.emptySet(), fileDecryptionProperties);
+
+    // Verify column encryption
+    ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
+    assertTrue(metaData.getBlocks().size() > 0);
+    List<ColumnChunkMetaData> columns = 
metaData.getBlocks().get(0).getColumns();
+    Set<String> set = new HashSet<>(Arrays.asList(encryptColumns));
+    for (ColumnChunkMetaData column : columns) {
+      if (set.contains(column.getPath().toDotString())) {
+        assertTrue(column.isEncrypted());
+      } else {
+        assertFalse(column.isEncrypted());
+      }
+    }
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
+  private void testSetup(String compression) throws IOException {
+    MessageType schema = createSchema();
+    inputFile = new TestFileBuilder(conf, schema)
+            .withNumRecord(numRecord)
+            .withCodec(compression)
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build();
+    outputFile = TestFileBuilder.createTempFile("test");
+  }
+
+  private MessageType createSchema() {
+    return new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT64, "DocId"),
+            new PrimitiveType(REQUIRED, BINARY, "Name"),
+            new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+            new GroupType(OPTIONAL, "Links",
+                    new PrimitiveType(REPEATED, BINARY, "Backward"),
+                    new PrimitiveType(REPEATED, BINARY, "Forward")));
+  }
+
+  private void validateColumnData(String file,
+                                  SimpleGroup[] fileContent,
+                                  Set<String> prunePaths,
+                                  Set<String> nullifiedPaths,
+                                  FileDecryptionProperties 
fileDecryptionProperties) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), new Path(file))
+            .withConf(conf).withDecryption(fileDecryptionProperties).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      if (!prunePaths.contains("DocId") && !nullifiedPaths.contains("DocId")) {
+        assertTrue(group.getLong("DocId", 0) == 
fileContent[i].getLong("DocId", 0));
+      }
+      if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
+        assertArrayEquals(group.getBinary("Name", 0).getBytes(),
+                fileContent[i].getBinary("Name", 0).getBytes());
+      }
+      if (!prunePaths.contains("Gender") && 
!nullifiedPaths.contains("Gender")) {
+        assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
+                fileContent[i].getBinary("Gender", 0).getBytes());
+      }
+      Group subGroup = group.getGroup("Links", 0);
+      if (!prunePaths.contains("Links.Backward") && 
!nullifiedPaths.contains("Links.Backward")) {
+        assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
+                fileContent[i].getGroup("Links", 0).getBinary("Backward", 
0).getBytes());
+      }
+      if (!prunePaths.contains("Links.Forward") && 
!nullifiedPaths.contains("Links.Forward")) {
+        assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
+                fileContent[i].getGroup("Links", 0).getBinary("Forward", 
0).getBytes());
+      }
+    }
+    reader.close();
+  }
+
+  private ParquetMetadata getFileMetaData(String file,
+                                          FileDecryptionProperties 
fileDecryptionProperties) throws IOException {
+    ParquetReadOptions readOptions = ParquetReadOptions.builder()
+            .withDecryption(fileDecryptionProperties)
+            .build();
+    ParquetMetadata pmd = null;
+    InputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
+    try (SeekableInputStream in = inputFile.newStream()) {
+      pmd = ParquetFileReader.readFooter(inputFile, readOptions, in);
+    }
+    return pmd;
+  }
+
+  private void verifyCodec(String file,
+                           CompressionCodecName codec,
+                           FileDecryptionProperties fileDecryptionProperties) 
throws IOException {
+    ParquetMetadata pmd = getFileMetaData(file, fileDecryptionProperties);
+    for (int i = 0; i < pmd.getBlocks().size(); i++) {
+      BlockMetaData block = pmd.getBlocks().get(i);
+      for (int j = 0; j < block.getColumns().size(); ++j) {
+        ColumnChunkMetaData columnChunkMetaData = block.getColumns().get(j);
+        assertEquals(columnChunkMetaData.getCodec(), codec);
+      }
+    }
+  }
+
+  /**
+   * Verify the page index is correct.
+   *
+   * @param outFileColumnMapping the column mapping from the output file to 
the input file.
+   */
+  private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping) 
throws Exception {
+    ParquetMetadata inMetaData = getFileMetaData(inputFile.getFileName(), 
null);
+    ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
+    assertEquals(inMetaData.getBlocks().size(), 
outMetaData.getBlocks().size());
+
+    try (TransParquetFileReader inReader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(new Path(inputFile.getFileName()), conf), 
HadoopReadOptions.builder(conf).build());
+         TransParquetFileReader outReader = new TransParquetFileReader(
+                 HadoopInputFile.fromPath(new Path(outputFile), conf), 
HadoopReadOptions.builder(conf).build())) {
+
+      for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
+        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
+        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+
+        for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
+          if (!outFileColumnMapping.containsKey(j)) {
+            continue;
+          }
+          int columnIdFromInputFile = outFileColumnMapping.get(j);
+          ColumnChunkMetaData inChunk = 
inBlockMetaData.getColumns().get(columnIdFromInputFile);
+          ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+          OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+          ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
+          ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+          OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+          if (inColumnIndex != null) {
+            assertEquals(inColumnIndex.getBoundaryOrder(), 
outColumnIndex.getBoundaryOrder());
+            assertEquals(inColumnIndex.getMaxValues(), 
outColumnIndex.getMaxValues());
+            assertEquals(inColumnIndex.getMinValues(), 
outColumnIndex.getMinValues());
+            assertEquals(inColumnIndex.getNullCounts(), 
outColumnIndex.getNullCounts());
+          }
+          if (inOffsetIndex != null) {
+            List<Long> inOffsets = getOffsets(inReader, inChunk);
+            List<Long> outOffsets = getOffsets(outReader, outChunk);
+            assertEquals(inOffsets.size(), outOffsets.size());
+            assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
+            assertEquals(inOffsetIndex.getPageCount(), 
outOffsetIndex.getPageCount());
+            for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+              assertEquals(inOffsetIndex.getFirstRowIndex(k), 
outOffsetIndex.getFirstRowIndex(k));
+              assertEquals(inOffsetIndex.getLastRowIndex(k, 
inChunk.getValueCount()),
+                      outOffsetIndex.getLastRowIndex(k, 
outChunk.getValueCount()));
+              assertEquals(inOffsetIndex.getOffset(k), (long) 
inOffsets.get(k));
+              assertEquals(outOffsetIndex.getOffset(k), (long) 
outOffsets.get(k));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private List<Long> getOffsets(TransParquetFileReader reader, 
ColumnChunkMetaData chunk) throws IOException {
+    List<Long> offsets = new ArrayList<>();
+    reader.setStreamPosition(chunk.getStartingPos());
+    long readValues = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      long curOffset = reader.getPos();
+      PageHeader pageHeader = reader.readPageHeader();
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          rewriter.readBlock(pageHeader.getCompressed_page_size(), reader);
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          offsets.add(curOffset);
+          rewriter.readBlock(pageHeader.getCompressed_page_size(), reader);
+          readValues += headerV1.getNum_values();
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          offsets.add(curOffset);
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          rewriter.readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          rewriter.readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
+          rewriter.readBlock(payLoadLength, reader);
+          readValues += headerV2.getNum_values();
+          break;
+        default:
+          throw new IOException("Not recognized page type");
+      }
+    }
+    return offsets;
+  }
+
+  private void validateCreatedBy() throws Exception {
+    FileMetaData inFMD = getFileMetaData(inputFile.getFileName(), 
null).getFileMetaData();
+    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+
+    assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
+    
assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+
+    String originalCreatedBy = 
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
+    assertNotNull(originalCreatedBy);
+    assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+  }
+
+}
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
similarity index 99%
rename from 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
rename to 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
index 218abb95b..d52344934 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
@@ -65,7 +65,7 @@ import static 
org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
 
-public class CompressionConveterTest {
+public class CompressionConverterTest {
 
   private Configuration conf = new Configuration();
   private Map<String, String> extraMeta

Reply via email to