This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e09f8c5dd PARQUET-2201: [parquet-cpp] Add stress test for RecordReader ReadRecords and SkipRecords. (#14879)
8e09f8c5dd is described below

commit 8e09f8c5dd8935eed830127b7d73aa034092ca4c
Author: Fatemah Panahi <[email protected]>
AuthorDate: Thu Feb 23 20:58:23 2023 -0800

    PARQUET-2201: [parquet-cpp] Add stress test for RecordReader ReadRecords and SkipRecords. (#14879)
    
    This pull request adds a stress test that exercises ReadRecords and SkipRecords for required, optional, and repeated fields. The test generates random pages, runs a random sequence of skip and read calls until all pages are consumed, and then compares the expected output with the actual output from the record reader.
    
    Lead-authored-by: Fatemah Panahi <[email protected]>
    Co-authored-by: Fatemah Panahi <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: fatemehp <[email protected]>
    Signed-off-by: Micah Kornfield <[email protected]>
---
 cpp/src/parquet/column_reader.cc      |   3 +
 cpp/src/parquet/column_reader_test.cc | 200 +++++++++++++++++++++++++++++++++-
 cpp/src/parquet/test_util.h           |  20 +++-
 3 files changed, 218 insertions(+), 5 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 422016e176..f19079b902 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1360,6 +1360,7 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
   }
 
   int64_t ReadRecords(int64_t num_records) override {
+    if (num_records == 0) return 0;
     // Delimit records, then read values at the end
     int64_t records_read = 0;
 
@@ -1621,6 +1622,8 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
   }
 
   int64_t SkipRecords(int64_t num_records) override {
+    if (num_records == 0) return 0;
+
     // Top level required field. Number of records equals to number of levels,
     // and there is not read-ahead for levels.
     if (this->max_rep_level_ == 0 && this->max_def_level_ == 0) {
diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc
index 29414b6b4d..32fb09f21b 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -723,8 +723,12 @@ TEST_F(RecordReaderTest, BasicReadRepeatedField) {
   auto pager = std::make_unique<MockPageReader>(pages);
   record_reader_->SetPageReader(std::move(pager));
 
+  // Test reading 0 records.
+  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/0);
+  ASSERT_EQ(records_read, 0);
+
   // Read [10], null
-  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+  records_read = record_reader_->ReadRecords(/*num_records=*/2);
   ASSERT_EQ(records_read, 2);
   CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9,
              /*levels_position=*/2);
@@ -744,6 +748,11 @@ TEST_F(RecordReaderTest, BasicReadRepeatedField) {
   record_reader_->Reset();
   CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
              /*levels_position=*/0);
+
+  // Test reading 0 records.
+  records_read = record_reader_->ReadRecords(/*num_records=*/0);
+  ASSERT_EQ(records_read, 0);
+
   // Read the last null value and read past the end.
   records_read = record_reader_->ReadRecords(/*num_records=*/3);
   ASSERT_EQ(records_read, 1);
@@ -887,6 +896,12 @@ TEST_F(RecordReaderTest, SkipRepeated) {
   auto pager = std::make_unique<MockPageReader>(pages);
   record_reader_->SetPageReader(std::move(pager));
 
+  {
+    // Skip 0 records.
+    int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/0);
+    ASSERT_EQ(records_skipped, 0);
+  }
+
   {
     // This should skip the first null record.
     int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/1);
@@ -915,6 +930,12 @@ TEST_F(RecordReaderTest, SkipRepeated) {
                     /*expected_reps=*/{0, 1, 1});
   }
 
+  {
+    // Skip 0 records.
+    int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/0);
+    ASSERT_EQ(records_skipped, 0);
+  }
+
   {
     // Skip the null record and also skip [30, 30]
     int64_t records_skipped = record_reader_->SkipRecords(/*num_records=*/2);
@@ -1211,5 +1232,182 @@ TEST(RecordReaderByteArrayTest, SkipByteArray) {
   }
 }
 
