[ 
https://issues.apache.org/jira/browse/PARQUET-2075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651836#comment-17651836
 ] 

ASF GitHub Bot commented on PARQUET-2075:
-----------------------------------------

shangxinli commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1056879313


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static 
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName = null;
+  private List<String> pruneColumns = null;
+  private Map<ColumnPath, MaskMode> maskColumns = null;
+  private Set<ColumnPath> encryptColumns = null;
+  private boolean encryptMode = false;
+
+  public ParquetRewriter(RewriteOptions options) throws IOException {
+    Path inPath = options.getInputFile();
+    Path outPath = options.getOutputFile();
+    Configuration conf = options.getConf();
+
+    // TODO: set more member variables
+    codecName = options.getCodecName();
+    pruneColumns = options.getPruneColumns();
+
+    // Get file metadata and full schema from the input file
+    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+    schema = meta.getFileMetaData().getSchema();
+    createdBy = meta.getFileMetaData().getCreatedBy();
+
+    // Prune columns if specified
+    if (pruneColumns != null && !pruneColumns.isEmpty()) {
+      List<String> paths = new ArrayList<>();
+      getPaths(schema, paths, null);

Review Comment:
   Should we move the line 120 to be inside the getPath()?





> Unified Rewriter Tool  
> -----------------------
>
>                 Key: PARQUET-2075
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2075
>             Project: Parquet
>          Issue Type: New Feature
>            Reporter: Xinli Shang
>            Assignee: Gang Wu
>            Priority: Major
>
> During the discussion of PARQUET-2071, we came up with the idea of a 
> universal tool to translate the existing file to a different state while 
> skipping some level steps like encoding/decoding, to gain speed. For example, 
> only decompress pages and then compress directly. For PARQUET-2071, we only 
> decrypt and then encrypt directly. This will be useful for the existing data 
> to onboard Parquet features like column encryption, zstd etc. 
> We already have tools like trans-compression, column pruning etc. We will 
> consolidate all these tools with this universal tool. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to