wgtmac commented on code in PR #47090:
URL: https://github.com/apache/arrow/pull/47090#discussion_r2304409897
##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -2075,5 +2116,164 @@ TEST_F(TestGeometryValuesWriter, TestWriteAndReadAllNull) {
EXPECT_EQ(geospatial_statistics->geometry_types(), std::nullopt);
}
+template <typename TestType>
+class TestColumnWriterMaxRowsPerPage : public TestPrimitiveWriter<TestType> {
+ public:
+ TypedColumnWriter<TestType>* BuildWriter(
+ int64_t max_rows_per_page = kDefaultMaxRowsPerPage,
+ int64_t page_size = kDefaultDataPageSize) {
+ this->sink_ = CreateOutputStream();
+ this->writer_properties_ = WriterProperties::Builder()
+ .max_rows_per_page(max_rows_per_page)
+ ->data_pagesize(page_size)
+ ->enable_write_page_index()
+ ->build();
+ file_writer_ = ParquetFileWriter::Open(
+ this->sink_, std::static_pointer_cast<GroupNode>(this->schema_.schema_root()),
+ this->writer_properties_);
+ return static_cast<TypedColumnWriter<TestType>*>(
+ file_writer_->AppendRowGroup()->NextColumn());
+ }
+
+ void CloseWriter() const { file_writer_->Close(); }
+
+ void BuildReader() {
+ ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
+ file_reader_ = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+ this->reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
+ file_reader_->RowGroup(0)->Column(0));
+ }
+
+ void VerifyMaxRowsPerPage(int64_t max_rows_per_page) const {
+ auto file_meta = file_reader_->metadata();
+ int64_t num_row_groups = file_meta->num_row_groups();
+ ASSERT_EQ(num_row_groups, 1);
+
+ auto page_index_reader = file_reader_->GetPageIndexReader();
+ ASSERT_NE(page_index_reader, nullptr);
+
+ auto row_group_page_index_reader = page_index_reader->RowGroup(0);
+ ASSERT_NE(row_group_page_index_reader, nullptr);
+
+ auto offset_index = row_group_page_index_reader->GetOffsetIndex(0);
+ ASSERT_NE(offset_index, nullptr);
+ size_t num_pages = offset_index->page_locations().size();
+ for (size_t j = 1; j < num_pages; ++j) {
+ int64_t page_rows = offset_index->page_locations()[j].first_row_index -
+ offset_index->page_locations()[j - 1].first_row_index;
+ EXPECT_LE(page_rows, max_rows_per_page);
+ }
+ if (num_pages != 0) {
+ int64_t last_page_rows = file_meta->RowGroup(0)->num_rows() -
+ offset_index->page_locations().back().first_row_index;
+ EXPECT_LE(last_page_rows, max_rows_per_page);
+ }
+ }
+
+ private:
+ std::shared_ptr<ParquetFileWriter> file_writer_;
+ std::shared_ptr<ParquetFileReader> file_reader_;
+};
+
+TYPED_TEST_SUITE(TestColumnWriterMaxRowsPerPage, TestTypes);
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Optional) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+ definition_levels[1] = 0;
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatch(this->values_.size(), definition_levels.data(), nullptr,
+ this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, OptionalSpaced) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+ std::vector<uint8_t> valid_bits(::arrow::bit_util::BytesForBits(SMALL_SIZE), 255);
+
+ definition_levels[SMALL_SIZE - 1] = 0;
+ ::arrow::bit_util::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
+ definition_levels[1] = 0;
+ ::arrow::bit_util::ClearBit(valid_bits.data(), 1);
+
+ auto writer = this->BuildWriter(max_rows_per_page);
+ writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
+ valid_bits.data(), 0, this->values_ptr_);
+ this->CloseWriter();
+
+ this->BuildReader();
+ ASSERT_NO_FATAL_FAILURE(this->VerifyMaxRowsPerPage(max_rows_per_page));
+ }
+}
+
+TYPED_TEST(TestColumnWriterMaxRowsPerPage, Repeated) {
+ for (int64_t max_rows_per_page : {1, 10, 100}) {
+ this->SetUpSchema(Repetition::REPEATED);
+
+ this->GenerateData(SMALL_SIZE);
+ std::vector<int16_t> definition_levels(SMALL_SIZE);
+ std::vector<int16_t> repetition_levels(SMALL_SIZE);
+
+ // Generate levels to include variable-sized lists, null lists, and empty lists
+ for (int i = 0; i < SMALL_SIZE; i++) {
+ int list_length = (i % 5) + 1;
+ bool is_null = false;
+ if (i % 17 == 0) {
+ is_null = true;
+ list_length = 0;
+ } else if (i % 13 == 0) {
+ list_length = 0;
+ }
+
+ if (is_null) {
+ definition_levels[i] = 0;
+ repetition_levels[i] = 0;
+ } else if (list_length == 0) {
+ definition_levels[i] = 1;
+ repetition_levels[i] = 0;
+ } else {
+ for (int j = 0; j < list_length && i + j < SMALL_SIZE; j++) {
+ definition_levels[i + j] = 1;
Review Comment:
The actual schema is as follows:
```
parquet schema: required group field_id=-1 schema {
repeated int32 field_id=-1 column_0;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]