wgtmac commented on code in PR #17877:
URL: https://github.com/apache/arrow/pull/17877#discussion_r1118212149


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -340,13 +340,13 @@ class SerializedPageReader : public PageReader {
 void SerializedPageReader::InitDecryption() {
   // Prepare the AAD for quick update later.
   if (crypto_ctx_.data_decryptor != nullptr) {
-    DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());
+    ARROW_DCHECK(!crypto_ctx_.data_decryptor->file_aad().empty());

Review Comment:
   Just curious: why move from `DCHECK` to `ARROW_DCHECK`? They are actually
the same thing.



##########
cpp/src/parquet/column_reader_benchmark.cc:
##########
@@ -56,19 +58,36 @@ class BenchmarkHelper {
     }
   }
 
-  Int32Reader* ResetReader() {
+  Int32Reader* ResetColumnReader() {
     std::unique_ptr<PageReader> pager;
     pager.reset(new test::MockPageReader(pages_));
     column_reader_ = ColumnReader::Make(descr_.get(), std::move(pager));
     return static_cast<Int32Reader*>(column_reader_.get());
   }
 
+  RecordReader* ResetRecordReader(bool read_dense_for_nullable) {
+    std::unique_ptr<PageReader> pager;
+    pager.reset(new test::MockPageReader(pages_));
+    internal::LevelInfo level_info;
+    level_info.def_level = descr_->max_definition_level();
+    level_info.rep_level = descr_->max_repetition_level();
+    record_reader_ = internal::RecordReader::Make(
+        descr_.get(), level_info, ::arrow::default_memory_pool(),
+        /*read_dictionary=*/false,
+        /*read_dense_for_nullable=*/read_dense_for_nullable);

Review Comment:
   nit: the comment is redundant as the argument is self-descriptive



##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -703,9 +710,187 @@ class RecordReaderTest : public ::testing::Test {
   Repetition::type repetition_type_;
 };
 
-// Tests reading a repeated field using the RecordReader.
-TEST_F(RecordReaderTest, BasicReadRepeatedField) {
-  Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED);
+// Tests reading a required field. The expected results are the same for
+// reading dense and spaced.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadRequired) {
+  Init(/*max_def_level=*/0, /*max_rep_level=*/0, Repetition::REQUIRED,
+       /*repeated_ancestor_def_level=*/0);
+
+  // Records look like: {10, 20, 20, 30, 30, 30}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {};
+  std::vector<int16_t> rep_levels = {};
+
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      descr_.get(), values, /*num_values=*/static_cast<int>(def_levels.size()),
+      Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
+      level_info_.rep_level);
+  pages.push_back(std::move(page));
+  auto pager = std::make_unique<MockPageReader>(pages);
+  record_reader_->SetPageReader(std::move(pager));
+
+  // Read [10]
+  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+  ASSERT_EQ(records_read, 1);
+  CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+  CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{},
+                  /*expected_reps=*/{});
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+
+  // Read 20, 20, 30, 30, 30
+  records_read = record_reader_->ReadRecords(/*num_records=*/10);
+  ASSERT_EQ(records_read, 5);
+  CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+  CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+                  /*expected_defs=*/{},
+                  /*expected_reps=*/{});
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+}
+
+// Tests reading an optional field.
+// Use a max definition field > 1 to test both cases where parent is present or
+// parent is missing.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadOptional) {
+  Init(/*max_def_level=*/2, /*max_rep_level=*/0, Repetition::OPTIONAL,
+       /*repeated_ancestor_def_level=*/0);
+
+  // Records look like: {10, null, 20, 20, null, 30, 30, 30, null}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {2, 0, 2, 2, 1, 2, 2, 2, 0};
+
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      descr_.get(), values, /*num_values=*/static_cast<int>(def_levels.size()),
+      Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info_.def_level, /*rep_levels=*/{},
+      level_info_.rep_level);
+  pages.push_back(std::move(page));
+  auto pager = std::make_unique<MockPageReader>(pages);
+  record_reader_->SetPageReader(std::move(pager));
+
+  // Read 10, null
+  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/2);
+  ASSERT_EQ(records_read, 2);
+  if (GetParam() == /*read_dense_for_nullable=*/true) {
+    CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/9,
+               /*levels_position=*/2);
+    CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{2, 0},
+                    /*expected_reps=*/{});
+  } else {
+    CheckState(/*values_written=*/2, /*null_count=*/1, /*levels_written=*/9,
+               /*levels_position=*/2);
+    CheckReadValues(/*expected_values=*/{10, kNullValue}, 
/*expected_defs=*/{2, 0},
+                    /*expected_reps=*/{});
+  }
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/7,
+             /*levels_position=*/0);
+
+  // Read 20, 20, null (parent present), 30, 30, 30
+  records_read = record_reader_->ReadRecords(/*num_records=*/6);
+  ASSERT_EQ(records_read, 6);
+  if (GetParam() == /*read_dense_for_nullable=*/true) {
+    CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/7,
+               /*levels_position=*/6);
+    CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+                    /*expected_defs=*/{2, 2, 1, 2, 2, 2},
+                    /*expected_reps=*/{});
+  } else {
+    CheckState(/*values_written=*/6, /*null_count=*/1, /*levels_written=*/7,
+               /*levels_position=*/6);
+    CheckReadValues(/*expected_values=*/{20, 20, kNullValue, 30, 30, 30},
+                    /*expected_defs=*/{2, 2, 1, 2, 2, 2},
+                    /*expected_reps=*/{});
+  }
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+             /*levels_position=*/0);
+
+  // Read the last null value and read past the end.
+  records_read = record_reader_->ReadRecords(/*num_records=*/3);
+  ASSERT_EQ(records_read, 1);
+  if (GetParam() == /*read_dense_for_nullable=*/true) {
+    CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/1,
+               /*levels_position=*/1);
+    CheckReadValues(/*expected_values=*/{},
+                    /*expected_defs=*/{0},
+                    /*expected_reps=*/{});
+  } else {
+    CheckState(/*values_written=*/1, /*null_count=*/1, /*levels_written=*/1,
+               /*levels_position=*/1);
+    CheckReadValues(/*expected_values=*/{kNullValue},
+                    /*expected_defs=*/{0},
+                    /*expected_reps=*/{});
+  }
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+}
+
+// Tests reading a required repeated field. The results are the same for 
reading
+// dense or spaced.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadRequiredRepeated) {
+  // Set repeated_ancestor_def_level = max_def_level so that the repeated field
+  // is not nullable.
+  Init(/*max_def_level=*/1, /*max_rep_level=*/1, Repetition::REPEATED,
+       /*repeated_ancestor_def_level=*/1);
+
+  // Records look like: {[10], [20, 20], [30, 30, 30]}
+  std::vector<std::shared_ptr<Page>> pages;
+  std::vector<int32_t> values = {10, 20, 20, 30, 30, 30};
+  std::vector<int16_t> def_levels = {1, 1, 1, 1, 1, 1};
+  std::vector<int16_t> rep_levels = {0, 0, 1, 0, 1, 1};
+
+  std::shared_ptr<DataPageV1> page = MakeDataPage<Int32Type>(
+      descr_.get(), values, /*num_values=*/static_cast<int>(def_levels.size()),
+      Encoding::PLAIN,
+      /*indices=*/{},
+      /*indices_size=*/0, def_levels, level_info_.def_level, rep_levels,
+      level_info_.rep_level);
+  pages.push_back(std::move(page));
+  auto pager = std::make_unique<MockPageReader>(pages);
+  record_reader_->SetPageReader(std::move(pager));
+
+  // Read [10]
+  int64_t records_read = record_reader_->ReadRecords(/*num_records=*/1);
+  ASSERT_EQ(records_read, 1);
+  CheckState(/*values_written=*/1, /*null_count=*/0, /*levels_written=*/6,
+             /*levels_position=*/1);
+  CheckReadValues(/*expected_values=*/{10}, /*expected_defs=*/{1},
+                  /*expected_reps=*/{0});
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/5,
+             /*levels_position=*/0);
+
+  // Read [20, 20], [30, 30, 30]
+  records_read = record_reader_->ReadRecords(/*num_records=*/3);
+  ASSERT_EQ(records_read, 2);
+  CheckState(/*values_written=*/5, /*null_count=*/0, /*levels_written=*/5,
+             /*levels_position=*/5);
+  CheckReadValues(/*expected_values=*/{20, 20, 30, 30, 30},
+                  /*expected_defs=*/{1, 1, 1, 1, 1},
+                  /*expected_reps=*/{0, 1, 0, 1, 1});
+  record_reader_->Reset();
+  CheckState(/*values_written=*/0, /*null_count=*/0, /*levels_written=*/0,
+             /*levels_position=*/0);
+}
+
+// Tests reading a nullable repeated field.
+TEST_P(RecordReaderPrimitiveTypeTest, ReadNullableRepeated) {

Review Comment:
   Could you please add a test case for `empty list` and `list of null`?
   For example:
   - {[10], [] , [20, 20], [], [30, 30, 30]}
   - {[10], [null], [20, 20], [30, null, 30], [null, null]}
   
   So we can make sure cases for `max_def_level > 1` are well-covered.



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1803,7 +1805,104 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
     CheckNumberDecoded(num_decoded, values_to_read);
   }
 
