fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r988441524


##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -26,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/api.h"

Review Comment:
   Done.



##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -572,5 +576,615 @@ TEST_F(TestPrimitiveReader, 
TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
+// Tests reading a repeated field using the RecordReader.
+TEST(RecordReaderTest, BasicReadRepeatedField) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like: {[10], [20, 20], [30, 30, 30]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(def_levels.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 3);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 6);
+  ASSERT_EQ(record_reader->levels_position(), 3);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+  std::vector<int16_t> read_defs(
+      record_reader->def_levels(),
+      record_reader->def_levels() + record_reader->levels_position());
+  std::vector<int16_t> read_reps(
+      record_reader->rep_levels(),
+      record_reader->rep_levels() + record_reader->levels_position());
+
+  ASSERT_THAT(read_vals, ElementsAre(10, 20, 20));
+  ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+  ASSERT_THAT(read_reps, ElementsAre(0, 0, 1));
+}
+
+// Test that we can skip a required top-level field.
+TEST(RecordReaderTest, SkipRequiredTopLevel) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 0;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, /*def_levels=*/{}, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/3);
+
+  ASSERT_EQ(records_skipped, 3);
+  ASSERT_EQ(record_reader->values_written(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 2);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+
+  // There are no null values to account for here.
+  ASSERT_THAT(read_vals, ElementsAre(30, 30));
+}
+
+// Skip an optional field. The data intentionally includes some null values.
+TEST(RecordReaderTest, SkipOptional) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, 10, 20, null, 30, 40, 50, 60}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 30, 40, 50, 60};
+  std::vector<int16_t> def_levels = {0, 1, 1, 0, 1, 1, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Skip {null, 10}
+    // This also tests when we start with a Skip.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    // Since we started with skipping, there was nothing in the buffer to
+    // consume for skipping.
+    ASSERT_EQ(record_reader->levels_written(), 0);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read 3 records: {20, null, 30}
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/3);
+
+    ASSERT_EQ(records_read, 3);
+    // One of these values is null. values_written() includes null values.
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    // When we read, we buffer some levels. Since there are only a few levels
+    // in our whole column, all of them are read.
+    // We skipped 2 of the levels above, so there are only 6 left in total to
+    // read, and we read 3 of them here.
+    ASSERT_EQ(record_reader->levels_written(), 6);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    // ReadRecords for optional fields uses ReadValuesSpaced, so there is a
+    // placeholder for null.
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1));
+  }
+
+  {
+    // Skip {40, 50}.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+  }
+
+  {
+    // Read to the end of the column. Read {60}
+    // This test checks that ReadAndThrowAwayValues works, since if it
+    // does not we would read the wrong values.
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_EQ(read_vals[3], 60);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1, 1));
+  }
+
+  // We have exhausted all the records.
+  ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/3), 0);
+  ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/3), 0);
+}
+
+// Test skipping for repeated fields.
+TEST(RecordReaderTest, SkipRepeated) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, [20, 20, 20], null, [30, 30], [40]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {20, 20, 20, 30, 30, 40};
+  std::vector<int16_t> def_levels = {0, 1, 1, 1, 0, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 1, 0, 0, 1, 0};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // This should skip the first null record.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_skipped, 1);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // For repeated fields, we need to read the levels to find the record
+    // boundaries and skip. So some levels are read, however, the skipped
+    // level should not be there after the skip. That's why levels_position()
+    // is 0.
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read [20, 20, 20]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Skip the null record and also skip [30, 30]
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // We remove the skipped levels from the buffer.
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Read [40]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20, 40));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1, 0));
+  }
+}
+
+// Test reading when one record spans multiple pages for a repeated field.
+TEST(RecordReaderTest, ReadPartialRecord) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::unique_ptr<PageReader> pager;
+
+  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info.def_level,
+        /*rep_levels=*/{0, 0, 1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 3: { ... 20, [30]} continues from previous page.

Review Comment:
   Done.



##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -572,5 +576,615 @@ TEST_F(TestPrimitiveReader, 
TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
+// Tests reading a repeated field using the RecordReader.
+TEST(RecordReaderTest, BasicReadRepeatedField) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like: {[10], [20, 20], [30, 30, 30]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(def_levels.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 3);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 6);
+  ASSERT_EQ(record_reader->levels_position(), 3);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+  std::vector<int16_t> read_defs(
+      record_reader->def_levels(),
+      record_reader->def_levels() + record_reader->levels_position());
+  std::vector<int16_t> read_reps(
+      record_reader->rep_levels(),
+      record_reader->rep_levels() + record_reader->levels_position());
+
+  ASSERT_THAT(read_vals, ElementsAre(10, 20, 20));
+  ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+  ASSERT_THAT(read_reps, ElementsAre(0, 0, 1));
+}
+
+// Test that we can skip a required top-level field.
+TEST(RecordReaderTest, SkipRequiredTopLevel) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 0;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, /*def_levels=*/{}, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/3);
+
+  ASSERT_EQ(records_skipped, 3);
+  ASSERT_EQ(record_reader->values_written(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 2);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + 
record_reader->values_written());
+
+  // There are no null values to account for here.
+  ASSERT_THAT(read_vals, ElementsAre(30, 30));
+}
+
+// Skip an optional field. The data intentionally includes some null values.
+TEST(RecordReaderTest, SkipOptional) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, 10, 20, null, 30, 40, 50, 60}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 30, 40, 50, 60};
+  std::vector<int16_t> def_levels = {0, 1, 1, 0, 1, 1, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Skip {null, 10}
+    // This also tests when we start with a Skip.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    // Since we started with skipping, there was nothing in the buffer to
+    // consume for skipping.
+    ASSERT_EQ(record_reader->levels_written(), 0);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read 3 records: {20, null, 30}
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/3);
+
+    ASSERT_EQ(records_read, 3);
+    // One of these values is null. values_written() includes null values.
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    // When we read, we buffer some levels. Since there are only a few levels
+    // in our whole column, all of them are read.
+    // We skipped 2 of the levels above, so there are only 6 left in total to
+    // read, and we read 3 of them here.
+    ASSERT_EQ(record_reader->levels_written(), 6);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    // ReadRecords for optional fields uses ReadValuesSpaced, so there is a
+    // placeholder for null.
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1));
+  }
+
+  {
+    // Skip {40, 50}.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+  }
+
+  {
+    // Read to the end of the column. Read {60}
+    // This test checks that ReadAndThrowAwayValues works, since if it
+    // does not we would read the wrong values.
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 1);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+
+    ASSERT_EQ(read_vals[0], 20);
+    // read_vals[1] is a space for null.
+    ASSERT_EQ(read_vals[2], 30);
+    ASSERT_EQ(read_vals[3], 60);
+    ASSERT_THAT(read_defs, ElementsAre(1, 0, 1, 1));
+  }
+
+  // We have exhausted all the records.
+  ASSERT_EQ(record_reader->ReadRecords(/*num_records=*/3), 0);
+  ASSERT_EQ(record_reader->SkipRecords(/*num_records=*/3), 0);
+}
+
+// Test skipping for repeated fields.
+TEST(RecordReaderTest, SkipRepeated) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  // Records look like {null, [20, 20, 20], null, [30, 30], [40]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {20, 20, 20, 30, 30, 40};
+  std::vector<int16_t> def_levels = {0, 1, 1, 1, 0, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 1, 0, 0, 1, 0};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/static_cast<int>(values.size()), 
Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // This should skip the first null record.
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_skipped, 1);
+    ASSERT_EQ(record_reader->values_written(), 0);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // For repeated fields, we need to read the levels to find the record
+    // boundaries and skip. So some levels are read, however, the skipped
+    // level should not be there after the skip. That's why levels_position()
+    // is 0.
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 0);
+  }
+
+  {
+    // Read [20, 20, 20]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 7);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Skip the null record and also skip [30, 30]
+    int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+    ASSERT_EQ(records_skipped, 2);
+    ASSERT_EQ(record_reader->values_written(), 3);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    // We remove the skipped levels from the buffer.
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 3);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1));
+  }
+
+  {
+    // Read [40]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+    ASSERT_EQ(record_reader->values_written(), 4);
+    ASSERT_EQ(record_reader->null_count(), 0);
+    ASSERT_EQ(record_reader->levels_written(), 4);
+    ASSERT_EQ(record_reader->levels_position(), 4);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+    std::vector<int16_t> read_defs(
+        record_reader->def_levels(),
+        record_reader->def_levels() + record_reader->levels_position());
+    std::vector<int16_t> read_reps(
+        record_reader->rep_levels(),
+        record_reader->rep_levels() + record_reader->levels_position());
+
+    ASSERT_THAT(read_vals, ElementsAre(20, 20, 20, 40));
+    ASSERT_THAT(read_defs, ElementsAre(1, 1, 1, 1));
+    ASSERT_THAT(read_reps, ElementsAre(0, 1, 1, 0));
+  }
+}
+
+// Test reading when one record spans multiple pages for a repeated field.
+TEST(RecordReaderTest, ReadPartialRecord) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, level_info.def_level, 
level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::unique_ptr<PageReader> pager;
+
+  // Page 1: {[10], [20, 20, 20 ... } continues to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{10, 20, 20, 20}, /*num_values=*/4, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1, 1, 1}, level_info.def_level,
+        /*rep_levels=*/{0, 0, 1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 2: {... 20, 20, ...} continues from previous page and to next page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 20}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 1}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  // Page 3: { ... 20, [30]} continues from previous page.
+  {
+    std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+        &descr, /*values=*/{20, 30}, /*num_values=*/2, Encoding::PLAIN,
+        /*indices=*/{},
+        /*indices_size=*/0, /*def_levels=*/{1, 1}, level_info.def_level,
+        /*rep_levels=*/{1, 0}, level_info.rep_level);
+    pages.push_back(std::move(page));
+  }
+
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  {
+    // Read [10]
+    int64_t records_read = record_reader->ReadRecords(/*num_records=*/1);
+
+    ASSERT_EQ(records_read, 1);
+
+    const auto read_values = reinterpret_cast<const 
int32_t*>(record_reader->values());
+    std::vector<int32_t> read_vals(read_values,
+                                   read_values + 
record_reader->values_written());
+

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to