emkornfield commented on code in PR #34054:
URL: https://github.com/apache/arrow/pull/34054#discussion_r1157983937
##########
cpp/src/parquet/page_index_test.cc:
##########
@@ -416,4 +416,420 @@ TEST(PageIndex,
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
-1);
}
+TEST(PageIndex, WriteOffsetIndex) {
+ /// Create offset index via the OffsetIndexBuilder interface.
+ auto builder = OffsetIndexBuilder::Make();
+ const size_t num_pages = 5;
+ const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+ const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+ const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000,
40000};
+ for (size_t i = 0; i < num_pages; ++i) {
+ builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+ }
+ const int64_t final_position = 4096;
+ builder->Finish(final_position);
+
+ std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+ /// 1st element is the offset index just built.
+ offset_indexes.emplace_back(builder->Build());
+ /// 2nd element is the offset index restored by serialize-then-deserialize
round trip.
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+
static_cast<uint32_t>(buffer->size()),
+ default_reader_properties()));
+
+ /// Verify the data of the offset index.
+ for (const auto& offset_index : offset_indexes) {
+ ASSERT_EQ(num_pages, offset_index->page_locations().size());
+ for (size_t i = 0; i < num_pages; ++i) {
+ const auto& page_location = offset_index->page_locations().at(i);
+ ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+ ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+ ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+ }
+ }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+ const std::vector<EncodedStatistics>&
page_stats,
+ BoundaryOrder::type boundary_order, bool
has_null_counts) {
+ auto descr = std::make_unique<ColumnDescriptor>(node,
/*max_definition_level=*/1, 0);
+
+ auto builder = ColumnIndexBuilder::Make(descr.get());
+ for (const auto& stats : page_stats) {
+ builder->AddPage(stats);
+ }
+ ASSERT_NO_THROW(builder->Finish());
+
+ std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+ /// 1st element is the column index just built.
+ column_indexes.emplace_back(builder->Build());
+ /// 2nd element is the column index restored by serialize-then-deserialize
round trip.
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+
static_cast<uint32_t>(buffer->size()),
+ default_reader_properties()));
+
+ /// Verify the data of the column index.
+ for (const auto& column_index : column_indexes) {
+ ASSERT_EQ(boundary_order, column_index->boundary_order());
+ ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+ const size_t num_pages = column_index->null_pages().size();
+ for (size_t i = 0; i < num_pages; ++i) {
+ ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+ ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+ ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+ if (has_null_counts) {
+ ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+ }
+ }
+ }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // Integer values in the ascending order.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+ page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+ page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+ auto encode = [=](int64_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+ };
+
+ // Integer values in the descending order.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+ page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+ page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+ TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats,
BoundaryOrder::Descending,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+ auto encode = [=](float value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+ };
+
+ // Float values with no specific order.
+ std::vector<EncodedStatistics> page_stats(3);
+
page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+
page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+
page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+ TestWriteTypedColumnIndex(schema::Float("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteDoubleColumnIndex) {
+ auto encode = [=](double value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+ };
+
+ // Double values with no specific order and without null count.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4));
+ page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5));
+ page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6));
+
+ TestWriteTypedColumnIndex(schema::Double("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteByteArrayColumnIndex) {
+ // Byte array values with identical min/max.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min("bar").set_max("foo");
+ page_stats.at(1).set_min("bar").set_max("foo");
+ page_stats.at(2).set_min("bar").set_max("foo");
+
+ TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteFLBAColumnIndex) {
+ // FLBA values in the ascending order with some null pages
+ std::vector<EncodedStatistics> page_stats(5);
+ page_stats.at(0).set_min("abc").set_max("ABC");
+ page_stats.at(1).all_null_value = true;
+ page_stats.at(2).set_min("foo").set_max("FOO");
+ page_stats.at(3).all_null_value = true;
+ page_stats.at(4).set_min("xyz").set_max("XYZ");
+
+ auto node =
+ schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY,
+ ConvertedType::NONE, /*length=*/3);
+ TestWriteTypedColumnIndex(std::move(node), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithAllNullPages) {
+ // All values are null.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(100).all_null_value = true;
+ page_stats.at(1).set_null_count(100).all_null_value = true;
+ page_stats.at(2).set_null_count(100).all_null_value = true;
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // Some pages do not provide null_count
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0);
+ page_stats.at(1).set_min(encode(1)).set_max(encode(3));
+ page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0);
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // 2nd page does not set anything
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1)).set_max(encode(2));
+ page_stats.at(2).set_min(encode(3)).set_max(encode(4));
+
+ ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
+ auto builder = ColumnIndexBuilder::Make(&descr);
+ for (const auto& stats : page_stats) {
+ builder->AddPage(stats);
+ }
+ ASSERT_NO_THROW(builder->Finish());
+ ASSERT_EQ(nullptr, builder->Build());
+
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ EXPECT_EQ(0, buffer->size());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
+ schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
+ schema::NodePtr root = schema::GroupNode::Make("schema",
Repetition::REPEATED, fields);
+ SchemaDescriptor schema;
+ schema.Init(root);
+
+ auto builder = PageIndexBuilder::Make(&schema);
+
+ // AppendRowGroup() is not called and expect throw.
+ ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException);
+ ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException);
+
+ // Finish the builder without calling AppendRowGroup().
+ ASSERT_NO_THROW(builder->Finish());
+
+ // Verify WriteTo does not write anything.
+ auto sink = CreateOutputStream();
+ PageIndexLocation location;
+ builder->WriteTo(sink.get(), &location);
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ ASSERT_EQ(0, buffer->size());
+ ASSERT_TRUE(location.column_index_location.empty());
+ ASSERT_TRUE(location.offset_index_location.empty());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithSingleRowGroup) {
+ schema::NodePtr root = schema::GroupNode::Make(
+ "schema", Repetition::REPEATED,
+ {schema::ByteArray("c1"), schema::ByteArray("c2"),
schema::ByteArray("c3")});
+ SchemaDescriptor schema;
+ schema.Init(root);
+
+ // Prepare page stats and page locations.
+ const std::vector<EncodedStatistics> page_stats = {
+ EncodedStatistics().set_null_count(0).set_min("a").set_max("b"),
+ EncodedStatistics().set_null_count(0).set_min("A").set_max("B")};
+ const std::vector<PageLocation> page_locations = {
+ {/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}};
+ const int64_t final_position = 200;
+
+ // Create builder and add pages of single row group.
+ // Note that the 3rd column does not add any pages and its page index is
disabled.
+ auto builder = PageIndexBuilder::Make(&schema);
+ ASSERT_NO_THROW(builder->AppendRowGroup());
+ for (int i = 0; i < 2; ++i) {
+
ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->AddPage(page_stats.at(i)));
+ ASSERT_NO_THROW(builder->GetColumnIndexBuilder(i)->Finish());
+
ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->AddPage(page_locations.at(i)));
+ ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(i)->Finish(final_position));
+ }
+ ASSERT_NO_THROW(builder->Finish());
+
+ // Verify WriteTo only serializes page index of first two columns.
+ auto sink = CreateOutputStream();
+ PageIndexLocation location;
+ builder->WriteTo(sink.get(), &location);
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+
+ const size_t num_row_groups = 1;
+ const size_t num_columns = 3;
+ ASSERT_EQ(num_row_groups, location.column_index_location.size());
+ ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+ auto column_index_locations = location.column_index_location[0];
+ auto offset_index_locations = location.offset_index_location[0];
+ ASSERT_EQ(num_columns, column_index_locations.size());
+ ASSERT_EQ(num_columns, offset_index_locations.size());
+
+ auto properties = default_reader_properties();
+ for (int i = 0; i < 3; i++) {
+ if (i < 2) {
+ ASSERT_TRUE(column_index_locations[i].has_value());
+ ASSERT_TRUE(offset_index_locations[i].has_value());
+ auto ci_location = column_index_locations[i].value();
+ auto oi_location = offset_index_locations[i].value();
+
+ auto column_index =
+ ColumnIndex::Make(*schema.Column(i), buffer->data() +
ci_location.offset,
+ static_cast<uint32_t>(ci_location.length),
properties);
+ const size_t num_pages = 1;
+ ASSERT_EQ(num_pages, column_index->null_pages().size());
+ ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[0]);
+ ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[0]);
+ ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[0]);
+ ASSERT_TRUE(column_index->has_null_counts());
+ ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[0]);
+
+ auto offset_index =
+ OffsetIndex::Make(buffer->data() + oi_location.offset,
+ static_cast<uint32_t>(oi_location.length),
properties);
+ ASSERT_EQ(num_pages, offset_index->page_locations().size());
+ ASSERT_EQ(page_locations[i].offset + final_position,
+ offset_index->page_locations()[0].offset);
+ ASSERT_EQ(page_locations[i].compressed_page_size,
+ offset_index->page_locations()[0].compressed_page_size);
+ ASSERT_EQ(page_locations[i].first_row_index,
+ offset_index->page_locations()[0].first_row_index);
+ } else {
+ ASSERT_FALSE(column_index_locations[i].has_value());
+ ASSERT_FALSE(offset_index_locations[i].has_value());
+ }
+ }
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithTwoRowGroups) {
+ schema::NodePtr root = schema::GroupNode::Make(
+ "schema", Repetition::REPEATED, {schema::ByteArray("c1"),
schema::ByteArray("c2")});
+ SchemaDescriptor schema;
+ schema.Init(root);
+
+ // Prepare page stats and page locations for two row groups.
+ const std::vector<std::vector<EncodedStatistics>> page_stats = {
+ /* 1st row group */
+ {EncodedStatistics().set_min("a").set_max("b"),
+ EncodedStatistics().set_null_count(0).set_min("A").set_max("B")},
+ /* 2nd row group */
+ {EncodedStatistics() /* corrupted stats */,
+ EncodedStatistics().set_null_count(0).set_min("bar").set_max("foo")}};
+ const std::vector<std::vector<PageLocation>> page_locations = {
+ /* 1st row group */
+ {{/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}},
+ /* 2nd row group */
+ {{/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}}};
+ const std::vector<int64_t> final_positions = {1024, 2048};
+
+ // Create builder and add pages of two row groups.
+ const size_t num_row_groups = 2;
+ const size_t num_columns = 2;
+ const size_t num_pages = 1;
+ auto builder = PageIndexBuilder::Make(&schema);
+ for (size_t rg = 0; rg < num_row_groups; ++rg) {
+ ASSERT_NO_THROW(builder->AppendRowGroup());
+ for (int c = 0; c < static_cast<int>(num_columns); ++c) {
+
ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->AddPage(page_stats[rg][c]));
+ ASSERT_NO_THROW(builder->GetColumnIndexBuilder(c)->Finish());
+
ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->AddPage(page_locations[rg][c]));
+
ASSERT_NO_THROW(builder->GetOffsetIndexBuilder(c)->Finish(final_positions[rg]));
+ }
+ }
+ ASSERT_NO_THROW(builder->Finish());
+
+ // Verify WriteTo only serializes valid page index.
+ auto sink = CreateOutputStream();
+ PageIndexLocation location;
+ builder->WriteTo(sink.get(), &location);
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ ASSERT_EQ(num_row_groups, location.column_index_location.size());
+ ASSERT_EQ(num_row_groups, location.offset_index_location.size());
+
+ // Verify data of deserialized page index.
+ auto properties = default_reader_properties();
+ for (size_t rg = 0; rg < num_row_groups; ++rg) {
Review Comment:
Would unwinding the loops and adding appropriate helper methods make this
significantly more verbose?
Having logic in tests, at least for me, makes them much more difficult to
read (sometimes it is warranted, but sometimes it is cleaner to try to make
things less complex).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]