This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new d2c3c6d2e PARQUET-1629: Support CRC for data page v2 (#1044)
d2c3c6d2e is described below
commit d2c3c6d2e761a17b75d56fd356a37a2f754072f7
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jul 24 12:50:15 2023 +0800
PARQUET-1629: Support CRC for data page v2 (#1044)
---
.../format/converter/ParquetMetadataConverter.java | 32 +-
.../parquet/hadoop/ColumnChunkPageReadStore.java | 11 +-
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 35 ++-
.../apache/parquet/hadoop/ParquetFileReader.java | 47 +--
...V1Checksums.java => TestDataPageChecksums.java} | 327 +++++++++++++++------
5 files changed, 334 insertions(+), 118 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index d7788ac7a..09b21538e 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1932,7 +1932,6 @@ public class ParquetMetadataConverter {
int valueCount, int nullCount, int rowCount,
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength, int dlByteLength) {
- // TODO: pageHeader.crc = ...;
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount,
getEncoding(dataEncoding),
@@ -1942,6 +1941,37 @@ public class ParquetMetadataConverter {
return pageHeader;
}
+ public void writeDataPageV2Header(
+ int uncompressedSize, int compressedSize,
+ int valueCount, int nullCount, int rowCount,
+ org.apache.parquet.column.Encoding dataEncoding,
+ int rlByteLength, int dlByteLength, int crc,
+ OutputStream to, BlockCipher.Encryptor blockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
+ writePageHeader(
+ newDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength, dlByteLength, crc),
+ to, blockEncryptor, pageHeaderAAD);
+ }
+
+ private PageHeader newDataPageV2Header(
+ int uncompressedSize, int compressedSize,
+ int valueCount, int nullCount, int rowCount,
+ org.apache.parquet.column.Encoding dataEncoding,
+ int rlByteLength, int dlByteLength, int crc) {
+ DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+ valueCount, nullCount, rowCount,
+ getEncoding(dataEncoding),
+ dlByteLength, rlByteLength);
+ PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2,
uncompressedSize, compressedSize);
+ pageHeader.setData_page_header_v2(dataPageHeaderV2);
+ pageHeader.setCrc(crc);
+ return pageHeader;
+ }
+
public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to)
throws IOException {
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 11447cc57..89625c4c1 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -251,9 +251,10 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
-
+
+ final DataPageV2 decompressedPage;
if (offsetIndex == null) {
- return DataPageV2.uncompressed(
+ decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
@@ -263,7 +264,7 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
pageBytes,
dataPageV2.getStatistics());
} else {
- return DataPageV2.uncompressed(
+ decompressedPage = DataPageV2.uncompressed(
dataPageV2.getRowCount(),
dataPageV2.getNullCount(),
dataPageV2.getValueCount(),
@@ -274,6 +275,10 @@ class ColumnChunkPageReadStore implements PageReadStore,
DictionaryPageReadStore
pageBytes,
dataPageV2.getStatistics());
}
+ if (dataPageV2.getCrc().isPresent()) {
+ decompressedPage.setCrc(dataPageV2.getCrc().getAsInt());
+ }
+ return decompressedPage;
}
});
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 3c4d7a996..106424e96 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -255,15 +255,44 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
if (null != headerBlockEncryptor) {
AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
}
- parquetMetadataConverter.writeDataPageV2Header(
- uncompressedSize, compressedSize,
- valueCount, nullCount, rowCount,
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ if (repetitionLevels.size() > 0) {
+ crc.update(repetitionLevels.toByteArray());
+ }
+ if (definitionLevels.size() > 0) {
+ crc.update(definitionLevels.toByteArray());
+ }
+ if (compressedData.size() > 0) {
+ crc.update(compressedData.toByteArray());
+ }
+ parquetMetadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
+ (int) crc.getValue(),
tempOutputStream,
headerBlockEncryptor,
dataPageHeaderAAD);
+ } else {
+ parquetMetadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
+ }
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8472dc310..f63367022 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1627,23 +1627,36 @@ public class ParquetFileReader implements Closeable {
break;
case DATA_PAGE_V2:
DataPageHeaderV2 dataHeaderV2 =
pageHeader.getData_page_header_v2();
- int dataSize = compressedPageSize -
dataHeaderV2.getRepetition_levels_byte_length() -
dataHeaderV2.getDefinition_levels_byte_length();
- pagesInChunk.add(
- new DataPageV2(
- dataHeaderV2.getNum_rows(),
- dataHeaderV2.getNum_nulls(),
- dataHeaderV2.getNum_values(),
-
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
-
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
- converter.getEncoding(dataHeaderV2.getEncoding()),
- this.readAsBytesInput(dataSize),
- uncompressedPageSize,
- converter.fromParquetStatistics(
- getFileMetaData().getCreatedBy(),
- dataHeaderV2.getStatistics(),
- type),
- dataHeaderV2.isIs_compressed()
- ));
+ int dataSize = compressedPageSize -
dataHeaderV2.getRepetition_levels_byte_length() -
+ dataHeaderV2.getDefinition_levels_byte_length();
+ final BytesInput repetitionLevels =
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length());
+ final BytesInput definitionLevels =
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length());
+ final BytesInput values = this.readAsBytesInput(dataSize);
+ if (options.usePageChecksumVerification() &&
pageHeader.isSetCrc()) {
+ pageBytes = BytesInput.concat(repetitionLevels,
definitionLevels, values);
+ verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify page integrity, CRC checksum verification
failed");
+ }
+ DataPageV2 dataPageV2 = new DataPageV2(
+ dataHeaderV2.getNum_rows(),
+ dataHeaderV2.getNum_nulls(),
+ dataHeaderV2.getNum_values(),
+ repetitionLevels,
+ definitionLevels,
+ converter.getEncoding(dataHeaderV2.getEncoding()),
+ values,
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV2.getStatistics(),
+ type),
+ dataHeaderV2.isIs_compressed()
+ );
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dataPageV2.setCrc(pageHeader.getCrc());
+ }
+ pagesInChunk.add(dataPageV2);
valuesCountReadSoFar += dataHeaderV2.getNum_values();
++dataPageCountReadSoFar;
break;
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
similarity index 63%
rename from
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
rename to
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
index 61a9d6331..0f4353237 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java
@@ -34,7 +34,10 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.Page;
import org.apache.parquet.column.page.PageReadStore;
@@ -72,7 +75,7 @@ import static org.junit.Assert.fail;
* Tests that page level checksums are correctly written and that checksum
verification works as
* expected
*/
-public class TestDataPageV1Checksums {
+public class TestDataPageChecksums {
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@@ -99,7 +102,9 @@ public class TestDataPageV1Checksums {
private static final int numRecordsLargeFile = (2 * PAGE_SIZE) /
Integer.BYTES;
/** Write out sample Parquet file using ColumnChunkPageWriteStore directly,
return path to file */
- private Path writeSimpleParquetFile(Configuration conf, CompressionCodecName
compression)
+ private Path writeSimpleParquetFile(Configuration conf,
+ CompressionCodecName compression,
+ ParquetProperties.WriterVersion version)
throws IOException {
File file = tempFolder.newFile();
file.delete();
@@ -125,17 +130,49 @@ public class TestDataPageV1Checksums {
compressor, schemaSimple, new HeapByteBufferAllocator(),
Integer.MAX_VALUE,
ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
- PageWriter pageWriter = writeStore.getPageWriter(colADesc);
- pageWriter.writePage(BytesInput.from(colAPage1Bytes), numRecordsLargeFile
/ 2,
- numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE,
Encoding.PLAIN);
- pageWriter.writePage(BytesInput.from(colAPage2Bytes), numRecordsLargeFile
/ 2,
- numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE,
Encoding.PLAIN);
-
- pageWriter = writeStore.getPageWriter(colBDesc);
- pageWriter.writePage(BytesInput.from(colBPage1Bytes), numRecordsLargeFile
/ 2,
- numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE,
Encoding.PLAIN);
- pageWriter.writePage(BytesInput.from(colBPage2Bytes), numRecordsLargeFile
/ 2,
- numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE,
Encoding.PLAIN);
+ if (version == ParquetProperties.WriterVersion.PARQUET_1_0) {
+ PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+ pageWriter.writePage(BytesInput.from(colAPage1Bytes),
numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE,
Encoding.RLE, Encoding.PLAIN);
+ pageWriter.writePage(BytesInput.from(colAPage2Bytes),
numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE,
Encoding.RLE, Encoding.PLAIN);
+
+ pageWriter = writeStore.getPageWriter(colBDesc);
+ pageWriter.writePage(BytesInput.from(colBPage1Bytes),
numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE,
Encoding.RLE, Encoding.PLAIN);
+ pageWriter.writePage(BytesInput.from(colBPage2Bytes),
numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE,
Encoding.RLE, Encoding.PLAIN);
+ } else if (version == ParquetProperties.WriterVersion.PARQUET_2_0) {
+ PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+ pageWriter.writePageV2(
+ numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+ BytesInput.from(colAPage1Bytes, 0, PAGE_SIZE / 4),
+ BytesInput.from(colAPage1Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+ Encoding.PLAIN, BytesInput.from(colAPage1Bytes, PAGE_SIZE / 2,
PAGE_SIZE / 2),
+ EMPTY_STATS_INT32);
+ pageWriter.writePageV2(
+ numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+ BytesInput.from(colAPage2Bytes, 0, PAGE_SIZE / 4),
+ BytesInput.from(colAPage2Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+ Encoding.PLAIN, BytesInput.from(colAPage2Bytes, PAGE_SIZE / 2,
PAGE_SIZE / 2),
+ EMPTY_STATS_INT32);
+
+ pageWriter = writeStore.getPageWriter(colBDesc);
+ pageWriter.writePageV2(
+ numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+ BytesInput.from(colBPage1Bytes, 0, PAGE_SIZE / 4),
+ BytesInput.from(colBPage1Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+ Encoding.PLAIN, BytesInput.from(colBPage1Bytes, PAGE_SIZE / 2,
PAGE_SIZE / 2),
+ EMPTY_STATS_INT32);
+ pageWriter.writePageV2(
+ numRecordsLargeFile / 2, 0, numRecordsLargeFile / 2,
+ BytesInput.from(colBPage2Bytes, 0, PAGE_SIZE / 4),
+ BytesInput.from(colBPage2Bytes, PAGE_SIZE / 4, PAGE_SIZE / 4),
+ Encoding.PLAIN, BytesInput.from(colBPage2Bytes, PAGE_SIZE / 2,
PAGE_SIZE / 2),
+ EMPTY_STATS_INT32);
+ } else {
+ throw new IllegalArgumentException("Unknown writer version: " + version);
+ }
writeStore.flushToFileWriter(writer);
@@ -166,7 +203,8 @@ public class TestDataPageV1Checksums {
private Path writeNestedWithNullsSampleParquetFile(Configuration conf,
boolean
dictionaryEncoding,
- CompressionCodecName
compression)
+ CompressionCodecName
compression,
+
ParquetProperties.WriterVersion version)
throws IOException {
File file = tempFolder.newFile();
file.delete();
@@ -179,6 +217,7 @@ public class TestDataPageV1Checksums {
.withDictionaryEncoding(dictionaryEncoding)
.withType(schemaNestedWithNulls)
.withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+ .withWriterVersion(version)
.build()) {
GroupFactory groupFactory = new
SimpleGroupFactory(schemaNestedWithNulls);
Random rand = new Random(42);
@@ -210,44 +249,52 @@ public class TestDataPageV1Checksums {
* the crc checksums are correct. Tests whether we successfully write out
correct crc checksums
* without potentially failing on the read path verification.
*/
- @Test
- public void testWriteOnVerifyOff() throws IOException {
+ private void testWriteOnVerifyOff(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
- Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED);
+ Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED, version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colADesc, colBDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
- assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+ assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
- DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+ DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
- assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+ assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
- DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+ DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
- assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+ assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
- DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
- assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
}
}
- /** Test that we do not write out checksums if the feature is turned off */
@Test
- public void testWriteOffVerifyOff() throws IOException {
+ public void testWriteOnVerifyOffV1() throws IOException {
+ testWriteOnVerifyOff(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testWriteOnVerifyOffV2() throws IOException {
+ testWriteOnVerifyOff(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
+ /** Test that we do not write out checksums if the feature is turned off */
+ private void testWriteOffVerifyOff(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
- Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED);
+ Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED, version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colADesc, colBDesc))) {
@@ -260,76 +307,103 @@ public class TestDataPageV1Checksums {
}
}
+ @Test
+ public void testWriteOffVerifyOffV1() throws IOException {
+ testWriteOffVerifyOff(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testWriteOffVerifyOffV2() throws IOException {
+ testWriteOffVerifyOff(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/**
* Do not write out page level crc checksums, but enable verification on the
read path. Tests
* that the read still succeeds and does not throw an exception.
*/
- @Test
- public void testWriteOffVerifyOn() throws IOException {
+ private void testWriteOffVerifyOn(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
- Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED);
+ Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED, version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colADesc, colBDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- assertCorrectContent(readNextPage(colADesc,
pageReadStore).getBytes().toByteArray(),
+ assertCorrectContent(getPageBytes(readNextPage(colADesc, pageReadStore)),
colAPage1Bytes);
- assertCorrectContent(readNextPage(colADesc,
pageReadStore).getBytes().toByteArray(),
+ assertCorrectContent(getPageBytes(readNextPage(colADesc, pageReadStore)),
colAPage2Bytes);
- assertCorrectContent(readNextPage(colBDesc,
pageReadStore).getBytes().toByteArray(),
+ assertCorrectContent(getPageBytes(readNextPage(colBDesc, pageReadStore)),
colBPage1Bytes);
- assertCorrectContent(readNextPage(colBDesc,
pageReadStore).getBytes().toByteArray(),
+ assertCorrectContent(getPageBytes(readNextPage(colBDesc, pageReadStore)),
colBPage2Bytes);
}
}
+ @Test
+ public void testWriteOffVerifyOnV1() throws IOException {
+ testWriteOffVerifyOn(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testWriteOffVerifyOnV2() throws IOException {
+ testWriteOffVerifyOn(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/**
* Write out checksums and verify them on the read path. Tests that crc is
set and that we can
* read back what we wrote if checksums are enabled on both the write and
read path.
*/
- @Test
- public void testWriteOnVerifyOn() throws IOException {
+ private void testWriteOnVerifyOn(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
- Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED);
+ Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED, version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colADesc, colBDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
- assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+ assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
- DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+ DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
- assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+ assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
- DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+ DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
- assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+ assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
- DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
- assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
}
}
+ @Test
+ public void testWriteOnVerifyOnV1() throws IOException {
+ testWriteOnVerifyOn(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testWriteOnVerifyOnV2() throws IOException {
+ testWriteOnVerifyOn(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/**
* Test whether corruption in the page content is detected by checksum
verification
*/
- @Test
- public void testCorruptedPage() throws IOException {
+ private void testCorruptedPage(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
- Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED);
+ Path path = writeSimpleParquetFile(conf,
CompressionCodecName.UNCOMPRESSED, version);
InputFile inputFile = HadoopInputFile.fromPath(path, conf);
try (SeekableInputStream inputStream = inputFile.newStream()) {
@@ -356,14 +430,14 @@ public class TestDataPageV1Checksums {
Arrays.asList(colADesc, colBDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
assertFalse("Data in page was not corrupted",
- Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
+ Arrays.equals(getPageBytes(colAPage1), colAPage1Bytes));
readNextPage(colADesc, pageReadStore);
readNextPage(colBDesc, pageReadStore);
- DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
assertFalse("Data in page was not corrupted",
- Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
+ Arrays.equals(getPageBytes(colBPage2), colBPage2Bytes));
}
// Now we enable checksum verification, the corruption should be
detected
@@ -377,103 +451,130 @@ public class TestDataPageV1Checksums {
}
}
+ @Test
+ public void testCorruptedPageV1() throws IOException {
+ testCorruptedPage(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testCorruptedPageV2() throws IOException {
+ testCorruptedPage(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/**
* Tests that the checksum is calculated using the compressed version of the
data and that
* checksum verification succeeds
*/
- @Test
- public void testCompression() throws IOException {
+ private void testCompression(ParquetProperties.WriterVersion version) throws
IOException {
Configuration conf = new Configuration();
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
- Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY);
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY,
version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colADesc, colBDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
- assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes));
- assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+ DataPage colAPage1 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes,
getDataOffset(colAPage1)));
+ assertCorrectContent(getPageBytes(colAPage1), colAPage1Bytes);
- DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
- assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes));
- assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+ DataPage colAPage2 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes,
getDataOffset(colAPage2)));
+ assertCorrectContent(getPageBytes(colAPage2), colAPage2Bytes);
- DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
- assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes));
- assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+ DataPage colBPage1 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes,
getDataOffset(colBPage1)));
+ assertCorrectContent(getPageBytes(colBPage1), colBPage1Bytes);
- DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
- assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes));
- assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ DataPage colBPage2 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes,
getDataOffset(colBPage2)));
+ assertCorrectContent(getPageBytes(colBPage2), colBPage2Bytes);
}
}
+ @Test
+ public void testCompressionV1() throws IOException {
+ testCompression(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testCompressionV2() throws IOException {
+ testCompression(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/**
* Tests that we adhere to the checksum calculation specification, namely
that the crc is
* calculated using the compressed concatenation of the repetition levels,
definition levels and
* the actual data. This is done by generating sample data with a nested
schema containing nulls
- * (generating non trivial repetition and definition levels).
+ * (generating non-trivial repetition and definition levels).
*/
- @Test
- public void testNestedWithNulls() throws IOException {
+ private void testNestedWithNulls(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
// Write out sample file via the non-checksum code path, extract the raw
bytes to calculate the
// reference crc with
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
- Path refPath = writeNestedWithNullsSampleParquetFile(conf, false,
CompressionCodecName.SNAPPY);
+ Path refPath = writeNestedWithNullsSampleParquetFile(conf, false,
CompressionCodecName.SNAPPY, version);
try (ParquetFileReader refReader = getParquetFileReader(refPath, conf,
Arrays.asList(colCIdDesc, colDValDesc))) {
PageReadStore refPageReadStore = refReader.readNextRowGroup();
- byte[] colCIdPageBytes = readNextPage(colCIdDesc,
refPageReadStore).getBytes().toByteArray();
- byte[] colDValPageBytes = readNextPage(colDValDesc,
refPageReadStore).getBytes().toByteArray();
+ byte[] colCIdPageBytes = getPageBytes(readNextPage(colCIdDesc,
refPageReadStore));
+ byte[] colDValPageBytes = getPageBytes(readNextPage(colDValDesc,
refPageReadStore));
// Write out sample file with checksums
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
- Path path = writeNestedWithNullsSampleParquetFile(conf, false,
CompressionCodecName.SNAPPY);
+ Path path = writeNestedWithNullsSampleParquetFile(conf, false,
CompressionCodecName.SNAPPY, version);
try (ParquetFileReader reader = getParquetFileReader(path, conf,
Arrays.asList(colCIdDesc, colDValDesc))) {
PageReadStore pageReadStore = reader.readNextRowGroup();
- DataPageV1 colCIdPage = readNextPage(colCIdDesc, pageReadStore);
- assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes));
- assertCorrectContent(colCIdPage.getBytes().toByteArray(),
colCIdPageBytes);
+ DataPage colCIdPage = readNextPage(colCIdDesc, pageReadStore);
+ assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes,
getDataOffset(colCIdPage)));
+ assertCorrectContent(getPageBytes(colCIdPage), colCIdPageBytes);
- DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
- assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
- assertCorrectContent(colDValPage.getBytes().toByteArray(),
colDValPageBytes);
+ DataPage colDValPage = readNextPage(colDValDesc, pageReadStore);
+ assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes,
getDataOffset(colDValPage)));
+ assertCorrectContent(getPageBytes(colDValPage), colDValPageBytes);
}
}
}
@Test
- public void testDictionaryEncoding() throws IOException {
+ public void testNestedWithNullsV1() throws IOException {
+ testNestedWithNulls(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testNestedWithNullsV2() throws IOException {
+ testNestedWithNulls(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
+ private void testDictionaryEncoding(ParquetProperties.WriterVersion version)
throws IOException {
Configuration conf = new Configuration();
// Write out dictionary encoded sample file via the non-checksum code
path, extract the raw
// bytes to calculate the reference crc with
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
- Path refPath = writeNestedWithNullsSampleParquetFile(conf, true,
CompressionCodecName.SNAPPY);
+ Path refPath = writeNestedWithNullsSampleParquetFile(conf, true,
CompressionCodecName.SNAPPY, version);
try (ParquetFileReader refReader =
getParquetFileReader(refPath, conf,
Collections.singletonList(colDValDesc))) {
PageReadStore refPageReadStore = refReader.readNextRowGroup();
// Read (decompressed) dictionary page
byte[] dictPageBytes = readDictPage(colDValDesc,
refPageReadStore).getBytes().toByteArray();
- byte[] colDValPageBytes = readNextPage(colDValDesc,
refPageReadStore).getBytes().toByteArray();
+ byte[] colDValPageBytes = getPageBytes(readNextPage(colDValDesc,
refPageReadStore));
// Write out sample file with checksums
conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
- Path path = writeNestedWithNullsSampleParquetFile(conf, true,
CompressionCodecName.SNAPPY);
+ Path path = writeNestedWithNullsSampleParquetFile(conf, true,
CompressionCodecName.SNAPPY, version);
try (ParquetFileReader reader =
getParquetFileReader(path, conf,
Collections.singletonList(colDValDesc))) {
@@ -483,22 +584,47 @@ public class TestDataPageV1Checksums {
assertCrcSetAndCorrect(dictPage, snappy(dictPageBytes));
assertCorrectContent(dictPage.getBytes().toByteArray(), dictPageBytes);
- DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
- assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
- assertCorrectContent(colDValPage.getBytes().toByteArray(),
colDValPageBytes);
+ DataPage colDValPage = readNextPage(colDValDesc, pageReadStore);
+ assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes,
getDataOffset(colDValPage)));
+ assertCorrectContent(getPageBytes(colDValPage), colDValPageBytes);
}
}
}
+ @Test
+ public void testDictionaryEncodingV1() throws IOException {
+ testDictionaryEncoding(ParquetProperties.WriterVersion.PARQUET_1_0);
+ }
+
+ @Test
+ public void testDictionaryEncodingV2() throws IOException {
+ testDictionaryEncoding(ParquetProperties.WriterVersion.PARQUET_2_0);
+ }
+
/** Compress using snappy */
- private byte[] snappy(byte[] bytes) throws IOException {
+ private byte[] snappy(byte[] bytes, int offset) throws IOException {
+ int length = bytes.length - offset;
SnappyCompressor compressor = new SnappyCompressor();
compressor.reset();
- compressor.setInput(bytes, 0, bytes.length);
+ compressor.setInput(bytes, offset, length);
compressor.finish();
- byte[] buffer = new byte[bytes.length * 2];
+ byte[] buffer = new byte[length * 2];
int compressedSize = compressor.compress(buffer, 0, buffer.length);
- return Arrays.copyOfRange(buffer, 0, compressedSize);
+ return BytesInput.concat(BytesInput.from(Arrays.copyOfRange(bytes, 0,
offset)),
+ BytesInput.from(Arrays.copyOfRange(buffer, 0,
compressedSize))).toByteArray();
+ }
+
+ private byte[] snappy(byte[] bytes) throws IOException {
+ return snappy(bytes, 0);
+ }
+
+ private int getDataOffset(Page page) {
+ if (page instanceof DataPageV2) {
+ return (int) (((DataPageV2) page).getRepetitionLevels().size() +
+ ((DataPageV2) page).getDefinitionLevels().size());
+ } else {
+ return 0;
+ }
}
/** Construct ParquetFileReader for input file and columns */
@@ -516,8 +642,8 @@ public class TestDataPageV1Checksums {
}
/** Read the next page for a column */
- private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore
pageReadStore) {
- return (DataPageV1) pageReadStore.getPageReader(colDesc).readPage();
+ private DataPage readNextPage(ColumnDescriptor colDesc, PageReadStore
pageReadStore) {
+ return pageReadStore.getPageReader(colDesc).readPage();
}
/**
@@ -528,6 +654,19 @@ public class TestDataPageV1Checksums {
pageBytes);
}
+ private byte[] getPageBytes(DataPage page) throws IOException {
+ if (page instanceof DataPageV1) {
+ DataPageV1 pageV1 = (DataPageV1) page;
+ return pageV1.getBytes().toByteArray();
+ } else if (page instanceof DataPageV2) {
+ DataPageV2 pageV2 = (DataPageV2) page;
+ return BytesInput.concat(
+ pageV2.getRepetitionLevels(), pageV2.getDefinitionLevels(),
pageV2.getData()).toByteArray();
+ } else {
+ throw new IllegalArgumentException("Unknown page type " +
page.getClass().getName());
+ }
+ }
+
/**
* Verify that the crc is set in a page, calculate the reference crc using
the reference bytes and
* check that the crc's are identical.