This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee513341 PARQUET-2363: ParquetRewriter encrypts the V2 page 
header (#1169)
5ee513341 is described below

commit 5ee513341158bc3962df80260c345b80076dbf3e
Author: Xianyang Liu <[email protected]>
AuthorDate: Sun Oct 15 23:48:40 2023 +0800

    PARQUET-2363: ParquetRewriter encrypts the V2 page header (#1169)
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 117 +++++++++++++++++----
 .../parquet/hadoop/rewrite/ParquetRewriter.java    |  34 +++---
 .../hadoop/rewrite/ParquetRewriterTest.java        |  84 +++++++++------
 3 files changed, 163 insertions(+), 72 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a71121e11..146ffd496 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -480,7 +480,7 @@ public class ParquetFileWriter {
   }
 
   /**
-   * writes a dictionary page page
+   * writes a dictionary page
    * @param dictionaryPage the dictionary page
    * @throws IOException if there is an error while writing
    */
@@ -677,14 +677,14 @@ public class ParquetFileWriter {
    * @throws IOException if there is an error while writing
    */
   public void writeDataPage(
-    int valueCount, int uncompressedPageSize,
-    BytesInput bytes,
-    Statistics<?> statistics,
-    Encoding rlEncoding,
-    Encoding dlEncoding,
-    Encoding valuesEncoding,
-    BlockCipher.Encryptor metadataBlockEncryptor,
-    byte[] pageHeaderAAD) throws IOException {
+      int valueCount, int uncompressedPageSize,
+      BytesInput bytes,
+      Statistics<?> statistics,
+      Encoding rlEncoding,
+      Encoding dlEncoding,
+      Encoding valuesEncoding,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
     if (currentChunkFirstDataPage < 0) {
@@ -749,6 +749,7 @@ public class ParquetFileWriter {
 
   /**
    * Writes a single v2 data page
+   *
    * @param rowCount count of rows
    * @param nullCount count of nulls
    * @param valueCount count of values
@@ -760,13 +761,58 @@ public class ParquetFileWriter {
    * @param statistics the statistics of the page
    * @throws IOException if any I/O error occurs during writing the file
    */
-  public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
-                              BytesInput repetitionLevels,
-                              BytesInput definitionLevels,
-                              Encoding dataEncoding,
-                              BytesInput compressedData,
-                              int uncompressedDataSize,
-                              Statistics<?> statistics) throws IOException {
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput compressedData,
+      int uncompressedDataSize,
+      Statistics<?> statistics) throws IOException {
+    writeDataPageV2(
+      rowCount,
+      nullCount,
+      valueCount,
+      repetitionLevels,
+      definitionLevels,
+      dataEncoding,
+      compressedData,
+      uncompressedDataSize,
+      statistics,
+      null,
+      null);
+  }
+
+  /**
+   * Writes a single v2 data page
+   *
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param compressedData compressed data bytes
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @param metadataBlockEncryptor encryptor for block data
+   * @param pageHeaderAAD pageHeader AAD
+   * @throws IOException if any I/O error occurs during writing the file
+   */
+  public void writeDataPageV2(
+      int rowCount,
+      int nullCount,
+      int valueCount,
+      BytesInput repetitionLevels,
+      BytesInput definitionLevels,
+      Encoding dataEncoding,
+      BytesInput compressedData,
+      int uncompressedDataSize,
+      Statistics<?> statistics,
+      BlockCipher.Encryptor metadataBlockEncryptor,
+      byte[] pageHeaderAAD) throws IOException {
     state = state.write();
     int rlByteLength = toIntWithCheck(repetitionLevels.size());
     int dlByteLength = toIntWithCheck(definitionLevels.size());
@@ -784,13 +830,38 @@ public class ParquetFileWriter {
       currentChunkFirstDataPage = beforeHeader;
     }
 
-    metadataConverter.writeDataPageV2Header(
-      uncompressedSize, compressedSize,
-      valueCount, nullCount, rowCount,
-      dataEncoding,
-      rlByteLength,
-      dlByteLength,
-      out);
+    if (pageWriteChecksumEnabled) {
+      crc.reset();
+      if (repetitionLevels.size() > 0) {
+        crc.update(repetitionLevels.toByteArray());
+      }
+      if (definitionLevels.size() > 0) {
+        crc.update(definitionLevels.toByteArray());
+      }
+      if (compressedData.size() > 0) {
+        crc.update(compressedData.toByteArray());
+      }
+      metadataConverter.writeDataPageV2Header(
+        uncompressedSize, compressedSize,
+        valueCount, nullCount, rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        (int) crc.getValue(),
+        out,
+        metadataBlockEncryptor,
+        pageHeaderAAD);
+    } else {
+      metadataConverter.writeDataPageV2Header(
+        uncompressedSize, compressedSize,
+        valueCount, nullCount, rowCount,
+        dataEncoding,
+        rlByteLength,
+        dlByteLength,
+        out,
+        metadataBlockEncryptor,
+        pageHeaderAAD);
+    }
 
     long headersSize  = out.getPos() - beforeHeader;
     this.uncompressedLength += uncompressedSize + headersSize;
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index d336aaf57..659ac1e5c 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -309,7 +309,7 @@ public class ParquetRewriter implements Closeable {
           ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
           if (encryptMode) {
             columnChunkEncryptorRunTime =
-                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), 
chunk, numBlocksRewritten, columnId);
+               new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, 
numBlocksRewritten, columnId);
           }
 
           // Translate compression and/or encryption
@@ -374,7 +374,7 @@ public class ParquetRewriter implements Closeable {
     reader.setStreamPosition(chunk.getStartingPos());
     DictionaryPage dictionaryPage = null;
     long readValues = 0;
-    Statistics statistics = null;
+    Statistics<?> statistics = null;
     ParquetMetadataConverter converter = new ParquetMetadataConverter();
     int pageOrdinal = 0;
     long totalChunkValues = chunk.getValueCount();
@@ -385,7 +385,7 @@ public class ParquetRewriter implements Closeable {
       switch (pageHeader.type) {
         case DICTIONARY_PAGE:
           if (dictionaryPage != null) {
-            throw new IOException("has more than one dictionary page in column 
chunk");
+            throw new IOException("has more than one dictionary page in column 
chunk: " + chunk);
           }
           //No quickUpdatePageAAD needed for dictionary page
           DictionaryPageHeader dictPageHeader = 
pageHeader.dictionary_page_header;
@@ -398,12 +398,12 @@ public class ParquetRewriter implements Closeable {
                   encryptColumn,
                   dataEncryptor,
                   dictPageAAD);
-          writer.writeDictionaryPage(new 
DictionaryPage(BytesInput.from(pageLoad),
-                          pageHeader.getUncompressed_page_size(),
-                          dictPageHeader.getNum_values(),
-                          converter.getEncoding(dictPageHeader.getEncoding())),
-                  metaEncryptor,
-                  dictPageHeaderAAD);
+          dictionaryPage = new DictionaryPage(
+            BytesInput.from(pageLoad),
+            pageHeader.getUncompressed_page_size(),
+            dictPageHeader.getNum_values(),
+            converter.getEncoding(dictPageHeader.getEncoding()));
+          writer.writeDictionaryPage(dictionaryPage, metaEncryptor, 
dictPageHeaderAAD);
           break;
         case DATA_PAGE:
           if (encryptColumn) {
@@ -482,7 +482,9 @@ public class ParquetRewriter implements Closeable {
                   converter.getEncoding(headerV2.getEncoding()),
                   BytesInput.from(pageLoad),
                   rawDataLength,
-                  statistics);
+                  statistics,
+                  metaEncryptor,
+                  dataPageHeaderAAD);
           pageOrdinal++;
           break;
         default:
@@ -492,12 +494,12 @@ public class ParquetRewriter implements Closeable {
     }
   }
 
-  private Statistics convertStatistics(String createdBy,
-                                       PrimitiveType type,
-                                       org.apache.parquet.format.Statistics 
pageStatistics,
-                                       ColumnIndex columnIndex,
-                                       int pageIndex,
-                                       ParquetMetadataConverter converter) 
throws IOException {
+  private Statistics<?> convertStatistics(String createdBy,
+                                          PrimitiveType type,
+                                          org.apache.parquet.format.Statistics 
pageStatistics,
+                                          ColumnIndex columnIndex,
+                                          int pageIndex,
+                                          ParquetMetadataConverter converter) 
throws IOException {
     if (columnIndex != null) {
       if (columnIndex.getNullPages() == null) {
         throw new IOException("columnIndex has null variable 'nullPages' which 
indicates corrupted data for type: " +
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 62d0a28ca..1f03deceb 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -64,6 +64,8 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -89,14 +91,26 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class ParquetRewriterTest {
 
   private final int numRecord = 100000;
-  private Configuration conf = new Configuration();
+  private final Configuration conf = new Configuration();
+  private final ParquetProperties.WriterVersion writerVersion;
+
   private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;
 
+  @Parameterized.Parameters(name = "WriterVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {"v1", "v2"};
+  }
+
+  public ParquetRewriterTest(String writerVersion) {
+    this.writerVersion = 
ParquetProperties.WriterVersion.fromString(writerVersion);
+  }
+
   private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) 
throws Exception {
     Path outputPath = new Path(outputFile);
     List<String> pruneColumns = Arrays.asList("Gender");
@@ -498,27 +512,29 @@ public class ParquetRewriterTest {
   @Test(expected = InvalidSchemaException.class)
   public void testMergeTwoFilesWithDifferentSchema() throws Exception {
     MessageType schema1 = new MessageType("schema",
-            new PrimitiveType(OPTIONAL, INT64, "DocId"),
-            new PrimitiveType(REQUIRED, BINARY, "Name"),
-            new PrimitiveType(OPTIONAL, BINARY, "Gender"),
-            new GroupType(OPTIONAL, "Links",
-                    new PrimitiveType(REPEATED, BINARY, "Backward"),
-                    new PrimitiveType(REPEATED, BINARY, "Forward")));
+      new PrimitiveType(OPTIONAL, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+      new GroupType(OPTIONAL, "Links",
+        new PrimitiveType(REPEATED, BINARY, "Backward"),
+        new PrimitiveType(REPEATED, BINARY, "Forward")));
     MessageType schema2 = new MessageType("schema",
-            new PrimitiveType(OPTIONAL, INT64, "DocId"),
-            new PrimitiveType(REQUIRED, BINARY, "Name"),
-            new PrimitiveType(OPTIONAL, BINARY, "Gender"));
+      new PrimitiveType(OPTIONAL, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(OPTIONAL, BINARY, "Gender"));
     inputFiles = Lists.newArrayList();
     inputFiles.add(new TestFileBuilder(conf, schema1)
-            .withNumRecord(numRecord)
-            .withCodec("UNCOMPRESSED")
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec("UNCOMPRESSED")
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withWriterVersion(writerVersion)
+      .build());
     inputFiles.add(new TestFileBuilder(conf, schema2)
-            .withNumRecord(numRecord)
-            .withCodec("UNCOMPRESSED")
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec("UNCOMPRESSED")
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withWriterVersion(writerVersion)
+      .build());
 
     List<Path> inputPaths = new ArrayList<>();
     for (EncryptionTestFile inputFile : inputFiles) {
@@ -617,6 +633,7 @@ public class ParquetRewriterTest {
       .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
       .withRowGroupSize(rowGroupSize)
       .withBloomFilterEnabled(bloomFilterEnabledColumns)
+      .withWriterVersion(writerVersion)
       .build());
   }
 
@@ -624,26 +641,28 @@ public class ParquetRewriterTest {
     MessageType schema = createSchema();
     inputFiles = Lists.newArrayList();
     inputFiles.add(new TestFileBuilder(conf, schema)
-            .withNumRecord(numRecord)
-            .withCodec("GZIP")
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec("GZIP")
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withWriterVersion(writerVersion)
+      .build());
     inputFiles.add(new TestFileBuilder(conf, schema)
-            .withNumRecord(numRecord)
-            .withCodec("UNCOMPRESSED")
-            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build());
+      .withNumRecord(numRecord)
+      .withCodec("UNCOMPRESSED")
+      .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+      .withWriterVersion(writerVersion)
+      .build());
 
   }
 
   private MessageType createSchema() {
     return new MessageType("schema",
-            new PrimitiveType(OPTIONAL, INT64, "DocId"),
-            new PrimitiveType(REQUIRED, BINARY, "Name"),
-            new PrimitiveType(OPTIONAL, BINARY, "Gender"),
-            new GroupType(OPTIONAL, "Links",
-                    new PrimitiveType(REPEATED, BINARY, "Backward"),
-                    new PrimitiveType(REPEATED, BINARY, "Forward")));
+      new PrimitiveType(OPTIONAL, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+      new GroupType(OPTIONAL, "Links",
+        new PrimitiveType(REPEATED, BINARY, "Backward"),
+        new PrimitiveType(REPEATED, BINARY, "Forward")));
   }
 
   private void validateColumnData(Set<String> prunePaths,
@@ -849,7 +868,6 @@ public class ParquetRewriterTest {
     assertNotNull(createdBy);
     assertEquals(createdBy, Version.FULL_VERSION);
 
-
     // Verify original.created.by has been set
     String inputCreatedBy = (String) inputCreatedBys[0];
     String originalCreatedBy = 
outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);

Reply via email to