This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 99ac74e689 GH-35171: [C++][Parquet] Implement CRC for data page v2
(#35242)
99ac74e689 is described below
commit 99ac74e6890767e1a3cf96d7aff0b96b9503153c
Author: mwish <[email protected]>
AuthorDate: Mon May 1 22:56:14 2023 +0800
GH-35171: [C++][Parquet] Implement CRC for data page v2 (#35242)
### Rationale for this change
Implement crc for data page v2.
### What changes are included in this PR?
1. When writing, a CRC can now be written for data page v2
2. When reading, the CRC of a data page v2 can now be read and verified
3. Testing
### Are these changes tested?
Yes, but parquet-testing doesn't have a CRC data page v2 test file, so there is no integration test.
### Are there any user-facing changes?
* Closes: #35171
Authored-by: mwish <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 2 +-
cpp/src/parquet/arrow/reader.h | 2 +-
cpp/src/parquet/column_reader.cc | 12 +++--
cpp/src/parquet/column_writer.cc | 3 +-
cpp/src/parquet/file_deserialize_test.cc | 56 ++++++++++++++++++++---
cpp/src/parquet/types.cc | 11 +++++
cpp/src/parquet/types.h | 2 +
7 files changed, 73 insertions(+), 15 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 831513a94d..ad33ca296a 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4092,7 +4092,7 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
{{"b", "a"}, {"b", "a"}}, {{"c", "c"}, {"c", "c"}}, {{"d", "a"}, {"d",
"a"}}};
const std::vector<std::vector<std::vector<bool>>>
expected_has_min_max_by_page = {
{{true, true}, {true, true}},
- {{true, true}, {true, true}},
+ {{true, false}, {true, false}},
{{true, true}, {true, true}},
{{false}, {false}}};
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 5dff35e887..2cbd36176f 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -87,7 +87,7 @@ class RowGroupReader;
// optional int32 val4;
// }
//
-// In the Parquet file, there are 3 leaf nodes:
+// In the Parquet file, there are 4 leaf nodes:
//
// * data.record.val1
// * data.record.val2
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 61e007d612..d670f3d676 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -441,6 +441,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
UpdateDecryption(crypto_ctx_.meta_decryptor,
encryption::kDictionaryPageHeader,
&data_page_header_aad_);
}
+ // Reset current page header to avoid unclearing the __isset flag.
+ current_page_header_ = format::PageHeader();
deserializer.DeserializeMessage(reinterpret_cast<const
uint8_t*>(view.data()),
&header_size, &current_page_header_,
crypto_ctx_.meta_decryptor);
@@ -487,16 +489,16 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
- // TODO(PARQUET-594) crc checksum for DATA_PAGE_V2
- if (properties_.page_checksum_verification() &&
- (page_type == PageType::DATA_PAGE || page_type ==
PageType::DICTIONARY_PAGE) &&
- current_page_header_.__isset.crc) {
+ if (properties_.page_checksum_verification() &&
current_page_header_.__isset.crc &&
+ PageCanUseChecksum(page_type)) {
// verify crc
uint32_t checksum =
::arrow::internal::crc32(/* prev */ 0, page_buffer->data(),
compressed_len);
if (static_cast<int32_t>(checksum) != current_page_header_.crc) {
throw ParquetException(
- "could not verify page integrity, CRC checksum verification
failed");
+ "could not verify page integrity, CRC checksum verification failed
for "
+ "page_ordinal " +
+ std::to_string(page_ordinal_));
}
}
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 2892335227..f298f282fe 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -397,8 +397,7 @@ class SerializedPageWriter : public PageWriter {
page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
- // TODO(PARQUET-594) crc checksum for DATA_PAGE_V2
- if (page_checksum_verification_ && page.type() == PageType::DATA_PAGE) {
+ if (page_checksum_verification_) {
uint32_t crc32 =
::arrow::internal::crc32(/* prev */ 0, output_data_buffer,
output_data_len);
page_header.__set_crc(static_cast<int32_t>(crc32));
diff --git a/cpp/src/parquet/file_deserialize_test.cc
b/cpp/src/parquet/file_deserialize_test.cc
index 1f06cca8ab..a0467e2a33 100644
--- a/cpp/src/parquet/file_deserialize_test.cc
+++ b/cpp/src/parquet/file_deserialize_test.cc
@@ -145,7 +145,8 @@ class TestPageSerde : public ::testing::Test {
}
void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t
uncompressed_size = 0,
- int32_t compressed_size = 0) {
+ int32_t compressed_size = 0,
+ std::optional<int32_t> checksum = std::nullopt) {
// Simplifying writing serialized data page V2 headers which may or may not
// have meaningful data associated with them
@@ -154,6 +155,9 @@ class TestPageSerde : public ::testing::Test {
page_header_.uncompressed_page_size = uncompressed_size;
page_header_.compressed_page_size = compressed_size;
page_header_.type = format::PageType::DATA_PAGE_V2;
+ if (checksum.has_value()) {
+ page_header_.__set_crc(checksum.value());
+ }
ThriftSerializer serializer;
ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get()));
@@ -189,7 +193,8 @@ class TestPageSerde : public ::testing::Test {
void EndStream() { PARQUET_ASSIGN_OR_THROW(out_buffer_,
out_stream_->Finish()); }
void TestPageSerdeCrc(bool write_checksum, bool write_page_corrupt,
- bool verification_checksum, bool has_dictionary =
false);
+ bool verification_checksum, bool has_dictionary =
false,
+ bool write_data_page_v2 = false);
protected:
std::shared_ptr<::arrow::io::BufferOutputStream> out_stream_;
@@ -204,11 +209,16 @@ class TestPageSerde : public ::testing::Test {
};
void TestPageSerde::TestPageSerdeCrc(bool write_checksum, bool
write_page_corrupt,
- bool verification_checksum, bool
has_dictionary) {
+ bool verification_checksum, bool
has_dictionary,
+ bool write_data_page_v2) {
auto codec_types = GetSupportedCodecTypes();
codec_types.push_back(Compression::UNCOMPRESSED);
const int32_t num_rows = 32; // dummy value
- data_page_header_.num_values = num_rows;
+ if (write_data_page_v2) {
+ data_page_header_v2_.num_values = num_rows;
+ } else {
+ data_page_header_.num_values = num_rows;
+ }
dictionary_page_header_.num_values = num_rows;
const int num_pages = 10;
@@ -252,8 +262,13 @@ void TestPageSerde::TestPageSerdeCrc(bool write_checksum,
bool write_page_corrup
ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(
data_size, static_cast<int32_t>(actual_size), checksum_opt));
} else {
- ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(
- 1024, data_size, static_cast<int32_t>(actual_size), checksum_opt));
+ if (write_data_page_v2) {
+ ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2(
+ 1024, data_size, static_cast<int32_t>(actual_size),
checksum_opt));
+ } else {
+ ASSERT_NO_FATAL_FAILURE(WriteDataPageHeader(
+ 1024, data_size, static_cast<int32_t>(actual_size),
checksum_opt));
+ }
}
ASSERT_OK(out_stream_->Write(buffer.data(), actual_size));
}
@@ -275,6 +290,11 @@ void TestPageSerde::TestPageSerdeCrc(bool write_checksum,
bool write_page_corrup
const auto dict_page = static_cast<const
DictionaryPage*>(page.get());
ASSERT_EQ(data_size, dict_page->size());
ASSERT_EQ(0, memcmp(faux_data[i].data(), dict_page->data(),
data_size));
+ } else if (write_data_page_v2) {
+ ASSERT_EQ(PageType::DATA_PAGE_V2, page->type());
+ const auto data_page = static_cast<const DataPageV2*>(page.get());
+ ASSERT_EQ(data_size, data_page->size());
+ ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(),
data_size));
} else {
ASSERT_EQ(PageType::DATA_PAGE, page->type());
const auto data_page = static_cast<const DataPageV1*>(page.get());
@@ -814,6 +834,30 @@ TEST_F(TestPageSerde, DictCrcCheckNonExistent) {
/* verification_checksum */ true, /* has_dictionary
*/ true);
}
+TEST_F(TestPageSerde, DataPageV2CrcCheckSuccessful) {
+ this->TestPageSerdeCrc(/* write_checksum */ true, /* write_page_corrupt */
false,
+ /* verification_checksum */ true, /* has_dictionary
*/ false,
+ /* write_data_page_v2 */ true);
+}
+
+TEST_F(TestPageSerde, DataPageV2CrcCheckFail) {
+ this->TestPageSerdeCrc(/* write_checksum */ true, /* write_page_corrupt */
true,
+ /* verification_checksum */ true, /* has_dictionary
*/ false,
+ /* write_data_page_v2 */ true);
+}
+
+TEST_F(TestPageSerde, DataPageV2CrcCorruptNotChecked) {
+ this->TestPageSerdeCrc(/* write_checksum */ true, /* write_page_corrupt */
true,
+ /* verification_checksum */ false, /* has_dictionary
*/ false,
+ /* write_data_page_v2 */ true);
+}
+
+TEST_F(TestPageSerde, DataPageV2CrcCheckNonExistent) {
+ this->TestPageSerdeCrc(/* write_checksum */ false, /* write_page_corrupt */
false,
+ /* verification_checksum */ true, /* has_dictionary
*/ false,
+ /* write_data_page_v2 */ true);
+}
+
// ----------------------------------------------------------------------
// File structure tests
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index 532fd4c3d7..28f472aaf9 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -73,6 +73,17 @@ std::unique_ptr<Codec> GetCodec(Compression::type codec, int
compression_level)
return result;
}
+bool PageCanUseChecksum(PageType::type pageType) {
+ switch (pageType) {
+ case PageType::type::DATA_PAGE:
+ case PageType::type::DATA_PAGE_V2:
+ case PageType::type::DICTIONARY_PAGE:
+ return true;
+ default:
+ return false;
+ }
+}
+
std::string FormatStatValue(Type::type parquet_type, ::std::string_view val) {
std::stringstream result;
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index c5445facc3..d4d6a73f14 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -517,6 +517,8 @@ struct PageType {
};
};
+bool PageCanUseChecksum(PageType::type pageType);
+
class ColumnOrder {
public:
enum type { UNDEFINED, TYPE_DEFINED_ORDER };