This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ad300390e PARQUET-2323: [C++] Use bitmap to store pre-buffered column
chunks (#36649)
7ad300390e is described below
commit 7ad300390e8a193f242a7776b941a1d5fc160c06
Author: Jinpeng <[email protected]>
AuthorDate: Wed Jul 19 04:29:27 2023 -0400
PARQUET-2323: [C++] Use bitmap to store pre-buffered column chunks (#36649)
### Rationale for this change
In https://issues.apache.org/jira/browse/PARQUET-2316 we allowed partial
pre-buffering in the Parquet file reader by storing the indices of prebuffered
column chunks in a hash set, and making a copy of this hash set for each row group reader.
In extreme conditions where numerous columns are prebuffered and multiple
row group readers are created for the same row group, the hash set would incur
significant overhead.
Using a bitmap instead (with one bit per column chunk indicating whether
it's prebuffered or not) would be a reasonable mitigation, taking 4KB for 32K
columns.
### What changes are included in this PR?
Switch from a hash set to a bitmap buffer.
### Are these changes tested?
Yes, passed unit tests on partial prebuffer.
### Are there any user-facing changes?
No.
Lead-authored-by: jp0317 <[email protected]>
Co-authored-by: Jinpeng <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 4 +--
cpp/src/parquet/file_reader.cc | 33 +++++++++++++----------
2 files changed, 21 insertions(+), 16 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 69827d5c46..8585b1ccf1 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2413,9 +2413,9 @@ TEST(TestArrowReadWrite,
CoalescedReadsAndNonCoalescedReads) {
ASSERT_EQ(2, reader->num_row_groups());
- // Pre-buffer 3 columns in the 2nd row group.
+ // Pre-buffer column 0 and column 3 in the 2nd row group.
const std::vector<int> row_groups = {1};
- const std::vector<int> column_indices = {0, 1, 4};
+ const std::vector<int> column_indices = {0, 3};
reader->parquet_reader()->PreBuffer(row_groups, column_indices,
::arrow::io::IOContext(),
::arrow::io::CacheOptions::Defaults());
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index fc30ddb43f..adda9a027b 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -29,6 +29,7 @@
#include "arrow/io/caching.h"
#include "arrow/io/file.h"
#include "arrow/io/memory.h"
+#include "arrow/util/bit_util.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/int_util_overflow.h"
@@ -179,7 +180,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
std::shared_ptr<::arrow::io::internal::ReadRangeCache>
cached_source,
int64_t source_size, FileMetaData* file_metadata,
int row_group_number, const ReaderProperties& props,
- std::unordered_set<int> prebuffered_column_chunks,
+ std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap,
std::shared_ptr<InternalFileDecryptor> file_decryptor =
nullptr)
: source_(std::move(source)),
cached_source_(std::move(cached_source)),
@@ -187,7 +188,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
file_metadata_(file_metadata),
properties_(props),
row_group_ordinal_(row_group_number),
- prebuffered_column_chunks_(std::move(prebuffered_column_chunks)),
+
prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)),
file_decryptor_(file_decryptor) {
row_group_metadata_ = file_metadata->RowGroup(row_group_number);
}
@@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents {
::arrow::io::ReadRange col_range =
ComputeColumnChunkRange(file_metadata_, source_size_,
row_group_ordinal_, i);
std::shared_ptr<ArrowInputStream> stream;
- if (cached_source_ &&
- prebuffered_column_chunks_.find(i) !=
prebuffered_column_chunks_.end()) {
+ if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
+ ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(),
i)) {
// PARQUET-1698: if read coalescing is enabled, read from pre-buffered
// segments.
PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
@@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
std::unique_ptr<RowGroupMetaData> row_group_metadata_;
ReaderProperties properties_;
int row_group_ordinal_;
- const std::unordered_set<int> prebuffered_column_chunks_;
+ const std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};
@@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents
{
}
std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
- std::unordered_set<int> prebuffered_column_chunks;
- // Avoid updating the map as this function can be called concurrently. The
map can
- // only be updated within Prebuffer().
+ std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap;
+ // Avoid updating the bitmap as this function can be called concurrently.
The bitmap
+ // can only be updated within Prebuffer().
auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i);
if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) {
- prebuffered_column_chunks = prebuffered_column_chunks_iter->second;
+ prebuffered_column_chunks_bitmap =
prebuffered_column_chunks_iter->second;
}
std::unique_ptr<SerializedRowGroup> contents =
std::make_unique<SerializedRowGroup>(
source_, cached_source_, source_size_, file_metadata_.get(), i,
properties_,
- std::move(prebuffered_column_chunks), file_decryptor_);
+ std::move(prebuffered_column_chunks_bitmap), file_decryptor_);
return std::make_shared<RowGroupReader>(std::move(contents));
}
@@ -366,9 +367,12 @@ class SerializedFile : public ParquetFileReader::Contents {
std::vector<::arrow::io::ReadRange> ranges;
prebuffered_column_chunks_.clear();
for (int row : row_groups) {
- std::unordered_set<int>& prebuffered = prebuffered_column_chunks_[row];
+ std::shared_ptr<Buffer>& col_bitmap = prebuffered_column_chunks_[row];
+ int num_cols = file_metadata_->num_columns();
+ PARQUET_THROW_NOT_OK(
+ AllocateEmptyBitmap(num_cols,
properties_.memory_pool()).Value(&col_bitmap));
for (int col : column_indices) {
- prebuffered.insert(col);
+ ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col);
ranges.push_back(
ComputeColumnChunkRange(file_metadata_.get(), source_size_, row,
col));
}
@@ -578,8 +582,9 @@ class SerializedFile : public ParquetFileReader::Contents {
ReaderProperties properties_;
std::shared_ptr<PageIndexReader> page_index_reader_;
std::unique_ptr<BloomFilterReader> bloom_filter_reader_;
- // Maps a row group to its column chunks that are cached via Prebuffer().
- std::unordered_map<int, std::unordered_set<int>> prebuffered_column_chunks_;
+ // Maps row group ordinal and prebuffer status of its column chunks in the
form of a
+ // bitmap buffer.
+ std::unordered_map<int, std::shared_ptr<Buffer>> prebuffered_column_chunks_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
// \return The true length of the metadata in bytes