+// Test random combination of ReadRecords and SkipRecords.
+class RecordReaderStressTest : public ::testing::TestWithParam<Repetition::type> {};
+
+TEST_P(RecordReaderStressTest, StressTest) {
+  internal::LevelInfo level_info;
+  // Define these boolean variables for improving readability below.
+  bool repeated = false, required = false;
+  if (GetParam() == Repetition::REQUIRED) {
+    level_info.def_level = 0;
+    level_info.rep_level = 0;
+    required = true;
+  } else if (GetParam() == Repetition::OPTIONAL) {
+    level_info.def_level = 1;
+    level_info.rep_level = 0;
+  } else {
+    level_info.def_level = 1;
+    level_info.rep_level = 1;
+    repeated = true;
+  }
+
+  NodePtr type = schema::Int32("b", GetParam());
+  const ColumnDescriptor descr(type, level_info.def_level, level_info.rep_level);
+
+  auto seed1 = static_cast<uint32_t>(time(0));
+  std::default_random_engine gen(seed1);
+  // Generate random number of pages with random number of values per page.
+  std::uniform_int_distribution<int> d(0, 2000);
+  const int num_pages = d(gen);
+  const int levels_per_page = d(gen);
+  std::vector<int32_t> values;
+  std::vector<int16_t> def_levels;
+  std::vector<int16_t> rep_levels;
+  std::vector<uint8_t> data_buffer;
+  std::vector<std::shared_ptr<Page>> pages;
+  auto seed2 = static_cast<uint32_t>(time(0));
+  // Uses time(0) as seed so it would run a different test every time it is
+  // run.
+  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels, rep_levels, values,
+                       data_buffer, pages, Encoding::PLAIN, seed2);
+  std::unique_ptr<PageReader> pager;
+  pager.reset(new test::MockPageReader(pages));
+
+  // Set up the RecordReader.
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  // Figure out how many total records.
+  int total_records = 0;
+  if (repeated) {
+    for (int16_t rep : rep_levels) {
+      if (rep == 0) {
+        ++total_records;
+      }
+    }
+  } else {
+    total_records = static_cast<int>(def_levels.size());
+  }
+
+  // Generate a sequence of reads and skips.
+  int records_left = total_records;
+  // The first element of the pair is true for SkipRecords and false for ReadRecords.
+  // The second element indicates the number of records for reading or
+  // skipping.
+  std::vector<std::pair<bool, int>> sequence;
+  while (records_left > 0) {
+    std::uniform_int_distribution<int> d(0, records_left);
+    // Generate a number to decide if this is a skip or read.
+    bool is_skip = d(gen) < records_left / 2;
+    int num_records = d(gen);
+
+    sequence.emplace_back(is_skip, num_records);
+    records_left -= num_records;
+  }
+
+  // The levels_index and values_index are over the original vectors that have
+  // all the rep/def values for all the records. In the following loop, we will
+  // read/skip a number of records and Reset the reader after each iteration.
+  // This is on par with how the record reader is used.
+  size_t levels_index = 0;
+  size_t values_index = 0;
+  for (const auto& [is_skip, num_records] : sequence) {
+    // Reset the reader before the next round of read/skip.
+    record_reader->Reset();
+
+    // Prepare the expected result and do the SkipRecords and ReadRecords.
+    std::vector<int32_t> expected_values;
+    std::vector<int16_t> expected_def_levels;
+    std::vector<int16_t> expected_rep_levels;
+    bool inside_repeated_field = false;
+
+    int read_records = 0;
+    while (read_records < num_records || inside_repeated_field) {
+      if (!repeated || (repeated && rep_levels[levels_index] == 0)) {
+        ++read_records;
+      }
+
+      bool has_value =
+          required || (!required && def_levels[levels_index] == level_info.def_level);
+
+      // If we are not skipping, we need to update the expected values and
+      // rep/defs. If we are skipping, we just keep going.
+      if (!is_skip) {
+        if (!required) {
+          expected_def_levels.push_back(def_levels[levels_index]);
+          if (!has_value) {
+            expected_values.push_back(-1);
+          }
+        }
+        if (repeated) {
+          expected_rep_levels.push_back(rep_levels[levels_index]);
+        }
+        if (has_value) {
+          expected_values.push_back(values[values_index]);
+        }
+      }
+
+      if (has_value) {
+        ++values_index;
+      }
+
+      // If we are in the middle of a repeated field, we should keep going
+      // until we consume it all.
+      if (repeated && levels_index + 1 < rep_levels.size() &&
+          rep_levels[levels_index + 1] == 1) {
+        inside_repeated_field = true;
+      } else {
+        inside_repeated_field = false;
+      }
+
+      ++levels_index;
+    }
+
+    // Print out the seeds with each failing ASSERT to easily reproduce the bug.
+    std::string seeds = "seeds: " + std::to_string(seed1) + " " + std::to_string(seed2);
+
+    // Perform the actual read/skip.
+    if (is_skip) {
+      int64_t skipped_records = record_reader->SkipRecords(num_records);
+      ASSERT_EQ(skipped_records, num_records) << seeds;
+    } else {
+      int64_t read_records = record_reader->ReadRecords(num_records);
+      ASSERT_EQ(read_records, num_records) << seeds;
+    }
+
+    const auto read_values = reinterpret_cast<const int32_t*>(record_reader->values());
+    if (required) {
+      ASSERT_EQ(record_reader->null_count(), 0) << seeds;
+    }
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + record_reader->values_written());
+    for (size_t i = 0; i < expected_values.size(); ++i) {
+      if (expected_values[i] != -1) {
+        ASSERT_EQ(read_vals[i], expected_values[i]) << seeds;
+      }
+    }
+
+    if (!required) {
+      std::vector<int16_t> read_def_levels(
+          record_reader->def_levels(),
+          record_reader->def_levels() + record_reader->levels_position());
+      ASSERT_TRUE(vector_equal(read_def_levels, expected_def_levels)) << seeds;
+    }
+
+    if (repeated) {
+      std::vector<int16_t> read_rep_levels(
+          record_reader->rep_levels(),
+          record_reader->rep_levels() + record_reader->levels_position());
+      ASSERT_TRUE(vector_equal(read_rep_levels, expected_rep_levels)) << seeds;
+    }
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(Repetition_type, RecordReaderStressTest,
+                         ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL,
+                                           Repetition::REPEATED));
+
 }  // namespace test
 }  // namespace parquet
