wgtmac commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1044237366
##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
Review Comment:
The comment seems inaccurate, since DataPageStats does not have access to all fields in the Thrift page header.
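For example, the Thrift header also carries encoding information that a stats proxy would not surface; a quick illustration using fields from parquet.thrift:

```cpp
// format::DataPageHeader fields that a stats-only proxy cannot expose:
// the value encoding and the definition/repetition level encodings.
format::DataPageHeader header;
header.__set_encoding(format::Encoding::PLAIN);
header.__set_definition_level_encoding(format::Encoding::RLE);
header.__set_repetition_level_encoding(format::Encoding::RLE);
```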
##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
  ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
}
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+ const int kNumPages = 10;
+ void WriteStream();
+
+ protected:
+ std::vector<T> data_page_headers_;
+ int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;
+ this->data_page_header_.__set_num_values(num_rows);
+ this->data_page_header_.statistics.__set_min_value("A");
+ this->data_page_header_.statistics.__set_max_value("Z");
+ this->data_page_header_.statistics.__set_null_count(0);
+ this->data_page_header_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_);
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;
+ this->data_page_header_v2_.__set_num_values(num_rows);
+ this->data_page_header_v2_.__set_num_rows(num_rows);
+ this->data_page_header_v2_.statistics.__set_min_value("A");
+ this->data_page_header_v2_.statistics.__set_max_value("Z");
+ this->data_page_header_v2_.statistics.__set_null_count(0);
+ this->data_page_header_v2_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+ data_page_headers_.push_back(this->data_page_header_v2_);
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+using DataPageHeaderTypes =
+ ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+ this->WriteStream();
+
+ { // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ for (int i = 0; i < this->kNumPages; ++i) {
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+ }
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+ { // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(current_page, nullptr);
+ }
+
+ { // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      return stats.num_values % 2 == 0;
+    };
+
+ this->page_reader_->set_data_page_filter(skip_even_pages);
+
+ for (int i = 0; i < this->kNumPages; ++i) {
+ // Only pages with odd number of values are read.
+ if (i % 2 != 0) {
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+ }
+ }
+    // We should have exhausted reading the pages by reading the odd pages only.
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+ this->WriteStream();
+
+ auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+ // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+ int data_size = 1024;
+ std::vector<uint8_t> faux_data(data_size);
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ EndStream();
+
+ // Try to read it back while asking for all data pages to be skipped.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+ page_reader_->set_data_page_filter(skip_all_pages);
Review Comment:
Do you plan to add an end-to-end test case that implements a callback based on the stats?
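For reference, something along these lines is what I have in mind. A rough sketch only: the `min_value`/`max_value` members and their `std::optional<std::string>` types on `DataPageStats` are assumptions for illustration, not the API in this PR:

```cpp
// Skip pages whose [min, max] range cannot contain the value "M".
// Pages without statistics are kept so the result stays correct.
auto skip_non_matching_pages = [](const DataPageStats& stats) -> bool {
  if (!stats.min_value.has_value() || !stats.max_value.has_value()) {
    return false;
  }
  return *stats.max_value < "M" || *stats.min_value > "M";
};
page_reader_->set_data_page_filter(skip_non_matching_pages);
```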
##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {
Review Comment:
Simply use a struct and remove the ctor?
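Something like this sketch, for instance; `num_values` matches what the tests above use, while the statistics members and their types are assumptions:

```cpp
// A plain aggregate: callers can brace-initialize it and no user-declared
// constructor is needed. Statistics fields are illustrative only.
struct PARQUET_EXPORT DataPageStats {
  int32_t num_values;
  std::optional<std::string> min_value;
  std::optional<std::string> max_value;
  std::optional<int64_t> null_count;
};
```

The decoding site could then build it with `DataPageStats{num_values, min, max, null_count}` instead of calling a ctor.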