[ https://issues.apache.org/jira/browse/PARQUET-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651836#comment-17651836 ]
ASF GitHub Bot commented on PARQUET-2075: ----------------------------------------- shangxinli commented on code in PR #1014: URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1056879313 ########## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java: ########## @@ -0,0 +1,733 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop.rewrite; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.ColumnWriteStore; +import org.apache.parquet.column.ColumnWriter; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.values.bloomfilter.BloomFilter; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.crypto.AesCipher; +import org.apache.parquet.crypto.InternalColumnEncryptionSetup; +import org.apache.parquet.crypto.InternalFileEncryptor; +import org.apache.parquet.format.BlockCipher; +import org.apache.parquet.format.DataPageHeader; +import org.apache.parquet.format.DataPageHeaderV2; +import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.CodecFactory; +import org.apache.parquet.hadoop.ColumnChunkPageWriteStore; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopCodecs; +import 
org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; +import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH; +import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType; +import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; +import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT; + +public class ParquetRewriter implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class); + private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2; + private final byte[] pageBuffer = new byte[pageBufferSize]; + private TransParquetFileReader reader; + private ParquetFileWriter writer; + private ParquetMetadata meta; + private MessageType schema; + private String createdBy; + private CompressionCodecName codecName = null; + private List<String> pruneColumns = null; + private 
Map<ColumnPath, MaskMode> maskColumns = null; + private Set<ColumnPath> encryptColumns = null; + private boolean encryptMode = false; + + public ParquetRewriter(RewriteOptions options) throws IOException { + Path inPath = options.getInputFile(); + Path outPath = options.getOutputFile(); + Configuration conf = options.getConf(); + + // TODO: set more member variables + codecName = options.getCodecName(); + pruneColumns = options.getPruneColumns(); + + // Get file metadata and full schema from the input file + meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER); + schema = meta.getFileMetaData().getSchema(); + createdBy = meta.getFileMetaData().getCreatedBy(); + + // Prune columns if specified + if (pruneColumns != null && !pruneColumns.isEmpty()) { + List<String> paths = new ArrayList<>(); + getPaths(schema, paths, null); Review Comment: Should we move line 120 inside getPaths()? > Unified Rewriter Tool > ----------------------- > > Key: PARQUET-2075 > URL: https://issues.apache.org/jira/browse/PARQUET-2075 > Project: Parquet > Issue Type: New Feature > Reporter: Xinli Shang > Assignee: Gang Wu > Priority: Major > > During the discussion of PARQUET-2071, we came up with the idea of a > universal tool to translate an existing file to a different state while > skipping certain steps such as encoding/decoding, to gain speed. For example, > it could only decompress pages and then recompress them directly. For PARQUET-2071, we only > decrypt and then encrypt directly. This will be useful for onboarding existing data > to Parquet features like column encryption, zstd, etc. > We already have tools like trans-compression, column pruning, etc. We will > consolidate all these tools into this universal tool. -- This message was sent by Atlassian Jira (v8.20.10#820010)