This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8e09f8c5dd PARQUET-2201: [parquet-cpp] Add stress test for
RecordReader ReadRecords and SkipRecords. (#14879)
8e09f8c5dd is described below
commit 8e09f8c5dd8935eed830127b7d73aa034092ca4c
Author: Fatemah Panahi <[email protected]>
AuthorDate: Thu Feb 23 20:58:23 2023 -0800
PARQUET-2201: [parquet-cpp] Add stress test for RecordReader ReadRecords
and SkipRecords. (#14879)
This pull request adds a stress test for the RecordReader's ReadRecords and
SkipRecords covering required, optional, and repeated fields. The test creates
random pages, runs a random sequence of skip and read calls that consumes all of
the pages, and then compares the expected output with the actual output of the
record reader. It also makes ReadRecords and SkipRecords return 0 immediately
when called with num_records == 0, and adds tests for that case.
Lead-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: fatemehp <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
---
cpp/src/parquet/column_reader.cc | 3 +
cpp/src/parquet/column_reader_test.cc | 200 +++++++++++++++++++++++++++++++++-
cpp/src/parquet/test_util.h | 20 +++-
3 files changed, 218 insertions(+), 5 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 422016e176..f19079b902 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1360,6 +1360,7 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
}
int64_t ReadRecords(int64_t num_records) override {
+ if (num_records == 0) return 0;
// Delimit records, then read values at the end
int64_t records_read = 0;
@@ -1621,6 +1622,8 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
}
int64_t SkipRecords(int64_t num_records) override {
+ if (num_records == 0) return 0;
+
// Top level required field. Number of records equals to number of levels,
// and there is not read-ahead for levels.
if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) {
diff --git a/cpp/src/parquet/column_reader_test.cc
b/cpp/src/parquet/column_reader_test.cc
index 29414b6b4d..32fb09f21b 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -723,8 +723,12 @@ TEST_F(RecordReaderTest, BasicReadRepeatedField) {
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
+ // Test reading 0 records.
+ int64_t records_read = record_reader_->ReadRecords(/*num_records=*/0);
+ ASSERT_EQ(records_read, 0);
+
// Read [10], null
- int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+ records_read = record_reader_->ReadRecords(/*num_records=*/2);
ASSERT_EQ(records_read, 2);
CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9,
/*levels_position=*/2);
@@ -744,6 +748,11 @@ TEST_F(RecordReaderTest, BasicReadRepeatedField) {
record_reader_->Reset();
CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
/*levels_position=*/0);
+
+ // Test reading 0 records.
+ records_read = record_reader_->ReadRecords(/*num_records=*/0);
+ ASSERT_EQ(records_read, 0);
+
// Read the last null value and read past the end.
records_read = record_reader_->ReadRecords(/*num_records=*/3);
ASSERT_EQ(records_read, 1);
@@ -887,6 +896,12 @@ TEST_F(RecordReaderTest, SkipRepeated) {
auto pager = std::make_unique<MockPageReader>(pages);
record_reader_->SetPageReader(std::move(pager));
+ {
+ // Skip 0 records.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/0);
+ ASSERT_EQ(records_skipped, 0);
+ }
+
{
// This should skip the first null record.
int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/1);
@@ -915,6 +930,12 @@ TEST_F(RecordReaderTest, SkipRepeated) {
/*expected_reps=*/{0, 1, 1});
}
+ {
+ // Skip 0 records.
+ int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/0);
+ ASSERT_EQ(records_skipped, 0);
+ }
+
{
// Skip the null record and also skip [30, 30]
int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
@@ -1211,5 +1232,182 @@ TEST(RecordReaderByteArrayTest, SkipByteArray) {
}
}
+// Test random combination of ReadRecords and SkipRecords.
+//
+// Generates a random number of random pages for a single INT32 column with the
+// parameterized repetition, then consumes all of them with a random sequence of
+// ReadRecords/SkipRecords calls (num_records may be 0). After each call the
+// values and levels exposed by the reader are compared against the ones
+// computed directly from the generated data.
+class RecordReaderStressTest : public ::testing::TestWithParam<Repetition::type> {};
+
+TEST_P(RecordReaderStressTest, StressTest) {
+  internal::LevelInfo level_info;
+  // Define these boolean variables for improving readability below.
+  bool repeated = false, required = false;
+  if (GetParam() == Repetition::REQUIRED) {
+    level_info.def_level = 0;
+    level_info.rep_level = 0;
+    required = true;
+  } else if (GetParam() == Repetition::OPTIONAL) {
+    level_info.def_level = 1;
+    level_info.rep_level = 0;
+  } else {
+    level_info.def_level = 1;
+    level_info.rep_level = 1;
+    repeated = true;
+  }
+
+  NodePtr type = schema::Int32("b", GetParam());
+  const ColumnDescriptor descr(type, level_info.def_level, level_info.rep_level);
+
+  // Uses time(0) as the seed so a different case is exercised on every run.
+  // seed2 (for the page contents) is drawn from the seeded generator, so it
+  // differs from seed1 instead of being a second, almost always identical,
+  // time(0) reading.
+  const auto seed1 = static_cast<uint32_t>(time(0));
+  std::default_random_engine gen(seed1);
+  // Generate random number of pages with random number of levels per page.
+  std::uniform_int_distribution<int> page_dist(0, 2000);
+  const int num_pages = page_dist(gen);
+  const int levels_per_page = page_dist(gen);
+  const auto seed2 = static_cast<uint32_t>(gen());
+
+  std::vector<int32_t> values;
+  std::vector<int16_t> def_levels;
+  std::vector<int16_t> rep_levels;
+  std::vector<uint8_t> data_buffer;
+  std::vector<std::shared_ptr<Page>> pages;
+  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels, rep_levels,
+                       values, data_buffer, pages, Encoding::PLAIN, seed2);
+  std::unique_ptr<PageReader> pager = std::make_unique<test::MockPageReader>(pages);
+
+  // Set up the RecordReader.
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  // Figure out how many total records. For a repeated column every level with
+  // rep_level == 0 starts a new record. Otherwise every level is one record.
+  // Count those from the page geometry (MakePages' num_levels) instead of
+  // def_levels.size(): MakePages does not appear to fill def_levels when
+  // max_def_level == 0, which made the REQUIRED case read nothing at all.
+  int total_records = 0;
+  if (repeated) {
+    for (int16_t rep : rep_levels) {
+      if (rep == 0) {
+        ++total_records;
+      }
+    }
+  } else {
+    total_records = num_pages * levels_per_page;
+  }
+
+  // Generate a sequence of reads and skips that together consume all records.
+  // Each element is (is_skip, num_records): is_skip is true for SkipRecords
+  // and false for ReadRecords. num_records may be 0.
+  int records_left = total_records;
+  std::vector<std::pair<bool, int>> sequence;
+  while (records_left > 0) {
+    std::uniform_int_distribution<int> records_dist(0, records_left);
+    // Generate a number to decide if this is a skip or read.
+    const bool is_skip = records_dist(gen) < records_left / 2;
+    const int num_records = records_dist(gen);
+
+    sequence.emplace_back(is_skip, num_records);
+    records_left -= num_records;
+  }
+
+  // Printed with each failing ASSERT so the failing case can be reproduced.
+  const std::string seeds =
+      "seeds: " + std::to_string(seed1) + " " + std::to_string(seed2);
+
+  // levels_index and values_index walk the generated vectors holding the
+  // rep/def levels and values of all records. The reader is Reset before every
+  // read/skip, which is on par with how the record reader is used.
+  size_t levels_index = 0;
+  size_t values_index = 0;
+  for (const auto& [is_skip, num_records] : sequence) {
+    // Reset the reader before the next round of read/skip.
+    record_reader->Reset();
+
+    // Compute the expected output of this call from the generated data.
+    // A null slot in expected_values is marked with -1 and is not compared.
+    std::vector<int32_t> expected_values;
+    std::vector<int16_t> expected_def_levels;
+    std::vector<int16_t> expected_rep_levels;
+    bool inside_repeated_field = false;
+
+    int read_records = 0;
+    while (read_records < num_records || inside_repeated_field) {
+      if (!repeated || rep_levels[levels_index] == 0) {
+        ++read_records;
+      }
+
+      const bool has_value =
+          required || def_levels[levels_index] == level_info.def_level;
+
+      // Only a read produces output; a skip just advances the indices.
+      if (!is_skip) {
+        if (!required) {
+          expected_def_levels.push_back(def_levels[levels_index]);
+          if (!has_value) {
+            expected_values.push_back(-1);
+          }
+        }
+        if (repeated) {
+          expected_rep_levels.push_back(rep_levels[levels_index]);
+        }
+        if (has_value) {
+          expected_values.push_back(values[values_index]);
+        }
+      }
+
+      if (has_value) {
+        ++values_index;
+      }
+
+      // If the next level continues the current record, keep consuming levels
+      // until the whole record is done.
+      inside_repeated_field = repeated && levels_index + 1 < rep_levels.size() &&
+                              rep_levels[levels_index + 1] == 1;
+
+      ++levels_index;
+    }
+
+    // Perform the actual read/skip.
+    if (is_skip) {
+      const int64_t records_skipped = record_reader->SkipRecords(num_records);
+      ASSERT_EQ(records_skipped, num_records) << seeds;
+    } else {
+      const int64_t records_read = record_reader->ReadRecords(num_records);
+      ASSERT_EQ(records_read, num_records) << seeds;
+    }
+
+    if (required) {
+      ASSERT_EQ(record_reader->null_count(), 0) << seeds;
+    }
+    const auto read_values = reinterpret_cast<const int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + record_reader->values_written());
+    // Fail cleanly, rather than reading out of bounds below, if the reader
+    // produced fewer values than expected.
+    ASSERT_GE(read_vals.size(), expected_values.size()) << seeds;
+    for (size_t i = 0; i < expected_values.size(); ++i) {
+      if (expected_values[i] != -1) {
+        ASSERT_EQ(read_vals[i], expected_values[i]) << seeds;
+      }
+    }
+
+    if (!required) {
+      std::vector<int16_t> read_def_levels(
+          record_reader->def_levels(),
+          record_reader->def_levels() + record_reader->levels_position());
+      ASSERT_TRUE(vector_equal(read_def_levels, expected_def_levels)) << seeds;
+    }
+
+    if (repeated) {
+      std::vector<int16_t> read_rep_levels(
+          record_reader->rep_levels(),
+          record_reader->rep_levels() + record_reader->levels_position());
+      ASSERT_TRUE(vector_equal(read_rep_levels, expected_rep_levels)) << seeds;
+    }
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(Repetition_type, RecordReaderStressTest,
+                         ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL,
+                                           Repetition::REPEATED));
+
} // namespace test
} // namespace parquet
diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h
index 11a082bf1b..9bdbad8fb8 100644
--- a/cpp/src/parquet/test_util.h
+++ b/cpp/src/parquet/test_util.h
@@ -531,10 +531,10 @@ static inline int MakePages(const ColumnDescriptor* d,
int num_pages, int levels
std::vector<typename Type::c_type>& values,
std::vector<uint8_t>& buffer,
std::vector<std::shared_ptr<Page>>& pages,
- Encoding::type encoding = Encoding::PLAIN) {
+ Encoding::type encoding = Encoding::PLAIN,
+ uint32_t seed = 0) {
int num_levels = levels_per_page * num_pages;
int num_values = 0;
- uint32_t seed = 0;
int16_t zero = 0;
int16_t max_def_level = d->max_definition_level();
int16_t max_rep_level = d->max_repetition_level();
@@ -556,10 +556,22 @@ static inline int MakePages(const ColumnDescriptor* d,
int num_pages, int levels
} else {
num_values = num_levels;
}
- // Create repetition levels
+ // Create repitition levels
if (max_rep_level > 0) {
rep_levels.resize(num_levels);
- random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
+ // Using a different seed so that def_levels and rep_levels are different.
+ random_numbers(num_levels, seed + 789, zero, max_rep_level,
rep_levels.data());
+ // The generated levels are random. Force the very first page to start
with a new
+ // record.
+ rep_levels[0] = 0;
+ // For a null value, rep_levels and def_levels are both 0.
+ // If we have a repeated value right after this, it needs to start with
+ // rep_level = 0 to indicate a new record.
+ for (int i = 0; i < num_levels - 1; ++i) {
+ if (rep_levels[i] == 0 && def_levels[i] == 0) {
+ rep_levels[i + 1] = 0;
+ }
+ }
}
// Create values
values.resize(num_values);