This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee513341 PARQUET-2363: ParquetRewriter encrypts the V2 page
header (#1169)
5ee513341 is described below
commit 5ee513341158bc3962df80260c345b80076dbf3e
Author: Xianyang Liu <[email protected]>
AuthorDate: Sun Oct 15 23:48:40 2023 +0800
PARQUET-2363: ParquetRewriter encrypts the V2 page header (#1169)
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 117 +++++++++++++++++----
.../parquet/hadoop/rewrite/ParquetRewriter.java | 34 +++---
.../hadoop/rewrite/ParquetRewriterTest.java | 84 +++++++++------
3 files changed, 163 insertions(+), 72 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a71121e11..146ffd496 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -480,7 +480,7 @@ public class ParquetFileWriter {
}
/**
- * writes a dictionary page page
+ * writes a dictionary page
* @param dictionaryPage the dictionary page
* @throws IOException if there is an error while writing
*/
@@ -677,14 +677,14 @@ public class ParquetFileWriter {
* @throws IOException if there is an error while writing
*/
public void writeDataPage(
- int valueCount, int uncompressedPageSize,
- BytesInput bytes,
- Statistics<?> statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding,
- BlockCipher.Encryptor metadataBlockEncryptor,
- byte[] pageHeaderAAD) throws IOException {
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics<?> statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (currentChunkFirstDataPage < 0) {
@@ -749,6 +749,7 @@ public class ParquetFileWriter {
/**
* Writes a single v2 data page
+ *
* @param rowCount count of rows
* @param nullCount count of nulls
* @param valueCount count of values
@@ -760,13 +761,58 @@ public class ParquetFileWriter {
* @param statistics the statistics of the page
* @throws IOException if any I/O error occurs during writing the file
*/
- public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
- BytesInput repetitionLevels,
- BytesInput definitionLevels,
- Encoding dataEncoding,
- BytesInput compressedData,
- int uncompressedDataSize,
- Statistics<?> statistics) throws IOException {
+ public void writeDataPageV2(
+ int rowCount,
+ int nullCount,
+ int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput compressedData,
+ int uncompressedDataSize,
+ Statistics<?> statistics) throws IOException {
+ writeDataPageV2(
+ rowCount,
+ nullCount,
+ valueCount,
+ repetitionLevels,
+ definitionLevels,
+ dataEncoding,
+ compressedData,
+ uncompressedDataSize,
+ statistics,
+ null,
+ null);
+ }
+
+ /**
+ * Writes a single v2 data page
+ *
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param compressedData compressed data bytes
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @param metadataBlockEncryptor encryptor for block data
+ * @param pageHeaderAAD pageHeader AAD
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(
+ int rowCount,
+ int nullCount,
+ int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput compressedData,
+ int uncompressedDataSize,
+ Statistics<?> statistics,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
state = state.write();
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
@@ -784,13 +830,38 @@ public class ParquetFileWriter {
currentChunkFirstDataPage = beforeHeader;
}
- metadataConverter.writeDataPageV2Header(
- uncompressedSize, compressedSize,
- valueCount, nullCount, rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- out);
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ if (repetitionLevels.size() > 0) {
+ crc.update(repetitionLevels.toByteArray());
+ }
+ if (definitionLevels.size() > 0) {
+ crc.update(definitionLevels.toByteArray());
+ }
+ if (compressedData.size() > 0) {
+ crc.update(compressedData.toByteArray());
+ }
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ (int) crc.getValue(),
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ } else {
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ }
long headersSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedSize + headersSize;
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index d336aaf57..659ac1e5c 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -309,7 +309,7 @@ public class ParquetRewriter implements Closeable {
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
if (encryptMode) {
columnChunkEncryptorRunTime =
- new ColumnChunkEncryptorRunTime(writer.getEncryptor(),
chunk, numBlocksRewritten, columnId);
+ new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk,
numBlocksRewritten, columnId);
}
// Translate compression and/or encryption
@@ -374,7 +374,7 @@ public class ParquetRewriter implements Closeable {
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
long readValues = 0;
- Statistics statistics = null;
+ Statistics<?> statistics = null;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageOrdinal = 0;
long totalChunkValues = chunk.getValueCount();
@@ -385,7 +385,7 @@ public class ParquetRewriter implements Closeable {
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage != null) {
- throw new IOException("has more than one dictionary page in column
chunk");
+ throw new IOException("has more than one dictionary page in column
chunk: " + chunk);
}
//No quickUpdatePageAAD needed for dictionary page
DictionaryPageHeader dictPageHeader =
pageHeader.dictionary_page_header;
@@ -398,12 +398,12 @@ public class ParquetRewriter implements Closeable {
encryptColumn,
dataEncryptor,
dictPageAAD);
- writer.writeDictionaryPage(new
DictionaryPage(BytesInput.from(pageLoad),
- pageHeader.getUncompressed_page_size(),
- dictPageHeader.getNum_values(),
- converter.getEncoding(dictPageHeader.getEncoding())),
- metaEncryptor,
- dictPageHeaderAAD);
+ dictionaryPage = new DictionaryPage(
+ BytesInput.from(pageLoad),
+ pageHeader.getUncompressed_page_size(),
+ dictPageHeader.getNum_values(),
+ converter.getEncoding(dictPageHeader.getEncoding()));
+ writer.writeDictionaryPage(dictionaryPage, metaEncryptor,
dictPageHeaderAAD);
break;
case DATA_PAGE:
if (encryptColumn) {
@@ -482,7 +482,9 @@ public class ParquetRewriter implements Closeable {
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
rawDataLength,
- statistics);
+ statistics,
+ metaEncryptor,
+ dataPageHeaderAAD);
pageOrdinal++;
break;
default:
@@ -492,12 +494,12 @@ public class ParquetRewriter implements Closeable {
}
}
- private Statistics convertStatistics(String createdBy,
- PrimitiveType type,
- org.apache.parquet.format.Statistics
pageStatistics,
- ColumnIndex columnIndex,
- int pageIndex,
- ParquetMetadataConverter converter)
throws IOException {
+ private Statistics<?> convertStatistics(String createdBy,
+ PrimitiveType type,
+ org.apache.parquet.format.Statistics
pageStatistics,
+ ColumnIndex columnIndex,
+ int pageIndex,
+ ParquetMetadataConverter converter)
throws IOException {
if (columnIndex != null) {
if (columnIndex.getNullPages() == null) {
throw new IOException("columnIndex has null variable 'nullPages' which
indicates corrupted data for type: " +
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 62d0a28ca..1f03deceb 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -64,6 +64,8 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
@@ -89,14 +91,26 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class ParquetRewriterTest {
private final int numRecord = 100000;
- private Configuration conf = new Configuration();
+ private final Configuration conf = new Configuration();
+ private final ParquetProperties.WriterVersion writerVersion;
+
private List<EncryptionTestFile> inputFiles = null;
private String outputFile = null;
private ParquetRewriter rewriter = null;
+ @Parameterized.Parameters(name = "WriterVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {"v1", "v2"};
+ }
+
+ public ParquetRewriterTest(String writerVersion) {
+ this.writerVersion =
ParquetProperties.WriterVersion.fromString(writerVersion);
+ }
+
private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths)
throws Exception {
Path outputPath = new Path(outputFile);
List<String> pruneColumns = Arrays.asList("Gender");
@@ -498,27 +512,29 @@ public class ParquetRewriterTest {
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
MessageType schema1 = new MessageType("schema",
- new PrimitiveType(OPTIONAL, INT64, "DocId"),
- new PrimitiveType(REQUIRED, BINARY, "Name"),
- new PrimitiveType(OPTIONAL, BINARY, "Gender"),
- new GroupType(OPTIONAL, "Links",
- new PrimitiveType(REPEATED, BINARY, "Backward"),
- new PrimitiveType(REPEATED, BINARY, "Forward")));
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
MessageType schema2 = new MessageType("schema",
- new PrimitiveType(OPTIONAL, INT64, "DocId"),
- new PrimitiveType(REQUIRED, BINARY, "Name"),
- new PrimitiveType(OPTIONAL, BINARY, "Gender"));
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"));
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema1)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build());
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
inputFiles.add(new TestFileBuilder(conf, schema2)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build());
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
List<Path> inputPaths = new ArrayList<>();
for (EncryptionTestFile inputFile : inputFiles) {
@@ -617,6 +633,7 @@ public class ParquetRewriterTest {
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withRowGroupSize(rowGroupSize)
.withBloomFilterEnabled(bloomFilterEnabledColumns)
+ .withWriterVersion(writerVersion)
.build());
}
@@ -624,26 +641,28 @@ public class ParquetRewriterTest {
MessageType schema = createSchema();
inputFiles = Lists.newArrayList();
inputFiles.add(new TestFileBuilder(conf, schema)
- .withNumRecord(numRecord)
- .withCodec("GZIP")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build());
+ .withNumRecord(numRecord)
+ .withCodec("GZIP")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
inputFiles.add(new TestFileBuilder(conf, schema)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build());
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
}
private MessageType createSchema() {
return new MessageType("schema",
- new PrimitiveType(OPTIONAL, INT64, "DocId"),
- new PrimitiveType(REQUIRED, BINARY, "Name"),
- new PrimitiveType(OPTIONAL, BINARY, "Gender"),
- new GroupType(OPTIONAL, "Links",
- new PrimitiveType(REPEATED, BINARY, "Backward"),
- new PrimitiveType(REPEATED, BINARY, "Forward")));
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
}
private void validateColumnData(Set<String> prunePaths,
@@ -849,7 +868,6 @@ public class ParquetRewriterTest {
assertNotNull(createdBy);
assertEquals(createdBy, Version.FULL_VERSION);
-
// Verify original.created.by has been set
String inputCreatedBy = (String) inputCreatedBys[0];
String originalCreatedBy =
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);