wgtmac commented on code in PR #34193:
URL: https://github.com/apache/arrow/pull/34193#discussion_r1113833768
##########
cpp/src/parquet/column_writer_test.cc:
##########
@@ -1143,5 +1143,140 @@ TEST(TestColumnWriter, WriteDataPageV2Header) {
}
}
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundaries) {
+ auto sink = CreateOutputStream();
+ auto schema = std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED,
+ {
+ schema::Int32("required", Repetition::REQUIRED),
+ schema::Int32("optional", Repetition::OPTIONAL),
+ schema::Int32("repeated", Repetition::REPEATED),
+ }));
+ // Write 11 levels at a time
+ constexpr int64_t batch_size = 11;
+ auto properties = WriterProperties::Builder()
+ .disable_dictionary()
+ ->data_page_version(ParquetDataPageVersion::V2)
+ ->write_batch_size(batch_size)
+ ->data_pagesize(1)
+ ->build();
+ auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+ auto rg_writer = file_writer->AppendRowGroup();
+
+ constexpr int32_t num_levels = 100;
+ const std::vector<int32_t> values(num_levels, 1024);
+ std::array<int16_t, num_levels> def_levels;
+ std::array<int16_t, num_levels> rep_levels;
+ for (int32_t i = 0; i < num_levels; i++) {
+ def_levels[i] = i % 2 == 0 ? 1 : 0;
+ rep_levels[i] = i % 2 == 0 ? 0 : 1;
+ }
+
+ auto required_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ required_writer->WriteBatch(num_levels, nullptr, nullptr, values.data());
+
+ // Write a null value at every other row.
+ auto optional_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ optional_writer->WriteBatch(num_levels, def_levels.data(), nullptr,
values.data());
+
+ // Each row has two repeated values.
+ auto repeated_writer =
static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+ repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+ values.data());
+ repeated_writer->WriteBatch(num_levels, def_levels.data(), rep_levels.data(),
+ values.data());
+
+ ASSERT_NO_THROW(file_writer->Close());
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ auto file_reader = ParquetFileReader::Open(
+ std::make_shared<::arrow::io::BufferReader>(buffer),
default_reader_properties());
+ auto metadata = file_reader->metadata();
+ ASSERT_EQ(1, metadata->num_row_groups());
+ auto row_group_reader = file_reader->RowGroup(0);
+
+ // Check if pages are changed on record boundaries.
+ constexpr int num_columns = 3;
+ const std::array<int64_t, num_columns> expected_num_pages = {10, 10, 19};
+ for (int i = 0; i < num_columns; ++i) {
+ auto page_reader = row_group_reader->GetColumnPageReader(i);
+ int64_t num_rows = 0;
+ int64_t num_pages = 0;
+ std::shared_ptr<Page> page;
+ while ((page = page_reader->NextPage()) != nullptr) {
+ auto data_page = std::static_pointer_cast<DataPageV2>(page);
+ if (i < 2) {
+ EXPECT_EQ(data_page->num_values(), data_page->num_rows());
+ } else {
+ // Make sure the repeated column has 2 values per row and that no row spans
+ // multiple pages.
+ EXPECT_EQ(data_page->num_values(), 2 * data_page->num_rows());
+ }
+ num_rows += data_page->num_rows();
+ num_pages++;
+ }
+ EXPECT_EQ(num_levels, num_rows);
+ EXPECT_EQ(expected_num_pages[i], num_pages);
+ }
+}
+
+TEST(TestColumnWriter, WriteDataPagesChangeOnRecordBoundariesLargeBatchSize) {
Review Comment:
Update this test to cover different repetition patterns.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]