fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1065131471
##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_,
current_page.get()));
}
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  // Number of data pages emitted by WriteStream(). Declared as a class-level
+  // compile-time constant instead of a per-instance const data member: the
+  // value is the same for every fixture object, so it needs no per-object
+  // storage. Existing uses (`kNumPages`, `this->kNumPages`) are unaffected.
+  static constexpr int kNumPages = 10;
+
+  // Writes kNumPages data page headers of type T, each followed by a dummy
+  // payload, and then ends the stream. Only the explicit specializations for
+  // format::DataPageHeader and format::DataPageHeaderV2 provide a definition.
+  void WriteStream();
+
+ protected:
+  // Copy of each header handed to the writer, in write order, so the tests
+  // can compare the pages they read back against what was written.
+  std::vector<T> data_page_headers_;
+  // Sum of the per-page row counts accumulated by WriteStream().
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  // Shorthand for the fixture's V1 header, which is refilled for every page.
+  auto& header = this->data_page_header_;
+  for (int page = 0; page < kNumPages; ++page) {
+    // Derive both the value count and the payload size from the page index so
+    // that every page gets a different header.
+    const int32_t value_count = 100 + page;
+    const int payload_size = 1024 + page;
+    total_rows_ += value_count;
+
+    header.__set_num_values(value_count);
+    header.statistics.__set_min_value("A");
+    header.statistics.__set_max_value("Z");
+    header.statistics.__set_null_count(0);
+    header.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(this->WriteDataPageHeader(
+        /*max_serialized_len=*/1024, payload_size, payload_size));
+    // Remember exactly what was written for later comparison.
+    data_page_headers_.push_back(header);
+
+    // Follow the header with payload_size bytes of filler so that skipping a
+    // page also has to step over its data.
+    std::vector<uint8_t> filler(payload_size);
+    ASSERT_OK(this->out_stream_->Write(filler.data(), payload_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  // Shorthand for the fixture's V2 header, which is refilled for every page.
+  auto& header = this->data_page_header_v2_;
+  for (int page = 0; page < kNumPages; ++page) {
+    // Derive both the row count and the payload size from the page index so
+    // that every page gets a different header.
+    const int32_t row_count = 100 + page;
+    const int payload_size = 1024 + page;
+    total_rows_ += row_count;
+
+    // The same count is used for num_values and for the V2-only num_rows.
+    header.__set_num_values(row_count);
+    header.__set_num_rows(row_count);
+    header.statistics.__set_min_value("A");
+    header.statistics.__set_max_value("Z");
+    header.statistics.__set_null_count(0);
+    header.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(this->WriteDataPageHeaderV2(
+        /*max_serialized_len=*/1024, payload_size, payload_size));
+    // Remember exactly what was written for later comparison.
+    data_page_headers_.push_back(header);
+
+    // Follow the header with payload_size bytes of filler so that skipping a
+    // page also has to step over its data.
+    std::vector<uint8_t> filler(payload_size);
+    ASSERT_OK(this->out_stream_->Write(filler.data(), payload_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+ ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter
+// callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  // Each scope below re-opens a fresh reader over the same serialized buffer
+  // and installs a different filter. A filter returning true means "skip this
+  // data page"; returning false means "read it".
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    // With every data page filtered out, the very first NextPage() call must
+    // already come back empty. Previously this result was stored but never
+    // checked, so a reader that ignored the filter once would have passed.
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(current_page, nullptr);
+    // A further call must also return nothing.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      return stats.num_values % 2 == 0;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    // Page i was written with 100 + i values, so exactly the odd-indexed
+    // pages have an odd value count and survive the filter.
+    for (int i = 1; i < this->kNumPages; i += 2) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto input = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(input, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // Phase 1: a filter that never asks for a skip, so the first page written
+  // must be the first page returned.
+  auto keep_everything = [](const DataPageStats&) { return false; };
+  this->page_reader_->set_data_page_filter(keep_everything);
+  std::shared_ptr<Page> first_page = this->page_reader_->NextPage();
+  ASSERT_NE(first_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], first_page.get()));
+
+  // Phase 2: swap in a filter that asks for every page to be skipped. It has
+  // to apply to the very next read, so no further page may be returned.
+  auto drop_everything = [](const DataPageStats&) { return true; };
+  this->page_reader_->set_data_page_filter(drop_everything);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+ int data_size = 1024;
+ std::vector<uint8_t> faux_data(data_size);
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+ ASSERT_NO_FATAL_FAILURE(
+ WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+ ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+ EndStream();
+
+ // Try to read it back while asking for all data pages to be skipped.
+ auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+ page_reader_ = PageReader::Open(stream, /*num_rows=*/100,
Compression::UNCOMPRESSED);
+
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true;
};
+
+ page_reader_->set_data_page_filter(skip_all_pages);
Review Comment:
ACKed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]