This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new d6417dfad PARQUET-2227: Refactor several file rewriters to use a new
unified ParquetRewriter implementation (#1014)
d6417dfad is described below
commit d6417dfad59c1423e358a5e859c332abd1244d2f
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jan 30 04:25:17 2023 +0800
PARQUET-2227: Refactor several file rewriters to use a new unified
ParquetRewriter implementation (#1014)
- A new ParquetRewriter is introduced to unify rewriting logic.
- RewriteOptions is defined to provide essential settings.
- CompressionConverter, ColumnPruner, ColumnMasker, and ColumnEncryptor
have been refactored.
- Check conflicts in the RewriteOptions.
- Rename EncryptorRunTime to ColumnChunkEncryptorRunTime.
- Avoid redundant check in the ColumnChunkEncryptorRunTime.
- Simplify MaskMode enum.
- add mixed test cases for rewriter
- add test case with encryption/pruning/transcodec
- fix error message
- rename createdBy to originalCreatedBy, add multiple inputs to
RewriteOptions
- rewriter keeps old writer version into original.created.by
---
.../apache/parquet/hadoop/rewrite/MaskMode.java | 38 ++
.../parquet/hadoop/rewrite/ParquetRewriter.java | 754 +++++++++++++++++++++
.../parquet/hadoop/rewrite/RewriteOptions.java | 184 +++++
.../parquet/hadoop/util/ColumnEncryptor.java | 255 +------
.../apache/parquet/hadoop/util/ColumnMasker.java | 174 +----
.../apache/parquet/hadoop/util/ColumnPruner.java | 98 +--
.../parquet/hadoop/util/CompressionConverter.java | 209 +-----
.../hadoop/rewrite/ParquetRewriterTest.java | 428 ++++++++++++
...eterTest.java => CompressionConverterTest.java} | 2 +-
9 files changed, 1442 insertions(+), 700 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java
new file mode 100644
index 000000000..4e1fda0de
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.parquet.Preconditions;
+
/**
 * How a masked column's values are replaced by {@code ParquetRewriter}:
 * NULLIFY replaces every value with null; HASH and REDACT are reserved
 * (the rewriter currently rejects them with UnsupportedOperationException).
 */
public enum MaskMode {
  NULLIFY("nullify"),
  HASH("hash"),
  REDACT("redact");

  // Lower-case textual name of the mode (e.g. for CLI parsing / display).
  // Made final: it is assigned exactly once in the constructor.
  private final String mode;

  MaskMode(String text) {
    // All constants pass non-null literals; this guards future additions.
    this.mode = Objects.requireNonNull(text, "Text of mask mode is required");
  }

  /** @return the lower-case textual name of this mask mode */
  public String getMode() {
    return this.mode;
  }
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
new file mode 100644
index 000000000..e4870af98
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+ // Key to store original writer version in the file key-value metadata
+ public static final String ORIGINAL_CREATED_BY_KEY = "original.created.by";
+ private static final Logger LOG =
LoggerFactory.getLogger(ParquetRewriter.class);
+ private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+ private final byte[] pageBuffer = new byte[pageBufferSize];
+ private TransParquetFileReader reader;
+ private ParquetFileWriter writer;
+ private ParquetMetadata meta;
+ private MessageType schema;
+ private String originalCreatedBy;
+ private CompressionCodecName newCodecName = null;
+ private List<String> pruneColumns = null;
+ private Map<ColumnPath, MaskMode> maskColumns = null;
+ private Set<ColumnPath> encryptColumns = null;
+ private boolean encryptMode = false;
+ private Map<String, String> extraMetaData = new HashMap<>();
+
+ public ParquetRewriter(RewriteOptions options) throws IOException {
+ Preconditions.checkArgument(options.getInputFiles().size() == 1, "Only
support one input file");
+ Path inPath = options.getInputFiles().get(0);
+ Path outPath = options.getOutputFile();
+ Configuration conf = options.getConf();
+
+ newCodecName = options.getNewCodecName();
+ pruneColumns = options.getPruneColumns();
+
+ // Get file metadata and full schema from the input file
+ meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+ schema = meta.getFileMetaData().getSchema();
+ originalCreatedBy = meta.getFileMetaData().getCreatedBy();
+ extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
+ extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
+
+ // Prune columns if specified
+ if (pruneColumns != null && !pruneColumns.isEmpty()) {
+ List<String> paths = new ArrayList<>();
+ getPaths(schema, paths, null);
+ for (String col : pruneColumns) {
+ if (!paths.contains(col)) {
+ LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, inPath.getName());
+ }
+ }
+
+ Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+ schema = pruneColumnsInSchema(schema, prunePaths);
+ }
+
+ if (options.getMaskColumns() != null) {
+ this.maskColumns = new HashMap<>();
+ for (Map.Entry<String, MaskMode> col :
options.getMaskColumns().entrySet()) {
+ maskColumns.put(ColumnPath.fromDotString(col.getKey()),
col.getValue());
+ }
+ }
+
+ if (options.getEncryptColumns() != null &&
options.getFileEncryptionProperties() != null) {
+ this.encryptColumns = convertToColumnPaths(options.getEncryptColumns());
+ this.encryptMode = true;
+ }
+
+ reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(inPath, conf),
HadoopReadOptions.builder(conf).build());
+
+ ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
+ writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf),
schema, writerMode,
+ DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT,
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ DEFAULT_STATISTICS_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+ options.getFileEncryptionProperties());
+ writer.start();
+ }
+
/**
 * Ctor for the legacy CompressionConverter and ColumnMasker code paths.
 * Reader/writer are already opened by the caller; this only wires state.
 *
 * @param originalCreatedBy writer version to preserve; falls back to the footer's createdBy when null
 */
