fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049256686
##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_,
current_page.get()));
}
+// Templated test fixture for page filtering; T is the Thrift header type under
+// test (format::DataPageHeader or format::DataPageHeaderV2).
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+ const int kNumPages = 10;  // Number of pages each WriteStream() writes.
+ void WriteStream();  // Explicitly specialized per header type T.
+
+ protected:
+ std::vector<T> data_page_headers_;  // One header per written page, in write order.
+ int total_rows_ = 0;  // Sum of num_rows over all pages written so far.
+};
+
+template <>  // V1 headers: writes kNumPages pages, each a header followed by a payload.
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows (100 + i) so that every page gets a distinct header.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;  // Payload size in bytes; also differs per page.
+ this->data_page_header_.__set_num_values(num_rows);
+ this->data_page_header_.statistics.__set_min_value("A");
+ this->data_page_header_.statistics.__set_max_value("Z");
+ this->data_page_header_.statistics.__set_null_count(0);
+ this->data_page_header_.__isset.statistics = true;  // Flag statistics as set.
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size,
data_size));  // NOTE(review): same size passed twice, presumably uncompressed/compressed - confirm.
+ data_page_headers_.push_back(this->data_page_header_);  // Keep a copy for later checks.
+ // Also write data_size zero bytes of data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();  // NOTE(review): defined in TestPageSerde (not shown); presumably finalizes out_buffer_.
+}
+
+template <>  // V2 headers: writes kNumPages pages, each a header followed by a payload.
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows (100 + i) so that every page gets a distinct header.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;  // Payload size in bytes; also differs per page.
+ this->data_page_header_v2_.__set_num_values(num_rows);
+ this->data_page_header_v2_.__set_num_rows(num_rows);  // Set to the same count as num_values.
+ this->data_page_header_v2_.statistics.__set_min_value("A");
+ this->data_page_header_v2_.statistics.__set_max_value("Z");
+ this->data_page_header_v2_.statistics.__set_null_count(0);
+ this->data_page_header_v2_.__isset.statistics = true;  // Flag statistics as set.
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size,
data_size));  // NOTE(review): same size passed twice, presumably uncompressed/compressed - confirm.
+ data_page_headers_.push_back(this->data_page_header_v2_);  // Keep a copy for later checks.
+ // Also write data_size zero bytes of data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();  // NOTE(review): defined in TestPageSerde (not shown); presumably finalizes out_buffer_.
+}
+
+using DataPageHeaderTypes =  // Header types the typed suite is run against.
+ ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);  // One PageFilterTest<T> per listed type.
+
+// Creates a number of pages and skips some of them with the page filter
+// callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+ this->WriteStream();
+
+ { // Read all pages.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // This callback will always return false.
+ auto read_all_pages = [](const DataPageStats& stats) -> bool { return
false; };
+
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ for (int i = 0; i < this->kNumPages; ++i) {
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[i],
current_page.get()));
+ }
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+ { // Skip all pages.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return
true; };
+
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+
+ { // Skip every other page.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // Skip pages with even number of values.
+ auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+ if (stats.num_values % 2 == 0) return true;
Review Comment:
Done, I added checks for stats.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]