This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new d2c3c6d2e PARQUET-1629: Support CRC for data page v2 (#1044)
d2c3c6d2e is described below

commit d2c3c6d2e761a17b75d56fd356a37a2f754072f7
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jul 24 12:50:15 2023 +0800

    PARQUET-1629: Support CRC for data page v2 (#1044)
---
 .../format/converter/ParquetMetadataConverter.java |  32 +-
 .../parquet/hadoop/ColumnChunkPageReadStore.java   |  11 +-
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |  35 ++-
 .../apache/parquet/hadoop/ParquetFileReader.java   |  47 +--
 ...V1Checksums.java => TestDataPageChecksums.java} | 327 +++++++++++++++------
 5 files changed, 334 insertions(+), 118 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index d7788ac7a..09b21538e 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1932,7 +1932,6 @@ public class ParquetMetadataConverter {
       int valueCount, int nullCount, int rowCount,
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength, int dlByteLength) {
-    // TODO: pageHeader.crc = ...;
     DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
         valueCount, nullCount, rowCount,
         getEncoding(dataEncoding),
@@ -1942,6 +1941,37 @@ public class ParquetMetadataConverter {
     return pageHeader;
   }
 
+  public void writeDataPageV2Header(
+    int uncompressedSize, int compressedSize,
+    int valueCount, int nullCount, int rowCount,
+    org.apache.parquet.column.Encoding dataEncoding,
+    int rlByteLength, int dlByteLength, int crc,
+    OutputStream to, BlockCipher.Encryptor blockEncryptor,
+    byte[] pageHeaderAAD) throws IOException {
+    writePageHeader(
+      newDataPageV2Header(
+        uncompressedSize, compressedSize,
+        valueCount, nullCount, rowCount,
+        dataEncoding,
+        rlByteLength, dlByteLength, crc),
+      to, blockEncryptor, pageHeaderAAD);
+  }
+
+  private PageHeader newDataPageV2Header(
+    int uncompressedSize, int compressedSize,
+    int valueCount, int nullCount, int rowCount,
+    org.apache.parquet.column.Encoding dataEncoding,
+    int rlByteLength, int dlByteLength, int crc) {
+    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+      valueCount, nullCount, rowCount,
+      getEncoding(dataEncoding),
+      dlByteLength, rlByteLength);
+    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, 
uncompressedSize, compressedSize);
+    pageHeader.setData_page_header_v2(dataPageHeaderV2);
+    pageHeader.setCrc(crc);
+    return pageHeader;
+  }
+
   public void writeDictionaryPageHeader(
       int uncompressedSize, int compressedSize, int valueCount,
       org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) 
throws IOException {
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 11447cc57..89625c4c1 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -251,9 +251,10 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
           } catch (IOException e) {
             throw new ParquetDecodingException("could not decompress page", e);
           }
-          
+
+          final DataPageV2 decompressedPage;
           if (offsetIndex == null) {
-            return DataPageV2.uncompressed(
+            decompressedPage = DataPageV2.uncompressed(
                 dataPageV2.getRowCount(),
                 dataPageV2.getNullCount(),
                 dataPageV2.getValueCount(),
@@ -263,7 +264,7 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                 pageBytes,
                 dataPageV2.getStatistics());
           } else {
-            return DataPageV2.uncompressed(
+            decompressedPage = DataPageV2.uncompressed(
                 dataPageV2.getRowCount(),
                 dataPageV2.getNullCount(),
                 dataPageV2.getValueCount(),
@@ -274,6 +275,10 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                 pageBytes,
                 dataPageV2.getStatistics());
           }
+          if (dataPageV2.getCrc().isPresent()) {
+            decompressedPage.setCrc(dataPageV2.getCrc().getAsInt());
+          }
+          return decompressedPage;
         } 
       });
     }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 3c4d7a996..106424e96 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -255,15 +255,44 @@ public class ColumnChunkPageWriteStore implements 
PageWriteStore, BloomFilterWri
       if (null != headerBlockEncryptor) {
         AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
       }