public ParquetRewriter(TransParquetFileReader reader,
                       ParquetFileWriter writer,
                       ParquetMetadata meta,
                       MessageType schema,
                       String originalCreatedBy,
                       CompressionCodecName codecName,
                       List<String> maskColumns,
                       MaskMode maskMode) {
  this.reader = reader;
  this.writer = writer;
  this.meta = meta;
  this.schema = schema;
  this.newCodecName = codecName;
  // BUGFIX: the original assigned the resolved value back to the PARAMETER,
  // leaving the field this.originalCreatedBy null for later use in
  // processBlocks()/convertStatistics(). Assign the field instead.
  this.originalCreatedBy =
      originalCreatedBy == null ? meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
  extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
  extraMetaData.put(ORIGINAL_CREATED_BY_KEY, this.originalCreatedBy);
  if (maskColumns != null && maskMode != null) {
    // Legacy callers apply one mask mode uniformly to all listed columns.
    this.maskColumns = new HashMap<>();
    for (String col : maskColumns) {
      this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
    }
  }
}
+
+ @Override
+ public void close() throws IOException {
+ writer.end(extraMetaData);
+ }
+
+ public void processBlocks() throws IOException {
+ PageReadStore store = reader.readNextRowGroup();
+ ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new
DummyGroupConverter(), schema, originalCreatedBy);
+ Map<ColumnPath, ColumnDescriptor> descriptorsMap =
schema.getColumns().stream().collect(
+ Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+ int blockId = 0;
+ while (store != null) {
+ writer.startBlock(store.getRowCount());
+
+ BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+ List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+
+ for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+
+ // This column has been pruned.
+ if (descriptor == null) {
+ continue;
+ }
+
+ // If a column is encrypted, we simply throw exception.
+ // Later we can add a feature to trans-encrypt it with different keys
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ }
+
+ reader.setStreamPosition(chunk.getStartingPos());
+ CompressionCodecName newCodecName = this.newCodecName == null ?
chunk.getCodec() : this.newCodecName;
+ ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+ boolean encryptColumn = false;
+ if (encryptMode) {
+ columnChunkEncryptorRunTime =
+ new ColumnChunkEncryptorRunTime(writer.getEncryptor(),
chunk, blockId, columnId);
+ encryptColumn = encryptColumns != null &&
encryptColumns.contains(chunk.getPath());
+ }
+
+ if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+ // Mask column and compress it again.
+ MaskMode maskMode = maskColumns.get(chunk.getPath());
+ if (maskMode.equals(MaskMode.NULLIFY)) {
+ Type.Repetition repetition =
descriptor.getPrimitiveType().getRepetition();
+ if (repetition.equals(Type.Repetition.REQUIRED)) {
+ throw new IOException(
+ "Required column [" +
descriptor.getPrimitiveType().getName() + "] cannot be nullified");
+ }
+ nullifyColumn(
+ descriptor,
+ chunk,
+ crStore,
+ writer,
+ schema,
+ newCodecName,
+ columnChunkEncryptorRunTime,
+ encryptColumn);
+ } else {
+ throw new UnsupportedOperationException("Only nullify is supported
for now");
+ }
+ } else if (encryptMode || this.newCodecName != null) {
+ // Translate compression and/or encryption
+ writer.startColumn(descriptor,
crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+ processChunk(chunk, newCodecName, columnChunkEncryptorRunTime,
encryptColumn);
+ writer.endColumn();
+ } else {
+ // Nothing changed, simply copy the binary data.
+ BloomFilter bloomFilter = reader.readBloomFilter(chunk);
+ ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+ OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+ writer.appendColumnChunk(descriptor, reader.getStream(), chunk,
bloomFilter, columnIndex, offsetIndex);
+ }
+
+ columnId++;
+ }
+
+ writer.endBlock();
+ store = reader.readNextRowGroup();
+ blockId++;
+ }
+ }
+
+ private void processChunk(ColumnChunkMetaData chunk,
+ CompressionCodecName newCodecName,
+ ColumnChunkEncryptorRunTime
columnChunkEncryptorRunTime,
+ boolean encryptColumn) throws IOException {
+ CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+ CompressionCodecFactory.BytesInputDecompressor decompressor = null;
+ CompressionCodecFactory.BytesInputCompressor compressor = null;
+ if (!newCodecName.equals(chunk.getCodec())) {
+ // Re-compress only if a different codec has been specified
+ decompressor = codecFactory.getDecompressor(chunk.getCodec());
+ compressor = codecFactory.getCompressor(newCodecName);
+ }
+
+ // EncryptorRunTime is only provided when encryption is required
+ BlockCipher.Encryptor metaEncryptor = null;
+ BlockCipher.Encryptor dataEncryptor = null;
+ byte[] dictPageAAD = null;
+ byte[] dataPageAAD = null;
+ byte[] dictPageHeaderAAD = null;
+ byte[] dataPageHeaderAAD = null;
+ if (columnChunkEncryptorRunTime != null) {
+ metaEncryptor = columnChunkEncryptorRunTime.getMetaDataEncryptor();
+ dataEncryptor = columnChunkEncryptorRunTime.getDataEncryptor();
+ dictPageAAD = columnChunkEncryptorRunTime.getDictPageAAD();
+ dataPageAAD = columnChunkEncryptorRunTime.getDataPageAAD();
+ dictPageHeaderAAD = columnChunkEncryptorRunTime.getDictPageHeaderAAD();
+ dataPageHeaderAAD = columnChunkEncryptorRunTime.getDataPageHeaderAAD();
+ }
+
+ ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+ OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+ reader.setStreamPosition(chunk.getStartingPos());
+ DictionaryPage dictionaryPage = null;
+ long readValues = 0;
+ Statistics statistics = null;
+ ParquetMetadataConverter converter = new ParquetMetadataConverter();
+ int pageOrdinal = 0;
+ long totalChunkValues = chunk.getValueCount();
+ while (readValues < totalChunkValues) {
+ PageHeader pageHeader = reader.readPageHeader();
+ int compressedPageSize = pageHeader.getCompressed_page_size();
+ byte[] pageLoad;
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ if (dictionaryPage != null) {
+ throw new IOException("has more than one dictionary page in column
chunk");
+ }
+ //No quickUpdatePageAAD needed for dictionary page
+ DictionaryPageHeader dictPageHeader =
pageHeader.dictionary_page_header;
+ pageLoad = processPageLoad(reader,
+ true,
+ compressor,
+ decompressor,
+ pageHeader.getCompressed_page_size(),
+ pageHeader.getUncompressed_page_size(),
+ encryptColumn,
+ dataEncryptor,
+ dictPageAAD);
+ writer.writeDictionaryPage(new
DictionaryPage(BytesInput.from(pageLoad),
+ pageHeader.getUncompressed_page_size(),
+ dictPageHeader.getNum_values(),
+ converter.getEncoding(dictPageHeader.getEncoding())),
+ metaEncryptor,
+ dictPageHeaderAAD);
+ break;
+ case DATA_PAGE:
+ if (encryptColumn) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ }
+ DataPageHeader headerV1 = pageHeader.data_page_header;
+ pageLoad = processPageLoad(reader,
+ true,
+ compressor,
+ decompressor,
+ pageHeader.getCompressed_page_size(),
+ pageHeader.getUncompressed_page_size(),
+ encryptColumn,
+ dataEncryptor,
+ dataPageAAD);
+ statistics = convertStatistics(
+ originalCreatedBy, chunk.getPrimitiveType(),
headerV1.getStatistics(), columnIndex, pageOrdinal, converter);
+ readValues += headerV1.getNum_values();
+ if (offsetIndex != null) {
+ long rowCount = 1 + offsetIndex.getLastRowIndex(
+ pageOrdinal, totalChunkValues) -
offsetIndex.getFirstRowIndex(pageOrdinal);
+ writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+ pageHeader.getUncompressed_page_size(),
+ BytesInput.from(pageLoad),
+ statistics,
+ toIntWithCheck(rowCount),
+
converter.getEncoding(headerV1.getRepetition_level_encoding()),
+
converter.getEncoding(headerV1.getDefinition_level_encoding()),
+ converter.getEncoding(headerV1.getEncoding()),
+ metaEncryptor,
+ dataPageHeaderAAD);
+ } else {
+ writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+ pageHeader.getUncompressed_page_size(),
+ BytesInput.from(pageLoad),
+ statistics,
+
converter.getEncoding(headerV1.getRepetition_level_encoding()),
+
converter.getEncoding(headerV1.getDefinition_level_encoding()),
+ converter.getEncoding(headerV1.getEncoding()),
+ metaEncryptor,
+ dataPageHeaderAAD);
+ }
+ pageOrdinal++;
+ break;
+ case DATA_PAGE_V2:
+ if (encryptColumn) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ }
+ DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+ int rlLength = headerV2.getRepetition_levels_byte_length();
+ BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+ int dlLength = headerV2.getDefinition_levels_byte_length();
+ BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+ int payLoadLength = pageHeader.getCompressed_page_size() - rlLength
- dlLength;
+ int rawDataLength = pageHeader.getUncompressed_page_size() -
rlLength - dlLength;
+ pageLoad = processPageLoad(
+ reader,
+ headerV2.is_compressed,
+ compressor,
+ decompressor,
+ payLoadLength,
+ rawDataLength,
+ encryptColumn,
+ dataEncryptor,
+ dataPageAAD);
+ statistics = convertStatistics(
+ originalCreatedBy, chunk.getPrimitiveType(),
headerV2.getStatistics(), columnIndex, pageOrdinal, converter);
+ readValues += headerV2.getNum_values();
+ writer.writeDataPageV2(headerV2.getNum_rows(),
+ headerV2.getNum_nulls(),
+ headerV2.getNum_values(),
+ rlLevels,
+ dlLevels,
+ converter.getEncoding(headerV2.getEncoding()),
+ BytesInput.from(pageLoad),
+ rawDataLength,
+ statistics);
+ pageOrdinal++;
+ break;
+ default:
+ LOG.debug("skipping page of type {} of size {}",
pageHeader.getType(), compressedPageSize);
+ break;
+ }
+ }
+ }
+
+ private Statistics convertStatistics(String createdBy,
+ PrimitiveType type,
+ org.apache.parquet.format.Statistics
pageStatistics,
+ ColumnIndex columnIndex,
+ int pageIndex,
+ ParquetMetadataConverter converter)
throws IOException {
+ if (columnIndex != null) {
+ if (columnIndex.getNullPages() == null) {
+ throw new IOException("columnIndex has null variable 'nullPages' which
indicates corrupted data for type: " +
+ type.getName());
+ }
+ if (pageIndex > columnIndex.getNullPages().size()) {
+ throw new IOException("There are more pages " + pageIndex + " found in
the column than in the columnIndex " +
+ columnIndex.getNullPages().size());
+ }
+ org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
+
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+ statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+ if (!columnIndex.getNullPages().get(pageIndex)) {
+
statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+
statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+ }
+ return statsBuilder.build();
+ } else if (pageStatistics != null) {
+ return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+ } else {
+ return null;
+ }
+ }
+
+ private byte[] processPageLoad(TransParquetFileReader reader,
+ boolean isCompressed,
+ CompressionCodecFactory.BytesInputCompressor
compressor,
+
CompressionCodecFactory.BytesInputDecompressor decompressor,
+ int payloadLength,
+ int rawDataLength,
+ boolean encrypt,
+ BlockCipher.Encryptor dataEncryptor,
+ byte[] AAD) throws IOException {
+ BytesInput data = readBlock(payloadLength, reader);
+
+ // recompress page load
+ if (compressor != null) {
+ if (isCompressed) {
+ data = decompressor.decompress(data, rawDataLength);
+ }
+ data = compressor.compress(data);
+ }
+
+ if (!encrypt) {
+ return data.toByteArray();
+ }
+
+ // encrypt page load
+ return dataEncryptor.encrypt(data.toByteArray(), AAD);
+ }
+
+ public BytesInput readBlock(int length, TransParquetFileReader reader)
throws IOException {
+ byte[] data;
+ if (length > pageBufferSize) {
+ data = new byte[length];
+ } else {
+ data = pageBuffer;
+ }
+ reader.blockRead(data, 0, length);
+ return BytesInput.from(data, 0, length);
+ }
+
+ public BytesInput readBlockAllocate(int length, TransParquetFileReader
reader) throws IOException {
+ byte[] data = new byte[length];
+ reader.blockRead(data, 0, length);
+ return BytesInput.from(data, 0, length);
+ }
+
+ private int toIntWithCheck(long size) {
+ if ((int) size != size) {
+ throw new ParquetEncodingException("size is bigger than " +
Integer.MAX_VALUE + " bytes: " + size);
+ }
+ return (int) size;
+ }
+
+ // We have to rewrite getPaths because MessageType only get level 0 paths
+ private void getPaths(GroupType schema, List<String> paths, String parent) {
+ List<Type> fields = schema.getFields();
+ String prefix = (parent == null) ? "" : parent + ".";
+ for (Type field : fields) {
+ paths.add(prefix + field.getName());
+ if (field instanceof GroupType) {
+ getPaths(field.asGroupType(), paths, prefix + field.getName());
+ }
+ }
+ }
+
+ private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath>
prunePaths) {
+ List<Type> fields = schema.getFields();
+ List<String> currentPath = new ArrayList<>();
+ List<Type> prunedFields = pruneColumnsInFields(fields, currentPath,
prunePaths);
+ MessageType newSchema = new MessageType(schema.getName(), prunedFields);
+ return newSchema;
+ }
+
+ private List<Type> pruneColumnsInFields(List<Type> fields, List<String>
currentPath, Set<ColumnPath> prunePaths) {
+ List<Type> prunedFields = new ArrayList<>();
+ for (Type childField : fields) {
+ Type prunedChildField = pruneColumnsInField(childField, currentPath,
prunePaths);
+ if (prunedChildField != null) {
+ prunedFields.add(prunedChildField);
+ }
+ }
+ return prunedFields;
+ }
+
+ private Type pruneColumnsInField(Type field, List<String> currentPath,
Set<ColumnPath> prunePaths) {
+ String fieldName = field.getName();
+ currentPath.add(fieldName);
+ ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
+ Type prunedField = null;
+ if (!prunePaths.contains(path)) {
+ if (field.isPrimitive()) {
+ prunedField = field;
+ } else {
+ List<Type> childFields = ((GroupType) field).getFields();
+ List<Type> prunedFields = pruneColumnsInFields(childFields,
currentPath, prunePaths);
+ if (prunedFields.size() > 0) {
+ prunedField = ((GroupType) field).withNewFields(prunedFields);
+ }
+ }
+ }
+
+ currentPath.remove(currentPath.size() - 1);
+ return prunedField;
+ }
+
+ private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+ Set<ColumnPath> prunePaths = new HashSet<>();
+ for (String col : cols) {
+ prunePaths.add(ColumnPath.fromDotString(col));
+ }
+ return prunePaths;
+ }
+
+ private void nullifyColumn(ColumnDescriptor descriptor,
+ ColumnChunkMetaData chunk,
+ ColumnReadStoreImpl crStore,
+ ParquetFileWriter writer,
+ MessageType schema,
+ CompressionCodecName newCodecName,
+ ColumnChunkEncryptorRunTime
columnChunkEncryptorRunTime,
+ boolean encryptColumn) throws IOException {
+ // TODO: support nullifying and encrypting same column.
+ if (columnChunkEncryptorRunTime != null) {
+ throw new RuntimeException("Nullifying and encrypting column is not
implemented yet");
+ }
+ long totalChunkValues = chunk.getValueCount();
+ int dMax = descriptor.getMaxDefinitionLevel();
+ ColumnReader cReader = crStore.getColumnReader(descriptor);
+
+ ParquetProperties.WriterVersion writerVersion =
chunk.getEncodingStats().usesV2Pages() ?
+ ParquetProperties.WriterVersion.PARQUET_2_0 :
ParquetProperties.WriterVersion.PARQUET_1_0;
+ ParquetProperties props = ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .build();
+ CodecFactory codecFactory = new CodecFactory(new Configuration(),
props.getPageSizeThreshold());
+ CodecFactory.BytesCompressor compressor =
codecFactory.getCompressor(newCodecName);
+
+ // Create new schema that only has the current column
+ MessageType newSchema = newSchema(schema, descriptor);
+ ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
+ compressor, newSchema, props.getAllocator(),
props.getColumnIndexTruncateLength());
+ ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
+ ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
+
+ for (int i = 0; i < totalChunkValues; i++) {
+ int rlvl = cReader.getCurrentRepetitionLevel();
+ int dlvl = cReader.getCurrentDefinitionLevel();
+ if (dlvl == dMax) {
+ // since we checked ether optional or repeated, dlvl should be > 0
+ if (dlvl == 0) {
+ throw new IOException("definition level is detected to be 0 for
column " +
+ chunk.getPath().toDotString() + " to be nullified");
+ }
+ // we just write one null for the whole list at the top level,
+ // instead of nullify the elements in the list one by one
+ if (rlvl == 0) {
+ cWriter.writeNull(rlvl, dlvl - 1);
+ }
+ } else {
+ cWriter.writeNull(rlvl, dlvl);
+ }
+ cStore.endRecord();
+ }
+
+ cStore.flush();
+ cPageStore.flushToFileWriter(writer);
+
+ cStore.close();
+ cWriter.close();
+ }
+
+ private MessageType newSchema(MessageType schema, ColumnDescriptor
descriptor) {
+ String[] path = descriptor.getPath();
+ Type type = schema.getType(path);
+ if (path.length == 1) {
+ return new MessageType(schema.getName(), type);
+ }
+
+ for (Type field : schema.getFields()) {
+ if (!field.isPrimitive()) {
+ Type newType = extractField(field.asGroupType(), type);
+ if (newType != null) {
+ return new MessageType(schema.getName(), newType);
+ }
+ }
+ }
+
+ // We should never hit this because 'type' is returned by schema.getType().
+ throw new RuntimeException("No field is found");
+ }
+
+ private Type extractField(GroupType candidate, Type targetField) {
+ if (targetField.equals(candidate)) {
+ return targetField;
+ }
+
+ // In case 'type' is a descendants of candidate
+ for (Type field : candidate.asGroupType().getFields()) {
+ if (field.isPrimitive()) {
+ if (field.equals(targetField)) {
+ return new GroupType(candidate.getRepetition(), candidate.getName(),
targetField);
+ }
+ } else {
+ Type tempField = extractField(field.asGroupType(), targetField);
+ if (tempField != null) {
+ return tempField;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private static final class DummyGroupConverter extends GroupConverter {
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void end() {
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return new DummyConverter();
+ }
+ }
+
+ private static final class DummyConverter extends PrimitiveConverter {
+ @Override
+ public GroupConverter asGroupConverter() {
+ return new DummyGroupConverter();
+ }
+ }
+
+ private static class ColumnChunkEncryptorRunTime {
+ private final InternalColumnEncryptionSetup colEncrSetup;
+ private final BlockCipher.Encryptor dataEncryptor;
+ private final BlockCipher.Encryptor metaDataEncryptor;
+ private final byte[] fileAAD;
+
+ private byte[] dataPageHeaderAAD;
+ private byte[] dataPageAAD;
+ private byte[] dictPageHeaderAAD;
+ private byte[] dictPageAAD;
+
+ public ColumnChunkEncryptorRunTime(InternalFileEncryptor fileEncryptor,
+ ColumnChunkMetaData chunk,
+ int blockId,
+ int columnId) throws IOException {
+ Preconditions.checkArgument(fileEncryptor != null,
+ "FileEncryptor is required to create
ColumnChunkEncryptorRunTime");
+
+ this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true,
columnId);
+ this.dataEncryptor = colEncrSetup.getDataEncryptor();
+ this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+ this.fileAAD = fileEncryptor.getFileAAD();
+ if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
+ this.dataPageHeaderAAD = createAAD(ModuleType.DataPageHeader, blockId,
columnId);
+ this.dataPageAAD = createAAD(ModuleType.DataPage, blockId, columnId);
+ this.dictPageHeaderAAD = createAAD(ModuleType.DictionaryPageHeader,
blockId, columnId);
+ this.dictPageAAD = createAAD(ModuleType.DictionaryPage, blockId,
columnId);
+ } else {
+ this.dataPageHeaderAAD = null;
+ this.dataPageAAD = null;
+ this.dictPageHeaderAAD = null;
+ this.dictPageAAD = null;
+ }
+ }
+
+ private byte[] createAAD(ModuleType moduleType, int blockId, int columnId)
{
+ return AesCipher.createModuleAAD(fileAAD, moduleType, blockId, columnId,
0);
+ }
+
+ public BlockCipher.Encryptor getDataEncryptor() {
+ return this.dataEncryptor;
+ }
+
+ public BlockCipher.Encryptor getMetaDataEncryptor() {
+ return this.metaDataEncryptor;
+ }
+
+ public byte[] getDataPageHeaderAAD() {
+ return this.dataPageHeaderAAD;
+ }
+
+ public byte[] getDataPageAAD() {
+ return this.dataPageAAD;
+ }
+
+ public byte[] getDictPageHeaderAAD() {
+ return this.dictPageHeaderAAD;
+ }
+
+ public byte[] getDictPageAAD() {
+ return this.dictPageAAD;
+ }
+ }
+
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
new file mode 100644
index 000000000..a11c5a61f
--- /dev/null
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+// A set of options to create a ParquetRewriter.
+public class RewriteOptions {
+
+ final Configuration conf;
+ final List<Path> inputFiles;
+ final Path outputFile;
+ final List<String> pruneColumns;
+ final CompressionCodecName newCodecName;
+ final Map<String, MaskMode> maskColumns;
+ final List<String> encryptColumns;
+ final FileEncryptionProperties fileEncryptionProperties;
+
+ private RewriteOptions(Configuration conf,
+ List<Path> inputFiles,
+ Path outputFile,
+ List<String> pruneColumns,
+ CompressionCodecName newCodecName,
+ Map<String, MaskMode> maskColumns,
+ List<String> encryptColumns,
+ FileEncryptionProperties fileEncryptionProperties) {
+ this.conf = conf;
+ this.inputFiles = inputFiles;
+ this.outputFile = outputFile;
+ this.pruneColumns = pruneColumns;
+ this.newCodecName = newCodecName;
+ this.maskColumns = maskColumns;
+ this.encryptColumns = encryptColumns;
+ this.fileEncryptionProperties = fileEncryptionProperties;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public List<Path> getInputFiles() {
+ return inputFiles;
+ }
+
+ public Path getOutputFile() {
+ return outputFile;
+ }
+
+ public List<String> getPruneColumns() {
+ return pruneColumns;
+ }
+
+ public CompressionCodecName getNewCodecName() {
+ return newCodecName;
+ }
+
+ public Map<String, MaskMode> getMaskColumns() {
+ return maskColumns;
+ }
+
+ public List<String> getEncryptColumns() {
+ return encryptColumns;
+ }
+
+ public FileEncryptionProperties getFileEncryptionProperties() {
+ return fileEncryptionProperties;
+ }
+
+ // Builder to create a RewriterOptions.
+ public static class Builder {
+ private Configuration conf;
+ private List<Path> inputFiles;
+ private Path outputFile;
+ private List<String> pruneColumns;
+ private CompressionCodecName newCodecName;
+ private Map<String, MaskMode> maskColumns;
+ private List<String> encryptColumns;
+ private FileEncryptionProperties fileEncryptionProperties;
+
+ public Builder(Configuration conf, Path inputFile, Path outputFile) {
+ this.conf = conf;
+ this.inputFiles = Arrays.asList(inputFile);
+ this.outputFile = outputFile;
+ }
+
+ public Builder prune(List<String> columns) {
+ this.pruneColumns = columns;
+ return this;
+ }
+
+ public Builder transform(CompressionCodecName newCodecName) {
+ this.newCodecName = newCodecName;
+ return this;
+ }
+
+ public Builder mask(Map<String, MaskMode> maskColumns) {
+ this.maskColumns = maskColumns;
+ return this;
+ }
+
+ public Builder encrypt(List<String> encryptColumns) {
+ this.encryptColumns = encryptColumns;
+ return this;
+ }
+
+ public Builder encryptionProperties(FileEncryptionProperties
fileEncryptionProperties) {
+ this.fileEncryptionProperties = fileEncryptionProperties;
+ return this;
+ }
+
+ public RewriteOptions build() {
+ Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"Input file is required");
+ Preconditions.checkArgument(outputFile != null, "Output file is
required");
+
+ if (pruneColumns != null) {
+ if (maskColumns != null) {
+ for (String pruneColumn : pruneColumns) {
+ Preconditions.checkArgument(!maskColumns.containsKey(pruneColumn),
+ "Cannot prune and mask same column");
+ }
+ }
+
+ if (encryptColumns != null) {
+ for (String pruneColumn : pruneColumns) {
+ Preconditions.checkArgument(!encryptColumns.contains(pruneColumn),
+ "Cannot prune and encrypt same column");
+ }
+ }
+ }
+
+ // TODO: support masking and encrypting same columns
+ if (maskColumns != null && encryptColumns != null) {
+ for (String encryptColumn : encryptColumns) {
+ Preconditions.checkArgument(!maskColumns.containsKey(encryptColumn),
+ "Cannot encrypt and mask same column");
+ }
+ }
+
+ if (encryptColumns != null && !encryptColumns.isEmpty()) {
+ Preconditions.checkArgument(fileEncryptionProperties != null,
+ "FileEncryptionProperties is required when encrypting
columns");
+ }
+
+ if (fileEncryptionProperties != null) {
+ Preconditions.checkArgument(encryptColumns != null &&
!encryptColumns.isEmpty(),
+ "Encrypt columns is required when FileEncryptionProperties is
set");
+ }
+
+ return new RewriteOptions(conf,
+ inputFiles,
+ outputFile,
+ pruneColumns,
+ newCodecName,
+ maskColumns,
+ encryptColumns,
+ fileEncryptionProperties);
+ }
+ }
+
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
index 32ac49a22..48381fdbb 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
@@ -20,44 +20,17 @@ package org.apache.parquet.hadoop.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
-import org.apache.parquet.crypto.InternalFileEncryptor;
-import org.apache.parquet.format.BlockCipher;
-import org.apache.parquet.format.DataPageHeader;
-import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
-import org.apache.parquet.format.PageHeader;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
import
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-
-import static
org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
-import static
org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
-import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
/**
* This class is for fast rewriting existing file with column encryption
@@ -67,74 +40,8 @@ import static
org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
*
* For columns not to be encrypted, the whole column chunk will be appended
directly to writer.
*/
+@Deprecated
public class ColumnEncryptor {
- private static class EncryptorRunTime {
- private final InternalColumnEncryptionSetup colEncrSetup;
- private final BlockCipher.Encryptor dataEncryptor;
- private final BlockCipher.Encryptor metaDataEncryptor;
- private final byte[] fileAAD ;
-
- private byte[] dataPageHeaderAAD;
- private byte[] dataPageAAD;
- private byte[] dictPageHeaderAAD;
- private byte[] dictPageAAD;
-
- public EncryptorRunTime(InternalFileEncryptor fileEncryptor,
ColumnChunkMetaData chunk,
- int blockId, int columnId) throws IOException {
- if (fileEncryptor == null) {
- this.colEncrSetup = null;
- this.dataEncryptor = null;
- this.metaDataEncryptor = null;
-
- this.fileAAD = null;
- this.dataPageHeaderAAD = null;
- this.dataPageAAD = null;
- this.dictPageHeaderAAD = null;
- this.dictPageAAD = null;
- } else {
- this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(),
true, columnId);
- this.dataEncryptor = colEncrSetup.getDataEncryptor();
- this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
-
- this.fileAAD = fileEncryptor.getFileAAD();
- this.dataPageHeaderAAD = createAAD(colEncrSetup,
ModuleType.DataPageHeader, blockId, columnId);
- this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage,
blockId, columnId);
- this.dictPageHeaderAAD = createAAD(colEncrSetup,
ModuleType.DictionaryPageHeader, blockId, columnId);
- this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage,
blockId, columnId);
- }
- }
-
- private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup,
ModuleType moduleType, int blockId, int columnId) {
- if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
- return AesCipher.createModuleAAD(fileAAD, moduleType, blockId,
columnId, 0);
- }
- return null;
- }
-
- public BlockCipher.Encryptor getDataEncryptor() {
- return this.dataEncryptor;
- }
-
- public BlockCipher.Encryptor getMetaDataEncryptor() {
- return this.metaDataEncryptor;
- }
-
- public byte[] getDataPageHeaderAAD() {
- return this.dataPageHeaderAAD;
- }
-
- public byte[] getDataPageAAD() {
- return this.dataPageAAD;
- }
-
- public byte[] getDictPageHeaderAAD() {
- return this.dictPageHeaderAAD;
- }
-
- public byte[] getDictPageAAD() {
- return this.dictPageAAD;
- }
- }
private Configuration conf;
@@ -154,157 +61,11 @@ public class ColumnEncryptor {
public void encryptColumns(String inputFile, String outputFile, List<String>
paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
Path inPath = new Path(inputFile);
Path outPath = new Path(outputFile);
-
- ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath,
NO_FILTER);
- MessageType schema = metaData.getFileMetaData().getSchema();
-
- ParquetFileWriter writer = new
ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema,
ParquetFileWriter.Mode.OVERWRITE,
- DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT,
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
- ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
fileEncryptionProperties);
- writer.start();
-
- try (TransParquetFileReader reader = new
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf),
HadoopReadOptions.builder(conf).build())) {
- processBlocks(reader, writer, metaData, schema, paths);
- }
- writer.end(metaData.getFileMetaData().getKeyValueMetaData());
- }
-
- private void processBlocks(TransParquetFileReader reader, ParquetFileWriter
writer, ParquetMetadata meta,
- MessageType schema, List<String> encryptPaths)
throws IOException {
- Set<ColumnPath> encryptColumnsPath = convertToColumnPaths(encryptPaths);
- int blockId = 0;
- PageReadStore store = reader.readNextRowGroup();
-
- while (store != null) {
- writer.startBlock(store.getRowCount());
-
- List<ColumnChunkMetaData> columnsInOrder =
meta.getBlocks().get(blockId).getColumns();
- Map<ColumnPath, ColumnDescriptor> descriptorsMap =
schema.getColumns().stream().collect(
- Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
-
- for (int i = 0; i < columnsInOrder.size(); i += 1) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
- }
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
- processChunk(descriptor, chunk, reader, writer, encryptColumnsPath,
blockId, i, meta.getFileMetaData().getCreatedBy());
- }
-
- writer.endBlock();
- store = reader.readNextRowGroup();
- blockId++;
- }
- }
-
- private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData
chunk, TransParquetFileReader reader, ParquetFileWriter writer,
- Set<ColumnPath> encryptPaths, int blockId, int
columnId, String createdBy) throws IOException {
- reader.setStreamPosition(chunk.getStartingPos());
- writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
- processPages(reader, chunk, writer, createdBy, blockId, columnId,
encryptPaths.contains(chunk.getPath()));
- writer.endColumn();
- }
-
- private void processPages(TransParquetFileReader reader, ColumnChunkMetaData
chunk, ParquetFileWriter writer,
- String createdBy, int blockId, int columnId,
boolean encrypt) throws IOException {
- int pageOrdinal = 0;
- EncryptorRunTime encryptorRunTime = new
EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
- DictionaryPage dictionaryPage = null;
- long readValues = 0;
- ParquetMetadataConverter converter = new ParquetMetadataConverter();
- OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
- reader.setStreamPosition(chunk.getStartingPos());
- long totalChunkValues = chunk.getValueCount();
- while (readValues < totalChunkValues) {
- PageHeader pageHeader = reader.readPageHeader();
- byte[] pageLoad;
- switch (pageHeader.type) {
- case DICTIONARY_PAGE:
- if (dictionaryPage != null) {
- throw new IOException("has more than one dictionary page in column
chunk");
- }
- //No quickUpdatePageAAD needed for dictionary page
- DictionaryPageHeader dictPageHeader =
pageHeader.dictionary_page_header;
- pageLoad = processPayload(reader,
pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(),
encryptorRunTime.getDictPageAAD(), encrypt);
- writer.writeDictionaryPage(new
DictionaryPage(BytesInput.from(pageLoad),
- pageHeader.getUncompressed_page_size(),
- dictPageHeader.getNum_values(),
-
converter.getEncoding(dictPageHeader.getEncoding())),
- encryptorRunTime.getMetaDataEncryptor(),
encryptorRunTime.getDictPageHeaderAAD());
- break;
- case DATA_PAGE:
- if (encrypt) {
-
AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(),
pageOrdinal);
- AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(),
pageOrdinal);
- }
- DataPageHeader headerV1 = pageHeader.data_page_header;
- pageLoad = processPayload(reader,
pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(),
encryptorRunTime.getDataPageAAD(), encrypt);
- readValues += headerV1.getNum_values();
- if (offsetIndex != null) {
- long rowCount = 1 + offsetIndex.getLastRowIndex(pageOrdinal,
totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
- writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
- pageHeader.getUncompressed_page_size(),
- BytesInput.from(pageLoad),
- converter.fromParquetStatistics(createdBy,
headerV1.getStatistics(), chunk.getPrimitiveType()),
- rowCount,
- converter.getEncoding(headerV1.getRepetition_level_encoding()),
- converter.getEncoding(headerV1.getDefinition_level_encoding()),
- converter.getEncoding(headerV1.getEncoding()),
- encryptorRunTime.getMetaDataEncryptor(),
- encryptorRunTime.getDataPageHeaderAAD());
- } else {
- writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
- pageHeader.getUncompressed_page_size(),
- BytesInput.from(pageLoad),
- converter.fromParquetStatistics(createdBy,
headerV1.getStatistics(), chunk.getPrimitiveType()),
- converter.getEncoding(headerV1.getRepetition_level_encoding()),
- converter.getEncoding(headerV1.getDefinition_level_encoding()),
- converter.getEncoding(headerV1.getEncoding()),
- encryptorRunTime.getMetaDataEncryptor(),
- encryptorRunTime.getDataPageHeaderAAD());
- }
- pageOrdinal++;
- break;
- case DATA_PAGE_V2:
- if (encrypt) {
-
AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(),
pageOrdinal);
- AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(),
pageOrdinal);
- }
- DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
- int rlLength = headerV2.getRepetition_levels_byte_length();
- BytesInput rlLevels = readBlockAllocate(rlLength, reader);
- int dlLength = headerV2.getDefinition_levels_byte_length();
- BytesInput dlLevels = readBlockAllocate(dlLength, reader);
- int payLoadLength = pageHeader.getCompressed_page_size() - rlLength
- dlLength;
- int rawDataLength = pageHeader.getUncompressed_page_size() -
rlLength - dlLength;
- pageLoad = processPayload(reader, payLoadLength,
encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(),
encrypt);
- readValues += headerV2.getNum_values();
- writer.writeDataPageV2(headerV2.getNum_rows(),
- headerV2.getNum_nulls(),
- headerV2.getNum_values(),
- rlLevels,
- dlLevels,
- converter.getEncoding(headerV2.getEncoding()),
- BytesInput.from(pageLoad),
- rawDataLength,
- converter.fromParquetStatistics(createdBy,
headerV2.getStatistics(), chunk.getPrimitiveType()));
- pageOrdinal++;
- break;
- default:
- break;
- }
- }
- }
-
- private byte[] processPayload(TransParquetFileReader reader, int
payloadLength, BlockCipher.Encryptor dataEncryptor,
- byte[] AAD, boolean encrypt) throws
IOException {
- byte[] data = readBlock(payloadLength, reader);
- if (!encrypt) {
- return data;
- }
- return dataEncryptor.encrypt(data, AAD);
+ RewriteOptions options = new RewriteOptions.Builder(conf, inPath, outPath).
+
encrypt(paths).encryptionProperties(fileEncryptionProperties).build();
+ ParquetRewriter rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
}
public byte[] readBlock(int length, TransParquetFileReader reader) throws
IOException {
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
index 74bc534f8..d3e29292a 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -18,39 +18,19 @@
*/
package org.apache.parquet.hadoop.util;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.column.values.bloomfilter.BloomFilter;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
-import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
+@Deprecated
public class ColumnMasker {
/**
*
@@ -64,137 +44,22 @@ public class ColumnMasker {
*/
public void processBlocks(TransParquetFileReader reader, ParquetFileWriter
writer, ParquetMetadata meta,
MessageType schema, List<String> paths, MaskMode
maskMode) throws IOException {
- Set<ColumnPath> nullifyColumns = convertToColumnPaths(paths);
- int blockIndex = 0;
- PageReadStore store = reader.readNextRowGroup();
-
- while (store != null) {
- writer.startBlock(store.getRowCount());
- List<ColumnChunkMetaData> columnsInOrder =
meta.getBlocks().get(blockIndex).getColumns();
- Map<ColumnPath, ColumnDescriptor> descriptorsMap =
schema.getColumns().stream().collect(
- Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
- ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new
DummyGroupConverter(), schema,
- meta.getFileMetaData().getCreatedBy());
-
- for (int i = 0; i < columnsInOrder.size(); i += 1) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
- processChunk(descriptor, chunk, crStore, reader, writer, schema,
nullifyColumns, maskMode);
- }
-
- writer.endBlock();
- store = reader.readNextRowGroup();
- blockIndex++;
- }
- }
-
- private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData
chunk, ColumnReadStoreImpl crStore,
- TransParquetFileReader reader, ParquetFileWriter
writer, MessageType schema,
- Set<ColumnPath> paths, MaskMode maskMode) throws
IOException {
- reader.setStreamPosition(chunk.getStartingPos());
-
- if (paths.contains(chunk.getPath())) {
- if (maskMode.equals(MaskMode.NULLIFY)) {
- Type.Repetition repetition =
descriptor.getPrimitiveType().getRepetition();
- if (repetition.equals(Type.Repetition.REQUIRED)) {
- throw new IOException("Required column [" +
descriptor.getPrimitiveType().getName() + "] cannot be nullified");
- }
- nullifyColumn(descriptor, chunk, crStore, writer, schema);
- } else {
- throw new UnsupportedOperationException("Only nullify is supported for
now");
- }
- } else {
- BloomFilter bloomFilter = reader.readBloomFilter(chunk);
- ColumnIndex columnIndex = reader.readColumnIndex(chunk);
- OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
- writer.appendColumnChunk(descriptor, reader.getStream(), chunk,
bloomFilter, columnIndex, offsetIndex);
- }
+ ParquetRewriter rewriter = new ParquetRewriter(
+ reader, writer, meta, schema, null, null, paths,
convertMaskMode(maskMode));
+ rewriter.processBlocks();
}
- private void nullifyColumn(ColumnDescriptor descriptor, ColumnChunkMetaData
chunk, ColumnReadStoreImpl crStore,
- ParquetFileWriter writer, MessageType schema)
throws IOException {
- long totalChunkValues = chunk.getValueCount();
- int dMax = descriptor.getMaxDefinitionLevel();
- ColumnReader cReader = crStore.getColumnReader(descriptor);
-
- WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
WriterVersion.PARQUET_2_0 : WriterVersion.PARQUET_1_0;
- ParquetProperties props = ParquetProperties.builder()
- .withWriterVersion(writerVersion)
- .build();
- CodecFactory codecFactory = new CodecFactory(new Configuration(),
props.getPageSizeThreshold());
- CodecFactory.BytesCompressor compressor =
codecFactory.getCompressor(chunk.getCodec());
-
- // Create new schema that only has the current column
- MessageType newSchema = newSchema(schema, descriptor);
- ColumnChunkPageWriteStore cPageStore = new
ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(),
props.getColumnIndexTruncateLength());
- ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
- ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
-
- for (int i = 0; i < totalChunkValues; i++) {
- int rlvl = cReader.getCurrentRepetitionLevel();
- int dlvl = cReader.getCurrentDefinitionLevel();
- if (dlvl == dMax) {
- // since we checked ether optional or repeated, dlvl should be > 0
- if (dlvl == 0) {
- throw new IOException("definition level is detected to be 0 for
column " + chunk.getPath().toDotString() + " to be nullified");
- }
- // we just write one null for the whole list at the top level, instead
of nullify the elements in the list one by one
- if (rlvl == 0) {
- cWriter.writeNull(rlvl, dlvl - 1);
- }
- } else {
- cWriter.writeNull(rlvl, dlvl);
- }
- cStore.endRecord();
+ org.apache.parquet.hadoop.rewrite.MaskMode convertMaskMode(MaskMode
maskMode) {
+ switch (maskMode) {
+ case NULLIFY:
+ return org.apache.parquet.hadoop.rewrite.MaskMode.NULLIFY;
+ case HASH:
+ return org.apache.parquet.hadoop.rewrite.MaskMode.HASH;
+ case REDACT:
+ return org.apache.parquet.hadoop.rewrite.MaskMode.REDACT;
+ default:
+ return null;
}
-
- cStore.flush();
- cPageStore.flushToFileWriter(writer);
-
- cStore.close();
- cWriter.close();
- }
-
- private MessageType newSchema(MessageType schema, ColumnDescriptor
descriptor) {
- String[] path = descriptor.getPath();
- Type type = schema.getType(path);
- if (path.length == 1) {
- return new MessageType(schema.getName(), type);
- }
-
- for (Type field : schema.getFields()) {
- if (!field.isPrimitive()) {
- Type newType = extractField(field.asGroupType(), type);
- if (newType != null) {
- return new MessageType(schema.getName(), newType);
- }
- }
- }
-
- // We should never hit this because 'type' is returned by schema.getType().
- throw new RuntimeException("No field is found");
- }
-
- private Type extractField(GroupType candidate, Type targetField) {
- if (targetField.equals(candidate)) {
- return targetField;
- }
-
- // In case 'type' is a descendants of candidate
- for (Type field : candidate.asGroupType().getFields()) {
- if (field.isPrimitive()) {
- if (field.equals(targetField)) {
- return new GroupType(candidate.getRepetition(), candidate.getName(),
targetField);
- }
- } else {
- Type tempField = extractField(field.asGroupType(), targetField);
- if (tempField != null) {
- return tempField;
- }
- }
- }
-
- return null;
}
public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
@@ -230,13 +95,4 @@ public class ColumnMasker {
}
}
- private static final class DummyGroupConverter extends GroupConverter {
- @Override public void start() {}
- @Override public void end() {}
- @Override public Converter getConverter(int fieldIndex) { return new
DummyConverter(); }
- }
-
- private static final class DummyConverter extends PrimitiveConverter {
- @Override public GroupConverter asGroupConverter() { return new
DummyGroupConverter(); }
- }
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
index 7ca958872..e36d215ff 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnPruner.java
@@ -20,107 +20,23 @@ package org.apache.parquet.hadoop.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
+import org.apache.parquet.hadoop.rewrite.RewriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
+@Deprecated
public class ColumnPruner {
private static final Logger LOG =
LoggerFactory.getLogger(ColumnPruner.class);
public void pruneColumns(Configuration conf, Path inputFile, Path
outputFile, List<String> cols) throws IOException {
- Set<ColumnPath> prunePaths = convertToColumnPaths(cols);
- ParquetMetadata pmd = ParquetFileReader.readFooter(conf, inputFile,
ParquetMetadataConverter.NO_FILTER);
- FileMetaData metaData = pmd.getFileMetaData();
- MessageType schema = metaData.getSchema();
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
-
- for (String col : cols) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, inputFile.getName());
- }
- }
-
- ParquetFileWriter writer = new ParquetFileWriter(conf,
- pruneColumnsInSchema(schema, prunePaths), outputFile,
ParquetFileWriter.Mode.CREATE);
-
- writer.start();
- writer.appendFile(HadoopInputFile.fromPath(inputFile, conf));
- writer.end(metaData.getKeyValueMetaData());
- }
-
- // We have to rewrite getPaths because MessageType only get level 0 paths
- private void getPaths(GroupType schema, List<String> paths, String parent) {
- List<Type> fields = schema.getFields();
- String prefix = (parent == null) ? "" : parent + ".";
- for (Type field : fields) {
- paths.add(prefix + field.getName());
- if (field instanceof GroupType) {
- getPaths(field.asGroupType(), paths, prefix + field.getName());
- }
- }
- }
-
- private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath>
prunePaths) {
- List<Type> fields = schema.getFields();
- List<String> currentPath = new ArrayList<>();
- List<Type> prunedFields = pruneColumnsInFields(fields, currentPath,
prunePaths);
- MessageType newSchema = new MessageType(schema.getName(), prunedFields);
- return newSchema;
- }
-
- private List<Type> pruneColumnsInFields(List<Type> fields, List<String>
currentPath, Set<ColumnPath> prunePaths) {
- List<Type> prunedFields = new ArrayList<>();
- for (Type childField : fields) {
- Type prunedChildField = pruneColumnsInField(childField, currentPath,
prunePaths);
- if (prunedChildField != null) {
- prunedFields.add(prunedChildField);
- }
- }
- return prunedFields;
- }
-
- private Type pruneColumnsInField(Type field, List<String> currentPath,
Set<ColumnPath> prunePaths) {
- String fieldName = field.getName();
- currentPath.add(fieldName);
- ColumnPath path = ColumnPath.get(currentPath.toArray(new String[0]));
- Type prunedField = null;
- if (!prunePaths.contains(path)) {
- if (field.isPrimitive()) {
- prunedField = field;
- } else {
- List<Type> childFields = ((GroupType) field).getFields();
- List<Type> prunedFields = pruneColumnsInFields(childFields,
currentPath, prunePaths);
- if (prunedFields.size() > 0) {
- prunedField = ((GroupType) field).withNewFields(prunedFields);
- }
- }
- }
-
- currentPath.remove(fieldName);
- return prunedField;
- }
-
- private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
- Set<ColumnPath> prunePaths = new HashSet<>();
- for (String col : cols) {
- prunePaths.add(ColumnPath.fromDotString(col));
- }
- return prunePaths;
+ RewriteOptions options = new RewriteOptions.Builder(conf, inputFile,
outputFile).prune(cols).build();
+ ParquetRewriter rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
}
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
index c77674d8c..161b89f2a 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -20,231 +20,36 @@ package org.apache.parquet.hadoop.util;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.compression.CompressionCodecFactory;
-import org.apache.parquet.format.DataPageHeader;
-import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.internal.column.columnindex.ColumnIndex;
-import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.io.InputFile;
-import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.SeekableInputStream;
-import org.apache.parquet.io.api.Converter;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+@Deprecated
public class CompressionConverter {
- private static final Logger LOG =
LoggerFactory.getLogger(CompressionConverter.class);
-
- private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
- private byte[] pageBuffer;
-
- public CompressionConverter() {
- this.pageBuffer = new byte[pageBufferSize];
- }
+ private ParquetRewriter rewriter;
public void processBlocks(TransParquetFileReader reader, ParquetFileWriter
writer, ParquetMetadata meta, MessageType schema,
String createdBy, CompressionCodecName codecName)
throws IOException {
- int blockIndex = 0;
- PageReadStore store = reader.readNextRowGroup();
- while (store != null) {
- writer.startBlock(store.getRowCount());
- BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- Map<ColumnPath, ColumnDescriptor> descriptorsMap =
schema.getColumns().stream().collect(
- Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
- for (int i = 0; i < columnsInOrder.size(); i += 1) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new
DummyGroupConverter(), schema, createdBy);
- ColumnDescriptor columnDescriptor =
descriptorsMap.get(chunk.getPath());
- writer.startColumn(columnDescriptor,
crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
- processChunk(reader, writer, chunk, createdBy, codecName);
- writer.endColumn();
- }
- writer.endBlock();
- store = reader.readNextRowGroup();
- blockIndex++;
- }
- }
-
- private void processChunk(TransParquetFileReader reader, ParquetFileWriter
writer, ColumnChunkMetaData chunk,
- String createdBy, CompressionCodecName codecName)
throws IOException {
- CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
- CompressionCodecFactory.BytesInputDecompressor decompressor =
codecFactory.getDecompressor(chunk.getCodec());
- CompressionCodecFactory.BytesInputCompressor compressor =
codecFactory.getCompressor(codecName);
- ColumnIndex columnIndex = reader.readColumnIndex(chunk);
- OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
-
- reader.setStreamPosition(chunk.getStartingPos());
- DictionaryPage dictionaryPage = null;
- long readValues = 0;
- Statistics statistics = null;
- ParquetMetadataConverter converter = new ParquetMetadataConverter();
- int pageIndex = 0;
- long totalChunkValues = chunk.getValueCount();
- while (readValues < totalChunkValues) {
- PageHeader pageHeader = reader.readPageHeader();
- int compressedPageSize = pageHeader.getCompressed_page_size();
- byte[] pageLoad;
- switch (pageHeader.type) {
- case DICTIONARY_PAGE:
- if (dictionaryPage != null) {
- throw new IOException("has more than one dictionary page in column
chunk");
- }
- DictionaryPageHeader dictPageHeader =
pageHeader.dictionary_page_header;
- pageLoad = translatePageLoad(reader, true, compressor, decompressor,
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
- writer.writeDictionaryPage(new
DictionaryPage(BytesInput.from(pageLoad),
-
pageHeader.getUncompressed_page_size(),
-
dictPageHeader.getNum_values(),
-
converter.getEncoding(dictPageHeader.getEncoding())));
- break;
- case DATA_PAGE:
- DataPageHeader headerV1 = pageHeader.data_page_header;
- pageLoad = translatePageLoad(reader, true, compressor, decompressor,
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
- statistics = convertStatistics(createdBy, chunk.getPrimitiveType(),
headerV1.getStatistics(), columnIndex, pageIndex, converter);
- readValues += headerV1.getNum_values();
- if (offsetIndex != null) {
- long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex,
totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
- writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
- pageHeader.getUncompressed_page_size(),
- BytesInput.from(pageLoad),
- statistics,
- toIntWithCheck(rowCount),
- converter.getEncoding(headerV1.getRepetition_level_encoding()),
- converter.getEncoding(headerV1.getDefinition_level_encoding()),
- converter.getEncoding(headerV1.getEncoding()));
- } else {
- writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
- pageHeader.getUncompressed_page_size(),
- BytesInput.from(pageLoad),
- statistics,
- converter.getEncoding(headerV1.getRepetition_level_encoding()),
- converter.getEncoding(headerV1.getDefinition_level_encoding()),
- converter.getEncoding(headerV1.getEncoding()));
- }
- pageIndex++;
- break;
- case DATA_PAGE_V2:
- DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
- int rlLength = headerV2.getRepetition_levels_byte_length();
- BytesInput rlLevels = readBlockAllocate(rlLength, reader);
- int dlLength = headerV2.getDefinition_levels_byte_length();
- BytesInput dlLevels = readBlockAllocate(dlLength, reader);
- int payLoadLength = pageHeader.getCompressed_page_size() - rlLength
- dlLength;
- int rawDataLength = pageHeader.getUncompressed_page_size() -
rlLength - dlLength;
- pageLoad = translatePageLoad(reader, headerV2.is_compressed,
compressor, decompressor, payLoadLength, rawDataLength);
- statistics = convertStatistics(createdBy, chunk.getPrimitiveType(),
headerV2.getStatistics(), columnIndex, pageIndex, converter);
- readValues += headerV2.getNum_values();
- writer.writeDataPageV2(headerV2.getNum_rows(),
- headerV2.getNum_nulls(),
- headerV2.getNum_values(),
- rlLevels,
- dlLevels,
- converter.getEncoding(headerV2.getEncoding()),
- BytesInput.from(pageLoad),
- rawDataLength,
- statistics);
- pageIndex++;
- break;
- default:
- LOG.debug("skipping page of type {} of size {}",
pageHeader.getType(), compressedPageSize);
- break;
- }
- }
- }
-
- private Statistics convertStatistics(String createdBy, PrimitiveType type,
org.apache.parquet.format.Statistics pageStatistics,
- ColumnIndex columnIndex, int pageIndex,
ParquetMetadataConverter converter) throws IOException {
- if (columnIndex != null) {
- if (columnIndex.getNullPages() == null) {
- throw new IOException("columnIndex has null variable 'nullPages' which
indicates corrupted data for type: " + type.getName());
- }
- if (pageIndex > columnIndex.getNullPages().size()) {
- throw new IOException("There are more pages " + pageIndex + " found in
the column than in the columnIndex " + columnIndex.getNullPages().size());
- }
- org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
- statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
-
- if (!columnIndex.getNullPages().get(pageIndex)) {
-
statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
-
statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
- }
- return statsBuilder.build();
- } else if (pageStatistics != null) {
- return converter.fromParquetStatistics(createdBy, pageStatistics, type);
- } else {
- return null;
- }
- }
-
- private byte[] translatePageLoad(TransParquetFileReader reader, boolean
isCompressed, CompressionCodecFactory.BytesInputCompressor compressor,
-
CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength,
int rawDataLength) throws IOException {
- BytesInput data = readBlock(payloadLength, reader);
- if (isCompressed) {
- data = decompressor.decompress(data, rawDataLength);
- }
- BytesInput newCompressedData = compressor.compress(data);
- return newCompressedData.toByteArray();
+ rewriter = new ParquetRewriter(reader, writer, meta, schema, createdBy,
codecName, null, null);
+ rewriter.processBlocks();
}
public BytesInput readBlock(int length, TransParquetFileReader reader)
throws IOException {
- byte[] data;
- if (length > pageBufferSize) {
- data = new byte[length];
- } else {
- data = pageBuffer;
- }
- reader.blockRead(data, 0, length);
- return BytesInput.from(data, 0, length);
+ return rewriter.readBlock(length, reader);
}
public BytesInput readBlockAllocate(int length, TransParquetFileReader
reader) throws IOException {
- byte[] data = new byte[length];
- reader.blockRead(data, 0, length);
- return BytesInput.from(data, 0, length);
- }
-
- private int toIntWithCheck(long size) {
- if ((int)size != size) {
- throw new ParquetEncodingException("size is bigger than " +
Integer.MAX_VALUE + " bytes: " + size);
- }
- return (int)size;
- }
-
- private static final class DummyGroupConverter extends GroupConverter {
- @Override public void start() {}
- @Override public void end() {}
- @Override public Converter getConverter(int fieldIndex) { return new
DummyConverter(); }
- }
-
- private static final class DummyConverter extends PrimitiveConverter {
- @Override public GroupConverter asGroupConverter() { return new
DummyGroupConverter(); }
+ return rewriter.readBlockAllocate(length, reader);
}
public static final class TransParquetFileReader extends ParquetFileReader {
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
new file mode 100644
index 000000000..a6881577c
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.EncDecProperties;
+import org.apache.parquet.hadoop.util.EncryptionTestFile;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.TestFileBuilder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ParquetRewriterTest {
+
+ private final int numRecord = 100000;
+ private Configuration conf = new Configuration();
+ private EncryptionTestFile inputFile = null;
+ private String outputFile = null;
+ private ParquetRewriter rewriter = null;
+
+ @Test
+ public void testPruneSingleColumnAndTranslateCodec() throws Exception {
+ testSetup("GZIP");
+
+ Path inputPath = new Path(inputFile.getFileName());
+ Path outputPath = new Path(outputFile);
+ List<String> pruneColumns = Arrays.asList("Gender");
+ CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf,
inputPath, outputPath);
+ RewriteOptions options =
builder.prune(pruneColumns).transform(newCodec).build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ // Verify the schema is not changed for the columns not pruned
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ List<Type> fields = schema.getFields();
+ assertEquals(fields.size(), 3);
+ assertEquals(fields.get(0).getName(), "DocId");
+ assertEquals(fields.get(1).getName(), "Name");
+ assertEquals(fields.get(2).getName(), "Links");
+ List<Type> subFields = fields.get(2).asGroupType().getFields();
+ assertEquals(subFields.size(), 2);
+ assertEquals(subFields.get(0).getName(), "Backward");
+ assertEquals(subFields.get(1).getName(), "Forward");
+
+ // Verify codec has been translated
+ verifyCodec(outputFile, CompressionCodecName.ZSTD, null);
+
+ // Verify the data are not changed for the columns not pruned
+ validateColumnData(outputFile, inputFile.getFileContent(), new
HashSet<>(pruneColumns), Collections.emptySet(), null);
+
+ // Verify the page index
+ validatePageIndex(new HashMap<Integer, Integer>() {{
+ put(0, 0);
+ put(1, 1);
+ put(2, 3);
+ put(3, 4);
+ }});
+
+ // Verify original.created.by is preserved
+ validateCreatedBy();
+ }
+
+ @Test
+ public void testPruneNullifyAndTranslateCodec() throws Exception {
+ testSetup("UNCOMPRESSED");
+
+ Path inputPath = new Path(inputFile.getFileName());
+ Path outputPath = new Path(outputFile);
+ List<String> pruneColumns = Arrays.asList("Gender");
+ Map<String, MaskMode> maskColumns = new HashMap<>();
+ maskColumns.put("Links.Forward", MaskMode.NULLIFY);
+ CompressionCodecName newCodec = CompressionCodecName.GZIP;
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf,
inputPath, outputPath);
+ RewriteOptions options =
builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ // Verify the schema is not changed for the columns not pruned
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ List<Type> fields = schema.getFields();
+ assertEquals(fields.size(), 3);
+ assertEquals(fields.get(0).getName(), "DocId");
+ assertEquals(fields.get(1).getName(), "Name");
+ assertEquals(fields.get(2).getName(), "Links");
+ List<Type> subFields = fields.get(2).asGroupType().getFields();
+ assertEquals(subFields.size(), 2);
+ assertEquals(subFields.get(0).getName(), "Backward");
+ assertEquals(subFields.get(1).getName(), "Forward");
+
+ // Verify codec has been translated
+ verifyCodec(outputFile, newCodec, null);
+
+ // Verify the data are not changed for the columns not pruned
+ validateColumnData(outputFile, inputFile.getFileContent(), new
HashSet<>(pruneColumns), maskColumns.keySet(), null);
+
+ // Verify the page index
+ validatePageIndex(new HashMap<Integer, Integer>() {{
+ put(0, 0);
+ put(1, 1);
+ put(2, 3);
+ }});
+
+ // Verify original.created.by is preserved
+ validateCreatedBy();
+ }
+
+ @Test
+ public void testPruneEncryptAndTranslateCodec() throws Exception {
+ testSetup("GZIP");
+
+ Path inputPath = new Path(inputFile.getFileName());
+ Path outputPath = new Path(outputFile);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf,
inputPath, outputPath);
+
+ // Prune
+ List<String> pruneColumns = Arrays.asList("Gender");
+ builder.prune(pruneColumns);
+
+ // Translate codec
+ CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+ builder.transform(newCodec);
+
+ // Encrypt
+ String[] encryptColumns = {"DocId"};
+ FileEncryptionProperties fileEncryptionProperties =
+ EncDecProperties.getFileEncryptionProperties(encryptColumns,
ParquetCipher.AES_GCM_CTR_V1, false);
+
builder.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties);
+
+ RewriteOptions options = builder.build();
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ // Verify the schema is not changed for the columns not pruned
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new
Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ List<Type> fields = schema.getFields();
+ assertEquals(fields.size(), 3);
+ assertEquals(fields.get(0).getName(), "DocId");
+ assertEquals(fields.get(1).getName(), "Name");
+ assertEquals(fields.get(2).getName(), "Links");
+ List<Type> subFields = fields.get(2).asGroupType().getFields();
+ assertEquals(subFields.size(), 2);
+ assertEquals(subFields.get(0).getName(), "Backward");
+ assertEquals(subFields.get(1).getName(), "Forward");
+
+ // Verify codec has been translated
+ FileDecryptionProperties fileDecryptionProperties =
EncDecProperties.getFileDecryptionProperties();
+ verifyCodec(outputFile, newCodec, fileDecryptionProperties);
+
+ // Verify the data are not changed for the columns not pruned
+ validateColumnData(outputFile,
+ inputFile.getFileContent(), new HashSet<>(pruneColumns),
Collections.emptySet(), fileDecryptionProperties);
+
+ // Verify column encryption
+ ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
+ assertTrue(metaData.getBlocks().size() > 0);
+ List<ColumnChunkMetaData> columns =
metaData.getBlocks().get(0).getColumns();
+ Set<String> set = new HashSet<>(Arrays.asList(encryptColumns));
+ for (ColumnChunkMetaData column : columns) {
+ if (set.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+
+ // Verify original.created.by is preserved
+ validateCreatedBy();
+ }
+
+ private void testSetup(String compression) throws IOException {
+ MessageType schema = createSchema();
+ inputFile = new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecord)
+ .withCodec(compression)
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .build();
+ outputFile = TestFileBuilder.createTempFile("test");
+ }
+
+ private MessageType createSchema() {
+ return new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ }
+
+ private void validateColumnData(String file,
+ SimpleGroup[] fileContent,
+ Set<String> prunePaths,
+ Set<String> nullifiedPaths,
+ FileDecryptionProperties
fileDecryptionProperties) throws IOException {
+ ParquetReader<Group> reader = ParquetReader.builder(new
GroupReadSupport(), new Path(file))
+ .withConf(conf).withDecryption(fileDecryptionProperties).build();
+ for (int i = 0; i < numRecord; i++) {
+ Group group = reader.read();
+ if (!prunePaths.contains("DocId") && !nullifiedPaths.contains("DocId")) {
+ assertTrue(group.getLong("DocId", 0) ==
fileContent[i].getLong("DocId", 0));
+ }
+ if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
+ assertArrayEquals(group.getBinary("Name", 0).getBytes(),
+ fileContent[i].getBinary("Name", 0).getBytes());
+ }
+ if (!prunePaths.contains("Gender") &&
!nullifiedPaths.contains("Gender")) {
+ assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
+ fileContent[i].getBinary("Gender", 0).getBytes());
+ }
+ Group subGroup = group.getGroup("Links", 0);
+ if (!prunePaths.contains("Links.Backward") &&
!nullifiedPaths.contains("Links.Backward")) {
+ assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
+ fileContent[i].getGroup("Links", 0).getBinary("Backward",
0).getBytes());
+ }
+ if (!prunePaths.contains("Links.Forward") &&
!nullifiedPaths.contains("Links.Forward")) {
+ assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
+ fileContent[i].getGroup("Links", 0).getBinary("Forward",
0).getBytes());
+ }
+ }
+ reader.close();
+ }
+
+ private ParquetMetadata getFileMetaData(String file,
+ FileDecryptionProperties
fileDecryptionProperties) throws IOException {
+ ParquetReadOptions readOptions = ParquetReadOptions.builder()
+ .withDecryption(fileDecryptionProperties)
+ .build();
+ ParquetMetadata pmd = null;
+ InputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
+ try (SeekableInputStream in = inputFile.newStream()) {
+ pmd = ParquetFileReader.readFooter(inputFile, readOptions, in);
+ }
+ return pmd;
+ }
+
+ private void verifyCodec(String file,
+ CompressionCodecName codec,
+ FileDecryptionProperties fileDecryptionProperties)
throws IOException {
+ ParquetMetadata pmd = getFileMetaData(file, fileDecryptionProperties);
+ for (int i = 0; i < pmd.getBlocks().size(); i++) {
+ BlockMetaData block = pmd.getBlocks().get(i);
+ for (int j = 0; j < block.getColumns().size(); ++j) {
+ ColumnChunkMetaData columnChunkMetaData = block.getColumns().get(j);
+ assertEquals(columnChunkMetaData.getCodec(), codec);
+ }
+ }
+ }
+
+ /**
+ * Verify the page index is correct.
+ *
+ * @param outFileColumnMapping the column mapping from the output file to
the input file.
+ */
+ private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping)
throws Exception {
+ ParquetMetadata inMetaData = getFileMetaData(inputFile.getFileName(),
null);
+ ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
+ assertEquals(inMetaData.getBlocks().size(),
outMetaData.getBlocks().size());
+
+ try (TransParquetFileReader inReader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(new Path(inputFile.getFileName()), conf),
HadoopReadOptions.builder(conf).build());
+ TransParquetFileReader outReader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(new Path(outputFile), conf),
HadoopReadOptions.builder(conf).build())) {
+
+ for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
+ BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
+ BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+
+ for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
+ if (!outFileColumnMapping.containsKey(j)) {
+ continue;
+ }
+ int columnIdFromInputFile = outFileColumnMapping.get(j);
+ ColumnChunkMetaData inChunk =
inBlockMetaData.getColumns().get(columnIdFromInputFile);
+ ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+ OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+ ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
+ ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+ OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+ if (inColumnIndex != null) {
+ assertEquals(inColumnIndex.getBoundaryOrder(),
outColumnIndex.getBoundaryOrder());
+ assertEquals(inColumnIndex.getMaxValues(),
outColumnIndex.getMaxValues());
+ assertEquals(inColumnIndex.getMinValues(),
outColumnIndex.getMinValues());
+ assertEquals(inColumnIndex.getNullCounts(),
outColumnIndex.getNullCounts());
+ }
+ if (inOffsetIndex != null) {
+ List<Long> inOffsets = getOffsets(inReader, inChunk);
+ List<Long> outOffsets = getOffsets(outReader, outChunk);
+ assertEquals(inOffsets.size(), outOffsets.size());
+ assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
+ assertEquals(inOffsetIndex.getPageCount(),
outOffsetIndex.getPageCount());
+ for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+ assertEquals(inOffsetIndex.getFirstRowIndex(k),
outOffsetIndex.getFirstRowIndex(k));
+ assertEquals(inOffsetIndex.getLastRowIndex(k,
inChunk.getValueCount()),
+ outOffsetIndex.getLastRowIndex(k,
outChunk.getValueCount()));
+ assertEquals(inOffsetIndex.getOffset(k), (long)
inOffsets.get(k));
+ assertEquals(outOffsetIndex.getOffset(k), (long)
outOffsets.get(k));
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private List<Long> getOffsets(TransParquetFileReader reader,
ColumnChunkMetaData chunk) throws IOException {
+ List<Long> offsets = new ArrayList<>();
+ reader.setStreamPosition(chunk.getStartingPos());
+ long readValues = 0;
+ long totalChunkValues = chunk.getValueCount();
+ while (readValues < totalChunkValues) {
+ long curOffset = reader.getPos();
+ PageHeader pageHeader = reader.readPageHeader();
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ rewriter.readBlock(pageHeader.getCompressed_page_size(), reader);
+ break;
+ case DATA_PAGE:
+ DataPageHeader headerV1 = pageHeader.data_page_header;
+ offsets.add(curOffset);
+ rewriter.readBlock(pageHeader.getCompressed_page_size(), reader);
+ readValues += headerV1.getNum_values();
+ break;
+ case DATA_PAGE_V2:
+ DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+ offsets.add(curOffset);
+ int rlLength = headerV2.getRepetition_levels_byte_length();
+ rewriter.readBlock(rlLength, reader);
+ int dlLength = headerV2.getDefinition_levels_byte_length();
+ rewriter.readBlock(dlLength, reader);
+ int payLoadLength = pageHeader.getCompressed_page_size() - rlLength
- dlLength;
+ rewriter.readBlock(payLoadLength, reader);
+ readValues += headerV2.getNum_values();
+ break;
+ default:
+ throw new IOException("Not recognized page type");
+ }
+ }
+ return offsets;
+ }
+
+ private void validateCreatedBy() throws Exception {
+ FileMetaData inFMD = getFileMetaData(inputFile.getFileName(),
null).getFileMetaData();
+ FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+
+ assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
+
assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+
+ String originalCreatedBy =
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
+ assertNotNull(originalCreatedBy);
+ assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+ }
+
+}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
similarity index 99%
rename from
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
rename to
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
index 218abb95b..d52344934 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConverterTest.java
@@ -65,7 +65,7 @@ import static
org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
-public class CompressionConveterTest {
+public class CompressionConverterTest {
private Configuration conf = new Configuration();
private Map<String, String> extraMeta