This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 16eb02649f PARQUET-2423: [C++][Parquet] Avoid allocating buffer object 
in RecordReader's SkipRecords (#39818)
16eb02649f is described below

commit 16eb02649fde5c7ea9ba3607697a832f13d36cb3
Author: Jinpeng <[email protected]>
AuthorDate: Thu Feb 29 08:06:58 2024 -0800

    PARQUET-2423: [C++][Parquet] Avoid allocating buffer object in 
RecordReader's SkipRecords (#39818)
    
    ### Rationale for this change
    
    Currently each invocation of `SkipRecords()` for non-repeated fields would 
[create a new 
buffer](https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_reader.cc#L1482)
 object to hold a decoded validity bitmap. This allocation is unnecessary, as we are merely 
counting how many defined values are in the internal buffer, not reusing the 
validity bitmap.
    
    ### What changes are included in this PR?
    
    * Remove the temporary validity bitmap and instead count the definition 
levels equal to the max definition level. This improves performance when 
skipping non-repeated records.
    * Add a new microbenchmark for alternately reading and skipping from a 
RecordReader.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    
    Lead-authored-by: jp0317 <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Jinpeng <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/column_reader.cc           | 14 ++------
 cpp/src/parquet/column_reader_benchmark.cc | 51 ++++++++++++++++++++++++++++++
 cpp/src/parquet/column_reader_test.cc      | 27 ++++++++++++++++
 3 files changed, 81 insertions(+), 11 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ac4627d69c..3fb224154c 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1478,17 +1478,9 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
     // We skipped the levels by incrementing 'levels_position_'. For values
     // we do not have a buffer, so we need to read them and throw them away.
     // First we need to figure out how many present/not-null values there are.
-    std::shared_ptr<::arrow::ResizableBuffer> valid_bits;
-    valid_bits = AllocateBuffer(this->pool_);
-    
PARQUET_THROW_NOT_OK(valid_bits->Resize(bit_util::BytesForBits(skipped_records),
-                                            /*shrink_to_fit=*/true));
-    ValidityBitmapInputOutput validity_io;
-    validity_io.values_read_upper_bound = skipped_records;
-    validity_io.valid_bits = valid_bits->mutable_data();
-    validity_io.valid_bits_offset = 0;
-    DefLevelsToBitmap(def_levels() + start_levels_position, skipped_records,
-                      this->leaf_info_, &validity_io);
-    int64_t values_to_read = validity_io.values_read - validity_io.null_count;
+    int64_t values_to_read =
+        std::count(def_levels() + start_levels_position, def_levels() + 
levels_position_,
+                   this->max_def_level_);
 
     // Now that we have figured out number of values to read, we do not need
     // these levels anymore. We will remove these values from the buffer.
diff --git a/cpp/src/parquet/column_reader_benchmark.cc 
b/cpp/src/parquet/column_reader_benchmark.cc
index 61fe397cf1..93ab2dfa8c 100644
--- a/cpp/src/parquet/column_reader_benchmark.cc
+++ b/cpp/src/parquet/column_reader_benchmark.cc
@@ -56,6 +56,7 @@ class BenchmarkHelper {
     for (const auto& page : pages_) {
       total_size_ += page->size();
     }
+    total_levels_ = static_cast<int64_t>(num_pages) * levels_per_page;
   }
 
   Int32Reader* ResetColumnReader() {
@@ -80,6 +81,8 @@ class BenchmarkHelper {
 
   int64_t total_size() const { return total_size_; }
 
+  int64_t total_levels() const { return total_levels_; }
+
  private:
   std::vector<std::shared_ptr<Page>> pages_;
   std::unique_ptr<ColumnDescriptor> descr_;
@@ -88,6 +91,7 @@ class BenchmarkHelper {
   // Reader for record reader benchmarks.
   std::shared_ptr<RecordReader> record_reader_;
   int64_t total_size_ = 0;
+  int64_t total_levels_ = 0;
 };
 
 // Benchmarks Skip for ColumnReader with the following parameters in order:
@@ -165,6 +169,7 @@ static void RecordReaderReadRecords(::benchmark::State& 
state) {
   }
 
   state.SetBytesProcessed(state.iterations() * helper.total_size());
+  state.SetItemsProcessed(state.iterations() * helper.total_levels());
 }
 
 // Benchmarks SkipRecords for RecordReader with the following parameters in 
order:
@@ -190,6 +195,40 @@ static void RecordReaderSkipRecords(::benchmark::State& 
state) {
   }
 
   state.SetBytesProcessed(state.iterations() * helper.total_size());
+  state.SetItemsProcessed(state.iterations() * helper.total_levels());
+}
+
+// Benchmarks ReadRecords and SkipRecords for RecordReader with the following 
parameters
+// in order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read/skip at each call.
+// - levels_per_page: sets how many levels to read/skip in total.
+static void RecordReaderReadAndSkipRecords(::benchmark::State& state) {
+  const auto repetition = static_cast<Repetition::type>(state.range(0));
+  const auto batch_size = static_cast<int64_t>(state.range(1));
+  const auto levels_per_page = static_cast<int>(state.range(2));
+
+  BenchmarkHelper helper(repetition, /*num_pages=*/16, levels_per_page);
+
+  // Vectors to read the values into.
+  for (auto _ : state) {
+    state.PauseTiming();
+    // read_dense_for_nullable should not matter for skip.
+    RecordReader* reader = 
helper.ResetRecordReader(/*read_dense_for_nullable=*/false);
+    int64_t records_read = -1;
+    int64_t records_skipped = -1;
+    state.ResumeTiming();
+    while (records_read != 0 && records_skipped != 0) {
+      // ReadRecords may buffer some levels which will be skipped by the 
following
+      // SkipRecords.
+      DoNotOptimize(records_read = reader->ReadRecords(batch_size));
+      DoNotOptimize(records_skipped = reader->SkipRecords(batch_size));
+      reader->Reset();
+    }
+  }
+
+  state.SetBytesProcessed(state.iterations() * helper.total_size());
+  state.SetItemsProcessed(state.iterations() * helper.total_levels());
 }
 
 BENCHMARK(ColumnReaderSkipInt32)
@@ -219,6 +258,18 @@ BENCHMARK(RecordReaderReadRecords)
     ->Args({2, 1000, true})
     ->Args({2, 1000, false});
 
+BENCHMARK(RecordReaderReadAndSkipRecords)
+    ->ArgNames({"Repetition", "BatchSize", "LevelsPerPage"})
+    ->Args({0, 10, 80000})
+    ->Args({0, 1000, 80000})
+    ->Args({0, 10000, 1000000})
+    ->Args({1, 10, 80000})
+    ->Args({1, 1000, 80000})
+    ->Args({1, 10000, 1000000})
+    ->Args({2, 10, 80000})
+    ->Args({2, 100, 80000})
+    ->Args({2, 10000, 1000000});
+
 void GenerateLevels(int level_repeats, int max_level, int num_levels,
                     std::vector<int16_t>* levels) {
   // Generate random levels
diff --git a/cpp/src/parquet/column_reader_test.cc 
b/cpp/src/parquet/column_reader_test.cc
index e2cc24502a..a48573966a 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -1607,6 +1607,33 @@ TEST_P(ByteArrayRecordReaderTest, ReadAndSkipOptional) {
   record_reader_->Reset();
 }
 
+// Test skipping buffered records when reading/skipping more than 
kMinLevelBatchSize
+// levels at a time.
+TEST_P(ByteArrayRecordReaderTest, ReadAndBatchSkipOptional) {
+  MakeRecordReader(/*levels_per_page=*/9000, /*num_pages=*/1);
+
+  // Read 100 records and buffer some records.
+  ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/100), 100);
+  CheckReadValues(0, 100);
+  record_reader_->Reset();
+
+  // Skip 3000 records. The buffered records will be skipped.
+  ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3000), 3000);
+
+  // Read 900 records and buffer some records again.
+  ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/900), 900);
+  CheckReadValues(3100, 4000);
+  record_reader_->Reset();
+
+  // Skip 3000 records. The buffered records will be skipped.
+  ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3000), 3000);
+
+  // Read 3000 records. Only 2000 records are left to be read.
+  ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/3000), 2000);
+  CheckReadValues(7000, 9000);
+  record_reader_->Reset();
+}
+
 // Tests reading and skipping an optional FLBA field.
 // The binary readers only differ in DecodeDense and DecodeSpaced functions, so
 // testing optional is sufficient in exercising those code paths.

Reply via email to