-      parquetMetadataConverter.writeDataPageV2Header(
-          uncompressedSize, compressedSize,
-          valueCount, nullCount, rowCount,
+      if (pageWriteChecksumEnabled) {
+        crc.reset();
+        if (repetitionLevels.size() > 0) {
+          crc.update(repetitionLevels.toByteArray());
+        }
+        if (definitionLevels.size() > 0) {
+          crc.update(definitionLevels.toByteArray());
+        }
+        if (compressedData.size() > 0) {
+          crc.update(compressedData.toByteArray());
+        }
+        parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize,
+          compressedSize,
+          valueCount,
+          nullCount,
+          rowCount,
           dataEncoding,
           rlByteLength,
           dlByteLength,
+          (int) crc.getValue(),
           tempOutputStream,
           headerBlockEncryptor,
           dataPageHeaderAAD);
+      } else {
+        parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize,
+          compressedSize,
+          valueCount,
+          nullCount,
+          rowCount,
+          dataEncoding,
+          rlByteLength,
+          dlByteLength,
+          tempOutputStream,
+          headerBlockEncryptor,
+          dataPageHeaderAAD);
+      }
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8472dc310..f63367022 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1627,23 +1627,36 @@ public class ParquetFileReader implements Closeable {
             break;
           case DATA_PAGE_V2:
             DataPageHeaderV2 dataHeaderV2 = 
pageHeader.getData_page_header_v2();
-            int dataSize = compressedPageSize - 
dataHeaderV2.getRepetition_levels_byte_length() - 
dataHeaderV2.getDefinition_levels_byte_length();
-            pagesInChunk.add(
-                new DataPageV2(
-                    dataHeaderV2.getNum_rows(),
-                    dataHeaderV2.getNum_nulls(),
-                    dataHeaderV2.getNum_values(),
-                    
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
-                    
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
-                    converter.getEncoding(dataHeaderV2.getEncoding()),
-                    this.readAsBytesInput(dataSize),
-                    uncompressedPageSize,
-                    converter.fromParquetStatistics(
-                        getFileMetaData().getCreatedBy(),
-                        dataHeaderV2.getStatistics(),
-                        type),
-                    dataHeaderV2.isIs_compressed()
-                    ));
+            int dataSize = compressedPageSize - 
dataHeaderV2.getRepetition_levels_byte_length() -
+              dataHeaderV2.getDefinition_levels_byte_length();
+            final BytesInput repetitionLevels = 
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length());
+            final BytesInput definitionLevels = 
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length());
+            final BytesInput values = this.readAsBytesInput(dataSize);
+            if (options.usePageChecksumVerification() && 
pageHeader.isSetCrc()) {
+              pageBytes = BytesInput.concat(repetitionLevels, 
definitionLevels, values);
+              verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify page integrity, CRC checksum verification 
failed");
+            }
+            DataPageV2 dataPageV2 = new DataPageV2(
+              dataHeaderV2.getNum_rows(),
+              dataHeaderV2.getNum_nulls(),
+              dataHeaderV2.getNum_values(),
+              repetitionLevels,
+              definitionLevels,
+              converter.getEncoding(dataHeaderV2.getEncoding()),
+              values,
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV2.getStatistics(),
+                type),
+              dataHeaderV2.isIs_compressed()
+            );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              dataPageV2.setCrc(pageHeader.getCrc());
+            }
+            pagesInChunk.add(dataPageV2);
             valuesCountReadSoFar += dataHeaderV2.getNum_values();
             ++dataPageCountReadSoFar;
             break;
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
similarity index 63%
rename from 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
rename to 
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
index 61a9d6331..0f4353237 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
@@ -34,7 +34,10 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.Page;
 import org.apache.parquet.column.page.PageReadStore;
@@ -72,7 +75,7 @@ import static org.junit.Assert.fail;
  * Tests that page level checksums are correctly written and that checksum 
verification works as
  * expected
  */