-  // Return number of logical records read
+  // Reads repeated records and returns number of records read. Fills in
+  // values_to_read and null_count.
+  int64_t ReadRepeatedRecords(int64_t num_records, int64_t* values_to_read,
+                              int64_t* null_count) {
+    const int64_t start_levels_position = levels_position_;
+    // Note that repeated records may be required or nullable. If they have
+    // an optional parent in the path, they will be nullable, otherwise,
+    // they are required. We use leaf_info_->HasNullableValues() that looks
+    // at repeated_ancestor_def_level to determine if it is required or
+    // nullable. Even if they are required, we may have to read ahead and
+    // delimit the records to get the right number of values and they will
+    // have associated levels.
+    int64_t records_read = DelimitRecords(num_records, values_to_read);
+    if (!nullable_values()) {

Review Comment:
   Lines 1850 to 1857 are doing the same thing; should we merge them into a
single branch?



##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -644,20 +644,27 @@ TEST_F(TestPrimitiveReader, 
TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
-class RecordReaderTest : public ::testing::Test {
+class RecordReaderPrimitiveTypeTest : public ::testing::TestWithParam<bool> {

Review Comment:
   This may provide better readability. Otherwise one has to go all the way to
the end to understand what it means.



##########
cpp/src/parquet/column_reader_benchmark.cc:
##########
@@ -123,35 +142,82 @@ static void 
ColumnReaderReadBatchInt32(::benchmark::State& state) {
   state.SetBytesProcessed(state.iterations() * helper.total_size());
 }
 
+// Benchmarks ReadRecords for RecordReader with the following parameters in 
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+// - read_dense_for_nullable: sets reading dense or spaced.
+static void RecordReaderReadRecords(::benchmark::State& state) {
+  const auto repetition = static_cast<Repetition::type>(state.range(0));
+  const auto batch_size = static_cast<int64_t>(state.range(1));
+  const bool read_dense_for_nullable = state.range(2);
+
+  BenchmarkHelper helper(repetition, /*num_pages=*/16, 
/*levels_per_page=*/80000);
+
+  // Vectors to read the values into.
+  for (auto _ : state) {
+    state.PauseTiming();
+    RecordReader* reader = helper.ResetRecordReader(read_dense_for_nullable);
+    int64_t records_read = -1;
+    state.ResumeTiming();
+    while (records_read != 0) {
+      DoNotOptimize(records_read = reader->ReadRecords(batch_size));
+      reader->Reset();
+    }
+  }
+
+  state.SetBytesProcessed(state.iterations() * helper.total_size());
+}
+
+// Benchmarks SkipRecords for RecordReader with the following parameters in 
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+static void RecordReaderSkipRecords(::benchmark::State& state) {
+  const auto repetition = static_cast<Repetition::type>(state.range(0));
+  const auto batch_size = static_cast<int64_t>(state.range(1));
+
+  BenchmarkHelper helper(repetition, /*num_pages=*/16, 
/*levels_per_page=*/80000);
+
+  // Vectors to read the values into.
+  for (auto _ : state) {
+    state.PauseTiming();
+    RecordReader* reader = 
helper.ResetRecordReader(/*read_dense_for_nullable=*/true);

Review Comment:
   Does the value of `read_dense_for_nullable` really matter here? Or should
we set it to false?



##########
cpp/src/parquet/column_reader_test.cc:
##########
@@ -644,20 +644,27 @@ TEST_F(TestPrimitiveReader, 
TestNonDictionaryEncodedPagesWithExposeEncoding) {
   pages_.clear();
 }
 
-class RecordReaderTest : public ::testing::Test {
+class RecordReaderPrimitiveTypeTest : public ::testing::TestWithParam<bool> {

Review Comment:
   ```suggestion
   using ReadDenseForNullable = bool;
   class RecordReaderPrimitiveTypeTest : public 
::testing::TestWithParam<ReadDenseForNullable> {
   ```



##########
cpp/src/parquet/column_reader_benchmark.cc:
##########
@@ -123,35 +142,82 @@ static void 
ColumnReaderReadBatchInt32(::benchmark::State& state) {
   state.SetBytesProcessed(state.iterations() * helper.total_size());
 }
 
+// Benchmarks ReadRecords for RecordReader with the following parameters in 
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+// - read_dense_for_nullable: sets reading dense or spaced.
+static void RecordReaderReadRecords(::benchmark::State& state) {
+  const auto repetition = static_cast<Repetition::type>(state.range(0));
+  const auto batch_size = static_cast<int64_t>(state.range(1));
+  const bool read_dense_for_nullable = state.range(2);
+
+  BenchmarkHelper helper(repetition, /*num_pages=*/16, 
/*levels_per_page=*/80000);
+
+  // Vectors to read the values into.
+  for (auto _ : state) {
+    state.PauseTiming();
+    RecordReader* reader = helper.ResetRecordReader(read_dense_for_nullable);
+    int64_t records_read = -1;
+    state.ResumeTiming();
+    while (records_read != 0) {
+      DoNotOptimize(records_read = reader->ReadRecords(batch_size));
+      reader->Reset();
+    }
+  }
+
+  state.SetBytesProcessed(state.iterations() * helper.total_size());
+}
+
+// Benchmarks SkipRecords for RecordReader with the following parameters in 
order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read at each call.
+static void RecordReaderSkipRecords(::benchmark::State& state) {
+  const auto repetition = static_cast<Repetition::type>(state.range(0));
+  const auto batch_size = static_cast<int64_t>(state.range(1));
+
+  BenchmarkHelper helper(repetition, /*num_pages=*/16, 
/*levels_per_page=*/80000);
+
+  // Vectors to read the values into.
+  for (auto _ : state) {
+    state.PauseTiming();
+    RecordReader* reader = 
helper.ResetRecordReader(/*read_dense_for_nullable=*/true);
+    int64_t records_skipped = -1;
+    state.ResumeTiming();
+    while (records_skipped != 0) {
+      DoNotOptimize(records_skipped = reader->SkipRecords(batch_size));
+      reader->Reset();
+    }
+  }
+
+  state.SetBytesProcessed(state.iterations() * helper.total_size());
+}
+
 BENCHMARK(ColumnReaderSkipInt32)
     ->ArgNames({"Repetition", "BatchSize"})
-    ->Args({0, 100})
     ->Args({0, 1000})
-    ->Args({0, 10000})
-    ->Args({0, 100000})
-    ->Args({1, 100})
     ->Args({1, 1000})
-    ->Args({1, 10000})
-    ->Args({1, 100000})
-    ->Args({2, 100})
-    ->Args({2, 1000})
-    ->Args({2, 10000})
-    ->Args({2, 100000});
+    ->Args({2, 1000});
 
 BENCHMARK(ColumnReaderReadBatchInt32)
     ->ArgNames({"Repetition", "BatchSize"})
-    ->Args({0, 100})
     ->Args({0, 1000})
-    ->Args({0, 10000})
-    ->Args({0, 100000})
-    ->Args({1, 100})
     ->Args({1, 1000})
-    ->Args({1, 10000})
-    ->Args({1, 100000})
-    ->Args({2, 100})
-    ->Args({2, 1000})
-    ->Args({2, 10000})
-    ->Args({2, 100000});
+    ->Args({2, 1000});
+
+BENCHMARK(RecordReaderSkipRecords)

Review Comment:
   Why not keep different batch sizes as before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to