diff --git a/cpp/src/parquet/test_util.h b/cpp/src/parquet/test_util.h
index 11a082bf1b..9bdbad8fb8 100644
--- a/cpp/src/parquet/test_util.h
+++ b/cpp/src/parquet/test_util.h
@@ -531,10 +531,10 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels
                             std::vector<typename Type::c_type>& values,
                             std::vector<uint8_t>& buffer,
                             std::vector<std::shared_ptr<Page>>& pages,
-                            Encoding::type encoding = Encoding::PLAIN) {
+                            Encoding::type encoding = Encoding::PLAIN,
+                            uint32_t seed = 0) {
   int num_levels = levels_per_page * num_pages;
   int num_values = 0;
-  uint32_t seed = 0;
   int16_t zero = 0;
   int16_t max_def_level = d->max_definition_level();
   int16_t max_rep_level = d->max_repetition_level();
@@ -556,10 +556,22 @@ static inline int MakePages(const ColumnDescriptor* d, int num_pages, int levels
   } else {
     num_values = num_levels;
   }
-  // Create repetition levels
+  // Create repetition levels
   if (max_rep_level > 0) {
     rep_levels.resize(num_levels);
-    random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
+    // Using a different seed so that def_levels and rep_levels are different.
+    random_numbers(num_levels, seed + 789, zero, max_rep_level, rep_levels.data());
+    // The generated levels are random. Force the very first page to start with a new
+    // record.
+    rep_levels[0] = 0;
+    // For a null value, rep_levels and def_levels are both 0.
+    // If we have a repeated value right after this, it needs to start with
+    // rep_level = 0 to indicate a new record.
+    for (int i = 0; i < num_levels - 1; ++i) {
+      if (rep_levels[i] == 0 && def_levels[i] == 0) {
+        rep_levels[i + 1] = 0;
+      }
+    }
   }
   // Create values
   values.resize(num_values);

Reply via email to