-public class TestDataPageV1Checksums {
+public class TestDataPageChecksums {
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -99,7 +102,9 @@ public class TestDataPageV1Checksums {
   private static final int numRecordsLargeFile = (2 * PAGE_SIZE) / 
Integer.BYTES;
 
   /** Write out sample Parquet file using ColumnChunkPageWriteStore directly, 
return path to file */
-  private Path writeSimpleParquetFile(Configuration conf, CompressionCodecName 
compression)
+  private Path writeSimpleParquetFile(Configuration conf,
+                                      CompressionCodecName compression,
+                                      ParquetProperties.WriterVersion version)
     throws IOException {
     File file = tempFolder.newFile();
     file.delete();
@@ -125,17 +130,49 @@ public class TestDataPageV1Checksums {
       compressor, schemaSimple, new HeapByteBufferAllocator(),
       Integer.MAX_VALUE, 
ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
 
-    PageWriter pageWriter = writeStore.getPageWriter(colADesc);
-    pageWriter.writePage(BytesInput.from(colAPage1Bytes), numRecordsLargeFile 
/ 2,
-      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, 
Encoding.PLAIN);
-    pageWriter.writePage(BytesInput.from(colAPage2Bytes), numRecordsLargeFile 
/ 2,
-      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, 
Encoding.PLAIN);
-
-    pageWriter = writeStore.getPageWriter(colBDesc);
-    pageWriter.writePage(BytesInput.from(colBPage1Bytes), numRecordsLargeFile 
/ 2,
-      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, 
Encoding.PLAIN);
-    pageWriter.writePage(BytesInput.from(colBPage2Bytes), numRecordsLargeFile 
/ 2,
-      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, 
Encoding.PLAIN);
+    if (version == ParquetProperties.WriterVersion.PARQUET_1_0) {
+      PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+      pageWriter.writePage(BytesInput.from(colAPage1Bytes), 
numRecordsLargeFile / 2,
+        numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, 
Encoding.RLE, Encoding.PLAIN);
+      pageWriter.writePage(BytesInput.from(colAPage2Bytes), 
numRecordsLargeFile / 2,
+        numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, 
Encoding.RLE, Encoding.PLAIN);
+
+      pageWriter = writeStore.getPageWriter(colBDesc);
+      pageWriter.writePage(BytesInput.from(colBPage1Bytes), 
numRecordsLargeFile / 2,
+        numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, 
Encoding.RLE, Encoding.PLAIN);
+      pageWriter.writePage(BytesInput.from(colBPage2Bytes), 
numRecordsLargeFile / 2,
+        numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, 
Encoding.RLE, Encoding.PLAIN);
+    } else if (version == ParquetProperties.WriterVersion.PARQUET_2_0) {
+      PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+      pageWriter.writePageV2(
+        numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+        BytesInput.from(colAPage1Bytes, 0, PAGE_SIZE / 4),
+        BytesInput.from(colAPage1Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+        Encoding.PLAIN, BytesInput.from(colAPage1Bytes, PAGE_SIZE / 2, 
PAGE_SIZE / 2),
+        EMPTY_STATS_INT32);
+      pageWriter.writePageV2(
+        numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+        BytesInput.from(colAPage2Bytes, 0, PAGE_SIZE / 4),
+        BytesInput.from(colAPage2Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+        Encoding.PLAIN, BytesInput.from(colAPage2Bytes, PAGE_SIZE / 2, 
PAGE_SIZE / 2),
+        EMPTY_STATS_INT32);
+
+      pageWriter = writeStore.getPageWriter(colBDesc);
+      pageWriter.writePageV2(
+        numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+        BytesInput.from(colBPage1Bytes, 0, PAGE_SIZE / 4),
+        BytesInput.from(colBPage1Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+        Encoding.PLAIN, BytesInput.from(colBPage1Bytes, PAGE_SIZE / 2, 
PAGE_SIZE / 2),
+        EMPTY_STATS_INT32);
+      pageWriter.writePageV2(
+        numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+        BytesInput.from(colBPage2Bytes, 0, PAGE_SIZE / 4),
+        BytesInput.from(colBPage2Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+        Encoding.PLAIN, BytesInput.from(colBPage2Bytes, PAGE_SIZE / 2, 
PAGE_SIZE / 2),
+        EMPTY_STATS_INT32);
+    } else {
+      throw new IllegalArgumentException("Unknown writer version: " + version);
+    }
 
     writeStore.flushToFileWriter(writer);
 
@@ -166,7 +203,8 @@ public class TestDataPageV1Checksums {
 
   private Path writeNestedWithNullsSampleParquetFile(Configuration conf,
                                                      boolean 
dictionaryEncoding,
-                                                     CompressionCodecName 
compression)
+                                                     CompressionCodecName 
compression,
+                                                     
ParquetProperties.WriterVersion version)
     throws IOException {
     File file = tempFolder.newFile();
     file.delete();
@@ -179,6 +217,7 @@ public class TestDataPageV1Checksums {
       .withDictionaryEncoding(dictionaryEncoding)
       .withType(schemaNestedWithNulls)
       
.withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+      .withWriterVersion(version)
       .build()) {
       GroupFactory groupFactory = new 
SimpleGroupFactory(schemaNestedWithNulls);
       Random rand = new Random(42);
@@ -210,44 +249,52 @@ public class TestDataPageV1Checksums {
    * the crc checksums are correct. Tests whether we successfully write out 
correct crc checksums
   * without potentially failing on the read path verification.
    */
-  @Test
-  public void testWriteOnVerifyOff() throws IOException {
+  private void testWriteOnVerifyOff(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
 
-    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED);
+    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED, version);
 
     try (ParquetFileReader reader = getParquetFileReader(path, conf,
       Arrays.asList(colADesc, colBDesc))) {
       PageReadStore pageReadStore = reader.readNextRowGroup();
 
-      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+      DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
       assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
-      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+      assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
 
-      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+      DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
       assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
-      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+      assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
 
-      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+      DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
       assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
-      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+      assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
 
-      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+      DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
       assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
-      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+      assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
     }
   }
 
-  /** Test that we do not write out checksums if the feature is turned off */
   @Test
-  public void testWriteOffVerifyOff() throws IOException {
+  public void testWriteOnVerifyOffV1() throws IOException {
+    testWriteOnVerifyOff(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testWriteOnVerifyOffV2() throws IOException {
+    testWriteOnVerifyOff(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
+  /** Test that we do not write out checksums if the feature is turned off */
+  private void testWriteOffVerifyOff(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
 
-    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED);
+    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED, version);
 
     try (ParquetFileReader reader = getParquetFileReader(path, conf,
       Arrays.asList(colADesc, colBDesc))) {
@@ -260,76 +307,103 @@ public class TestDataPageV1Checksums {
     }
   }
 
+  @Test
+  public void testWriteOffVerifyOffV1() throws IOException {
+    testWriteOffVerifyOff(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testWriteOffVerifyOffV2() throws IOException {
+    testWriteOffVerifyOff(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /**
    * Do not write out page level crc checksums, but enable verification on the 
read path. Tests
    * that the read still succeeds and does not throw an exception.
    */
-  @Test
-  public void testWriteOffVerifyOn() throws IOException {
+  private void testWriteOffVerifyOn(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
 
-    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED);
+    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED, version);
 
     try (ParquetFileReader reader = getParquetFileReader(path, conf,
       Arrays.asList(colADesc, colBDesc))) {
       PageReadStore pageReadStore = reader.readNextRowGroup();
 
-      assertCorrectContent(readNextPage(colADesc, 
pageReadStore).getBytes().toByteArray(),
+      assertCorrectContent(getPageBytes(readNextPage(colADesc, pageReadStore)),
         colAPage1Bytes);
-      assertCorrectContent(readNextPage(colADesc, 
pageReadStore).getBytes().toByteArray(),
+      assertCorrectContent(getPageBytes(readNextPage(colADesc, pageReadStore)),
         colAPage2Bytes);
-      assertCorrectContent(readNextPage(colBDesc, 
pageReadStore).getBytes().toByteArray(),
+      assertCorrectContent(getPageBytes(readNextPage(colBDesc, pageReadStore)),
         colBPage1Bytes);
-      assertCorrectContent(readNextPage(colBDesc, 
pageReadStore).getBytes().toByteArray(),
+      assertCorrectContent(getPageBytes(readNextPage(colBDesc, pageReadStore)),
         colBPage2Bytes);
     }
   }
 
+  @Test
+  public void testWriteOffVerifyOnV1() throws IOException {
+    testWriteOffVerifyOn(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testWriteOffVerifyOnV2() throws IOException {
+    testWriteOffVerifyOn(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /**
    * Write out checksums and verify them on the read path. Tests that crc is 
set and that we can
    * read back what we wrote if checksums are enabled on both the write and 
read path.
    */
-  @Test
-  public void testWriteOnVerifyOn() throws IOException {
+  private void testWriteOnVerifyOn(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
 
-    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED);
+    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED, version);
 
     try (ParquetFileReader reader = getParquetFileReader(path, conf,
       Arrays.asList(colADesc, colBDesc))) {
       PageReadStore pageReadStore = reader.readNextRowGroup();
 
-      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+      DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
       assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
-      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+      assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
 
-      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+      DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
       assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
-      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+      assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
 
-      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+      DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
       assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
-      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+      assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
 
-      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+      DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
       assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
-      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+      assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
     }
   }
 
+  @Test
+  public void testWriteOnVerifyOnV1() throws IOException {
+    testWriteOnVerifyOn(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testWriteOnVerifyOnV2() throws IOException {
+    testWriteOnVerifyOn(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /**
    * Test whether corruption in the page content is detected by checksum 
verification
    */
-  @Test
-  public void testCorruptedPage() throws IOException {
+  private void testCorruptedPage(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
 
-    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED);
+    Path path = writeSimpleParquetFile(conf, 
CompressionCodecName.UNCOMPRESSED, version);
 
     InputFile inputFile = HadoopInputFile.fromPath(path, conf);
     try (SeekableInputStream inputStream = inputFile.newStream()) {
@@ -356,14 +430,14 @@ public class TestDataPageV1Checksums {
           Arrays.asList(colADesc, colBDesc))) {
           PageReadStore pageReadStore = reader.readNextRowGroup();
 
-          DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+          DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
           assertFalse("Data in page was not corrupted",
-            Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
+            Arrays.equals(getPageBytes(colAPage1), colAPage1Bytes));
           readNextPage(colADesc, pageReadStore);
           readNextPage(colBDesc, pageReadStore);
-          DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+          DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
           assertFalse("Data in page was not corrupted",
-            Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
+            Arrays.equals(getPageBytes(colBPage2), colBPage2Bytes));
         }
 
         // Now we enable checksum verification, the corruption should be 
detected
@@ -377,103 +451,130 @@ public class TestDataPageV1Checksums {
     }
   }
 
+  @Test
+  public void testCorruptedPageV1() throws IOException {
+    testCorruptedPage(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testCorruptedPageV2() throws IOException {
+    testCorruptedPage(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /**
    * Tests that the checksum is calculated using the compressed version of the 
data and that
    * checksum verification succeeds
    */
-  @Test
-  public void testCompression() throws IOException {
+  private void testCompression(ParquetProperties.WriterVersion version) throws 
IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
 
-    Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY);
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY, 
version);
 
     try (ParquetFileReader reader = getParquetFileReader(path, conf,
       Arrays.asList(colADesc, colBDesc))) {
       PageReadStore pageReadStore = reader.readNextRowGroup();
 
-      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
-      assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes));
-      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+      DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes, 
getDataOffset(colAPage1)));
+      assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
 
-      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
-      assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes));
-      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+      DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes, 
getDataOffset(colAPage2)));
+      assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
 
-      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
-      assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes));
-      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+      DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes, 
getDataOffset(colBPage1)));
+      assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
 
-      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
-      assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes));
-      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+      DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes, 
getDataOffset(colBPage2)));
+      assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
     }
   }
 
+  @Test
+  public void testCompressionV1() throws IOException {
+    testCompression(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testCompressionV2() throws IOException {
+    testCompression(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /**
    * Tests that we adhere to the checksum calculation specification, namely 
that the crc is
    * calculated using the compressed concatenation of the repetition levels, 
definition levels and
    * the actual data. This is done by generating sample data with a nested 
schema containing nulls
-   * (generating non trivial repetition and definition levels).
+   * (generating non-trivial repetition and definition levels).
    */
-  @Test
-  public void testNestedWithNulls() throws IOException {
+  private void testNestedWithNulls(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
 
     // Write out sample file via the non-checksum code path, extract the raw 
bytes to calculate the
     // reference crc with
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
-    Path refPath = writeNestedWithNullsSampleParquetFile(conf, false, 
CompressionCodecName.SNAPPY);
+    Path refPath = writeNestedWithNullsSampleParquetFile(conf, false, 
CompressionCodecName.SNAPPY, version);
 
     try (ParquetFileReader refReader = getParquetFileReader(refPath, conf,
       Arrays.asList(colCIdDesc, colDValDesc))) {
       PageReadStore refPageReadStore = refReader.readNextRowGroup();
-      byte[] colCIdPageBytes = readNextPage(colCIdDesc, 
refPageReadStore).getBytes().toByteArray();
-      byte[] colDValPageBytes = readNextPage(colDValDesc, 
refPageReadStore).getBytes().toByteArray();
+      byte[] colCIdPageBytes = getPageBytes(readNextPage(colCIdDesc, 
refPageReadStore));
+      byte[] colDValPageBytes = getPageBytes(readNextPage(colDValDesc, 
refPageReadStore));
 
       // Write out sample file with checksums
       conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
       conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
-      Path path = writeNestedWithNullsSampleParquetFile(conf, false, 
CompressionCodecName.SNAPPY);
+      Path path = writeNestedWithNullsSampleParquetFile(conf, false, 
CompressionCodecName.SNAPPY, version);
 
       try (ParquetFileReader reader = getParquetFileReader(path, conf,
         Arrays.asList(colCIdDesc, colDValDesc))) {
         PageReadStore pageReadStore = reader.readNextRowGroup();
 
-        DataPageV1 colCIdPage = readNextPage(colCIdDesc, pageReadStore);
-        assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes));
-        assertCorrectContent(colCIdPage.getBytes().toByteArray(), 
colCIdPageBytes);
+        DataPage colCIdPage = readNextPage(colCIdDesc, pageReadStore);
+        assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes, 
getDataOffset(colCIdPage)));
+        assertCorrectContent(getPageBytes(colCIdPage), colCIdPageBytes);
 
-        DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
-        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
-        assertCorrectContent(colDValPage.getBytes().toByteArray(), 
colDValPageBytes);
+        DataPage colDValPage = readNextPage(colDValDesc, pageReadStore);
+        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes, 
getDataOffset(colDValPage)));
+        assertCorrectContent(getPageBytes(colDValPage), colDValPageBytes);
       }
     }
   }
 
   @Test
-  public void testDictionaryEncoding() throws IOException {
+  public void testNestedWithNullsV1() throws IOException {
+    testNestedWithNulls(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testNestedWithNullsV2() throws IOException {
+    testNestedWithNulls(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
+  private void testDictionaryEncoding(ParquetProperties.WriterVersion version) 
throws IOException {
     Configuration conf = new Configuration();
 
     // Write out dictionary encoded sample file via the non-checksum code 
path, extract the raw
     // bytes to calculate the  reference crc with
     conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
     conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
-    Path refPath = writeNestedWithNullsSampleParquetFile(conf, true, 
CompressionCodecName.SNAPPY);
+    Path refPath = writeNestedWithNullsSampleParquetFile(conf, true, 
CompressionCodecName.SNAPPY, version);
 
     try (ParquetFileReader refReader =
       getParquetFileReader(refPath, conf, 
Collections.singletonList(colDValDesc))) {
       PageReadStore refPageReadStore = refReader.readNextRowGroup();
       // Read (decompressed) dictionary page
       byte[] dictPageBytes = readDictPage(colDValDesc, 
refPageReadStore).getBytes().toByteArray();
-      byte[] colDValPageBytes = readNextPage(colDValDesc, 
refPageReadStore).getBytes().toByteArray();
+      byte[] colDValPageBytes = getPageBytes(readNextPage(colDValDesc, 
refPageReadStore));
 
       // Write out sample file with checksums
       conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
       conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
-      Path path = writeNestedWithNullsSampleParquetFile(conf, true, 
CompressionCodecName.SNAPPY);
+      Path path = writeNestedWithNullsSampleParquetFile(conf, true, 
CompressionCodecName.SNAPPY, version);
 
       try (ParquetFileReader reader =
         getParquetFileReader(path, conf, 
Collections.singletonList(colDValDesc))) {
@@ -483,22 +584,47 @@ public class TestDataPageV1Checksums {
         assertCrcSetAndCorrect(dictPage, snappy(dictPageBytes));
         assertCorrectContent(dictPage.getBytes().toByteArray(), dictPageBytes);
 
-        DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
-        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
-        assertCorrectContent(colDValPage.getBytes().toByteArray(), 
colDValPageBytes);
+        DataPage colDValPage = readNextPage(colDValDesc, pageReadStore);
+        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes, 
getDataOffset(colDValPage)));
+        assertCorrectContent(getPageBytes(colDValPage), colDValPageBytes);
       }
     }
   }
 
+  @Test
+  public void testDictionaryEncodingV1() throws IOException {
+    testDictionaryEncoding(ParquetProperties.WriterVersion.PARQUET_1_0);
+  }
+
+  @Test
+  public void testDictionaryEncodingV2() throws IOException {
+    testDictionaryEncoding(ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
   /** Compress using snappy */
-  private byte[] snappy(byte[] bytes) throws IOException {
+  private byte[] snappy(byte[] bytes, int offset) throws IOException {
+    int length = bytes.length - offset;
     SnappyCompressor compressor = new SnappyCompressor();
     compressor.reset();
-    compressor.setInput(bytes, 0, bytes.length);
+    compressor.setInput(bytes, offset, length);
     compressor.finish();
-    byte[] buffer = new byte[bytes.length * 2];
+    byte[] buffer = new byte[length * 2];
     int compressedSize = compressor.compress(buffer, 0, buffer.length);
-    return Arrays.copyOfRange(buffer, 0, compressedSize);
+    return BytesInput.concat(BytesInput.from(Arrays.copyOfRange(bytes, 0, 
offset)),
+      BytesInput.from(Arrays.copyOfRange(buffer, 0, 
compressedSize))).toByteArray();
+  }
+
+  private byte[] snappy(byte[] bytes) throws IOException {
+    return snappy(bytes, 0);
+  }
+
+  private int getDataOffset(Page page) {
+    if (page instanceof DataPageV2) {
+      return (int) (((DataPageV2) page).getRepetitionLevels().size() +
+        ((DataPageV2) page).getDefinitionLevels().size());
+    } else {
+      return 0;
+    }
   }
 
   /** Construct ParquetFileReader for input file and columns */
@@ -516,8 +642,8 @@ public class TestDataPageV1Checksums {
   }
 
   /** Read the next page for a column */
-  private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore 
pageReadStore) {
-    return (DataPageV1) pageReadStore.getPageReader(colDesc).readPage();
+  private DataPage readNextPage(ColumnDescriptor colDesc, PageReadStore 
pageReadStore) {
+    return pageReadStore.getPageReader(colDesc).readPage();
   }
 
   /**
@@ -528,6 +654,19 @@ public class TestDataPageV1Checksums {
       pageBytes);
   }
 
+  private byte[] getPageBytes(DataPage page) throws IOException {
+    if (page instanceof DataPageV1) {
+      DataPageV1 pageV1 = (DataPageV1) page;
+      return pageV1.getBytes().toByteArray();
+    } else if (page instanceof DataPageV2) {
+      DataPageV2 pageV2 = (DataPageV2) page;
+      return BytesInput.concat(
+        pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(), 
pageV2.getData()).toByteArray();
+    } else {
+      throw new IllegalArgumentException("Unknown page type " + 
page.getClass().getName());
+    }
+  }
+
   /**
    * Verify that the crc is set in a page, calculate the reference crc using 
the reference bytes and
    * check that the crc's are identical.


Reply via email to