This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 16eb02649f PARQUET-2423: [C++][Parquet] Avoid allocating buffer object
in RecordReader's SkipRecords (#39818)
16eb02649f is described below
commit 16eb02649fde5c7ea9ba3607697a832f13d36cb3
Author: Jinpeng <[email protected]>
AuthorDate: Thu Feb 29 08:06:58 2024 -0800
PARQUET-2423: [C++][Parquet] Avoid allocating buffer object in
RecordReader's SkipRecords (#39818)
### Rationale for this change
Currently each invocation of `SkipRecords()` for non-repeated fields would
[create a new
buffer](https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_reader.cc#L1482)
object to hold a decoded validity bitmap. It is not useful as we are merely
counting how many defined values are in the internal buffer, not reusing the
validity bitmap.
### What changes are included in this PR?
* Remove the temporary validity bitmap and instead count the definition levels
equal to the max value. This improves performance when skipping non-repeated
records.
* Add a new microbenchmark for reading and skipping alternately from a
RecordReader.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
Lead-authored-by: jp0317 <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Co-authored-by: Jinpeng <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/column_reader.cc | 14 ++------
cpp/src/parquet/column_reader_benchmark.cc | 51 ++++++++++++++++++++++++++++++
cpp/src/parquet/column_reader_test.cc | 27 ++++++++++++++++
3 files changed, 81 insertions(+), 11 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index ac4627d69c..3fb224154c 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1478,17 +1478,9 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
// We skipped the levels by incrementing 'levels_position_'. For values
// we do not have a buffer, so we need to read them and throw them away.
// First we need to figure out how many present/not-null values there are.
- std::shared_ptr<::arrow::ResizableBuffer> valid_bits;
- valid_bits = AllocateBuffer(this->pool_);
-
PARQUET_THROW_NOT_OK(valid_bits->Resize(bit_util::BytesForBits(skipped_records),
- /*shrink_to_fit=*/true));
- ValidityBitmapInputOutput validity_io;
- validity_io.values_read_upper_bound = skipped_records;
- validity_io.valid_bits = valid_bits->mutable_data();
- validity_io.valid_bits_offset = 0;
- DefLevelsToBitmap(def_levels() + start_levels_position, skipped_records,
- this->leaf_info_, &validity_io);
- int64_t values_to_read = validity_io.values_read - validity_io.null_count;
+ int64_t values_to_read =
+ std::count(def_levels() + start_levels_position, def_levels() +
levels_position_,
+ this->max_def_level_);
// Now that we have figured out number of values to read, we do not need
// these levels anymore. We will remove these values from the buffer.
diff --git a/cpp/src/parquet/column_reader_benchmark.cc
b/cpp/src/parquet/column_reader_benchmark.cc
index 61fe397cf1..93ab2dfa8c 100644
--- a/cpp/src/parquet/column_reader_benchmark.cc
+++ b/cpp/src/parquet/column_reader_benchmark.cc
@@ -56,6 +56,7 @@ class BenchmarkHelper {
for (const auto& page : pages_) {
total_size_ += page->size();
}
+ total_levels_ = static_cast<int64_t>(num_pages) * levels_per_page;
}
Int32Reader* ResetColumnReader() {
@@ -80,6 +81,8 @@ class BenchmarkHelper {
int64_t total_size() const { return total_size_; }
+ int64_t total_levels() const { return total_levels_; }
+
private:
std::vector<std::shared_ptr<Page>> pages_;
std::unique_ptr<ColumnDescriptor> descr_;
@@ -88,6 +91,7 @@ class BenchmarkHelper {
// Reader for record reader benchmarks.
std::shared_ptr<RecordReader> record_reader_;
int64_t total_size_ = 0;
+ int64_t total_levels_ = 0;
};
// Benchmarks Skip for ColumnReader with the following parameters in order:
@@ -165,6 +169,7 @@ static void RecordReaderReadRecords(::benchmark::State&
state) {
}
state.SetBytesProcessed(state.iterations() * helper.total_size());
+ state.SetItemsProcessed(state.iterations() * helper.total_levels());
}
// Benchmarks SkipRecords for RecordReader with the following parameters in
order:
@@ -190,6 +195,40 @@ static void RecordReaderSkipRecords(::benchmark::State&
state) {
}
state.SetBytesProcessed(state.iterations() * helper.total_size());
+ state.SetItemsProcessed(state.iterations() * helper.total_levels());
+}
+
+// Benchmarks ReadRecords and SkipRecords for RecordReader with the following
parameters
+// in order:
+// - repetition: 0 for REQUIRED, 1 for OPTIONAL, 2 for REPEATED.
+// - batch_size: sets how many values to read/skip at each call.
+// - levels_per_page: sets how many levels to read/skip in total.
+static void RecordReaderReadAndSkipRecords(::benchmark::State& state) {
+ const auto repetition = static_cast<Repetition::type>(state.range(0));
+ const auto batch_size = static_cast<int64_t>(state.range(1));
+ const auto levels_per_page = static_cast<int>(state.range(2));
+
+ BenchmarkHelper helper(repetition, /*num_pages=*/16, levels_per_page);
+
+ // Vectors to read the values into.
+ for (auto _ : state) {
+ state.PauseTiming();
+ // read_dense_for_nullable should not matter for skip.
+ RecordReader* reader =
helper.ResetRecordReader(/*read_dense_for_nullable=*/false);
+ int64_t records_read = -1;
+ int64_t records_skipped = -1;
+ state.ResumeTiming();
+ while (records_read != 0 && records_skipped != 0) {
+ // ReadRecords may buffer some levels which will be skipped by the
following
+ // SkipRecords.
+ DoNotOptimize(records_read = reader->ReadRecords(batch_size));
+ DoNotOptimize(records_skipped = reader->SkipRecords(batch_size));
+ reader->Reset();
+ }
+ }
+
+ state.SetBytesProcessed(state.iterations() * helper.total_size());
+ state.SetItemsProcessed(state.iterations() * helper.total_levels());
}
BENCHMARK(ColumnReaderSkipInt32)
@@ -219,6 +258,18 @@ BENCHMARK(RecordReaderReadRecords)
->Args({2, 1000, true})
->Args({2, 1000, false});
+BENCHMARK(RecordReaderReadAndSkipRecords)
+ ->ArgNames({"Repetition", "BatchSize", "LevelsPerPage"})
+ ->Args({0, 10, 80000})
+ ->Args({0, 1000, 80000})
+ ->Args({0, 10000, 1000000})
+ ->Args({1, 10, 80000})
+ ->Args({1, 1000, 80000})
+ ->Args({1, 10000, 1000000})
+ ->Args({2, 10, 80000})
+ ->Args({2, 100, 80000})
+ ->Args({2, 10000, 1000000});
+
void GenerateLevels(int level_repeats, int max_level, int num_levels,
std::vector<int16_t>* levels) {
// Generate random levels
diff --git a/cpp/src/parquet/column_reader_test.cc
b/cpp/src/parquet/column_reader_test.cc
index e2cc24502a..a48573966a 100644
--- a/cpp/src/parquet/column_reader_test.cc
+++ b/cpp/src/parquet/column_reader_test.cc
@@ -1607,6 +1607,33 @@ TEST_P(ByteArrayRecordReaderTest, ReadAndSkipOptional) {
record_reader_->Reset();
}
+// Test skipping buffered records when reading/skipping more than
kMinLevelBatchSize
+// levels at a time.
+TEST_P(ByteArrayRecordReaderTest, ReadAndBatchSkipOptional) {
+ MakeRecordReader(/*levels_per_page=*/9000, /*num_pages=*/1);
+
+ // Read 100 records and buffer some records.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/100), 100);
+ CheckReadValues(0, 100);
+ record_reader_->Reset();
+
+ // Skip 3000 records. The buffered records will be skipped.
+ ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3000), 3000);
+
+ // Read 900 records and buffer some records again.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/900), 900);
+ CheckReadValues(3100, 4000);
+ record_reader_->Reset();
+
+ // Skip 3000 records. The buffered records will be skipped.
+ ASSERT_EQ(record_reader_->SkipRecords(/*num_records=*/3000), 3000);
+
+ // Read 3000 records. Only 2000 records are left to be read.
+ ASSERT_EQ(record_reader_->ReadRecords(/*num_records=*/3000), 2000);
+ CheckReadValues(7000, 9000);
+ record_reader_->Reset();
+}
+
// Tests reading and skipping an optional FLBA field.
// The binary readers only differ in DecodeDense and DecodeSpaced functions, so
// testing optional is sufficient in exercising those code paths.