pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048484566
##########
cpp/src/parquet/statistics.h:
##########
@@ -216,6 +216,13 @@ class PARQUET_EXPORT Statistics {
bool has_distinct_count,
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ // Helper function to convert EncodedStatistics to Statistics.
+ // Note that num_values will be set to -1 because number of non-null values in
+ // the column is not available in EncodedStatistics.
+ static std::shared_ptr<Statistics> Make(
+ const ColumnDescriptor* descr, const EncodedStatistics*
encoded_statistics,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
Review Comment:
Let's allow passing `num_values` optionally?
```suggestion
static std::shared_ptr<Statistics> Make(
const ColumnDescriptor* descr, const EncodedStatistics*
encoded_statistics,
int64_t num_values = -1, ::arrow::MemoryPool* pool =
::arrow::default_memory_pool());
```
##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_,
current_page.get()));
}
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+ const int kNumPages = 10;
+ void WriteStream();
+
+ protected:
+ std::vector<T> data_page_headers_;
+ int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;
+ this->data_page_header_.__set_num_values(num_rows);
+ this->data_page_header_.statistics.__set_min_value("A");
+ this->data_page_header_.statistics.__set_max_value("Z");
+ this->data_page_header_.statistics.__set_null_count(0);
Review Comment:
Might be nice to vary the statistics a bit from page to page?
##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
bool always_compressed = false,
const CryptoContext* ctx = NULLPTR);
+ // If skip_page_callback_ is present (not null), NextPage() will call the
+ // callback function exactly once per page in the order the pages appear in
+ // the column. If the callback function returns true the page will be
+ // skipped. The callback will be called only if the page type is DATA_PAGE or
Review Comment:
Ok, but this should then skip the page if the returned value is false, not
true.
##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 *
1024 * 1024;
// 16 KB is the default expected page header size
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+ DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+ std::optional<int32_t> num_rows)
+ : encoded_statistics(encoded_statistics),
+ is_stats_set(encoded_statistics->is_set()),
+ num_values(num_values),
+ num_rows(num_rows) {}
+
+ // Encoded statistics extracted from the page header.
+ EncodedStatistics* encoded_statistics;
Review Comment:
Also, this should perhaps be `const`?
##########
cpp/src/parquet/metadata.h:
##########
@@ -20,13 +20,15 @@
#include <cstdint>
#include <map>
#include <memory>
+#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
+#include "parquet/statistics.h"
Review Comment:
You said "done" but didn't address this?
##########
cpp/src/parquet/metadata.cc:
##########
@@ -19,6 +19,8 @@
#include <algorithm>
#include <cinttypes>
+#include <csignal>
+#include <iostream>
Review Comment:
Why are `csignal` and `iostream` needed here?
##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_,
current_page.get()));
}
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+ const int kNumPages = 10;
+ void WriteStream();
+
+ protected:
+ std::vector<T> data_page_headers_;
+ int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;
+ this->data_page_header_.__set_num_values(num_rows);
+ this->data_page_header_.statistics.__set_min_value("A");
+ this->data_page_header_.statistics.__set_max_value("Z");
+ this->data_page_header_.statistics.__set_null_count(0);
+ this->data_page_header_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size,
data_size));
+ data_page_headers_.push_back(this->data_page_header_);
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+ for (int i = 0; i < kNumPages; ++i) {
+ // Vary the number of rows to produce different headers.
+ int32_t num_rows = i + 100;
+ total_rows_ += num_rows;
+ int data_size = i + 1024;
+ this->data_page_header_v2_.__set_num_values(num_rows);
+ this->data_page_header_v2_.__set_num_rows(num_rows);
+ this->data_page_header_v2_.statistics.__set_min_value("A");
+ this->data_page_header_v2_.statistics.__set_max_value("Z");
+ this->data_page_header_v2_.statistics.__set_null_count(0);
+ this->data_page_header_v2_.__isset.statistics = true;
+ ASSERT_NO_FATAL_FAILURE(
+ this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size,
data_size));
+ data_page_headers_.push_back(this->data_page_header_v2_);
+ // Also write data, to make sure we skip the data correctly.
+ std::vector<uint8_t> faux_data(data_size);
+ ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+ }
+ this->EndStream();
+}
+
+using DataPageHeaderTypes =
+ ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+ this->WriteStream();
+
+ { // Read all pages.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // This callback will always return false.
+ auto read_all_pages = [](const DataPageStats& stats) -> bool { return
false; };
+
+ this->page_reader_->set_data_page_filter(read_all_pages);
+ for (int i = 0; i < this->kNumPages; ++i) {
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_NE(current_page, nullptr);
+ ASSERT_NO_FATAL_FAILURE(
+ CheckDataPageHeader(this->data_page_headers_[i],
current_page.get()));
+ }
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+ { // Skip all pages.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ auto skip_all_pages = [](const DataPageStats& stats) -> bool { return
true; };
+
+ this->page_reader_->set_data_page_filter(skip_all_pages);
+ std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+ ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+ }
+
+ { // Skip every other page.
+ auto stream =
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+ this->page_reader_ =
+ PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+ // Skip pages with even number of values.
+ auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+ if (stats.num_values % 2 == 0) return true;
Review Comment:
Can you also test something about the page statistics in `stats`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]