fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r978130991


##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -573,5 +573,464 @@ TEST_F(TestPrimitiveReader, TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
+// Tests reading a repeated field using the RecordReader.
+// Reads 2 of the 3 records stored in a single page and checks that only the
+// values and levels belonging to those 2 records are exposed.
+TEST(RecordReaderTest, BasicReadRepeatedField) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 1;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level,
+                               level_info.rep_level);
+
+
+  // Records look like: {[10], [20, 20], [30, 30, 30]}
+  // A rep level of 0 starts a new record; a rep level of 1 continues the
+  // current record's list.
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/def_levels.size(), Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level, rep_levels,
+      level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  // Only the first 2 records ([10] and [20, 20]) are consumed: 3 values and
+  // 3 levels. levels_written() is 6 because the reader buffered all 6 levels
+  // of the page, ahead of the records it actually returned.
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 3);
+  ASSERT_EQ(record_reader->null_count(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 6);
+  ASSERT_EQ(record_reader->levels_position(), 3);
+
+  const auto read_values =
+      reinterpret_cast<const int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + record_reader->values_written());
+  std::vector<int16_t> read_defs(
+      record_reader->def_levels(),
+      record_reader->def_levels() + record_reader->levels_position());
+  std::vector<int16_t> read_reps(
+      record_reader->rep_levels(),
+      record_reader->rep_levels() + record_reader->levels_position());
+
+  ASSERT_TRUE(vector_equal(read_vals, {10, 20, 20}));
+  ASSERT_TRUE(vector_equal(read_defs, {1, 1, 1}));
+  ASSERT_TRUE(vector_equal(read_reps, {0, 0, 1}));
+}
+
+// Tests that records of a required top-level field can be skipped, and that a
+// subsequent read resumes at the first record that was not skipped.
+TEST(RecordReaderTest, SkipRequiredTopLevel) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 0;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, level_info.def_level,
+                               level_info.rep_level);
+
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/values.size(), Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, /*def_levels=*/{}, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  // Skips the records 10, 20 and 20.
+  int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/3);
+
+  // The page carries no def/rep levels for this required top-level column, so
+  // the level counters stay at 0. Skipping does not expose any values.
+  ASSERT_EQ(records_skipped, 3);
+  ASSERT_EQ(record_reader->values_written(), 0);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_read, 2);
+  ASSERT_EQ(record_reader->values_written(), 2);
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  const auto read_values =
+      reinterpret_cast<const int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + record_reader->values_written());
+
+  // The read starts right after the skipped records.
+  ASSERT_TRUE(vector_equal(read_vals, {30, 30}));
+}
+
+// Tests skipping records of an optional field. Some null values are intentionally included.
+TEST(RecordReaderTest, SkipOptional) {
+  internal::LevelInfo level_info;
+  level_info.def_level = 1;
+  level_info.rep_level = 0;
+
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, level_info.def_level,
+                               level_info.rep_level);
+
+  // Records look like {null, 20, 20, 20, null, 30, 30}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {20, 20, 20, 30, 30};
+  std::vector<int16_t> def_levels = {0, 1, 1, 1, 0, 1, 1};
+
+  std::unique_ptr<PageReader> pager;
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      &descr, values, /*num_values=*/values.size(), Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info.def_level,
+      /*rep_levels=*/{}, level_info.rep_level);
+  pages.push_back(std::move(page));
+  pager.reset(new test::MockPageReader(pages));
+
+  std::shared_ptr<internal::RecordReader> record_reader =
+      internal::RecordReader::Make(&descr, level_info);
+  record_reader->SetPageReader(std::move(pager));
+
+  // This also tests when we start with a Skip.
+  int64_t records_skipped = record_reader->SkipRecords(/*num_records=*/2);
+
+  ASSERT_EQ(records_skipped, 2);
+  ASSERT_EQ(record_reader->values_written(), 0);
+  // Since we started with skipping, there was nothing in the buffer to consume
+  // for skipping.
+  ASSERT_EQ(record_reader->levels_written(), 0);
+  ASSERT_EQ(record_reader->levels_position(), 0);
+
+  int64_t records_read = record_reader->ReadRecords(/*num_records=*/3);
+
+  ASSERT_EQ(records_read, 3);
+  // One of these values is null. values_written() includes null values.
+  ASSERT_EQ(record_reader->values_written(), 3);
+  // When we read, we buffer some levels. Since there are only a few levels
+  // in our whole column, all of them are read.
+  // We had skipped 2 of the levels above. So there is only 5 left in total to
+  // read, and we read 3 of them here.
+  ASSERT_EQ(record_reader->levels_written(), 5);
+  ASSERT_EQ(record_reader->levels_position(), 3);
+
+  const auto read_values =
+      reinterpret_cast<const int32_t*>(record_reader->values());
+  std::vector<int32_t> read_vals(read_values,
+                                 read_values + record_reader->values_written() 
-
+                                     record_reader->null_count());
+
+  ASSERT_TRUE(vector_equal(read_vals, {20, 20}));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to