This is an automated email from the ASF dual-hosted git repository. raulcd pushed a commit to branch maint-13.0.0 in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 7ea78a9a59e86ee00ba7e574bb7ed0c5186b57c9 Author: Jinpeng <[email protected]> AuthorDate: Wed Jul 19 04:29:27 2023 -0400 PARQUET-2323: [C++] Use bitmap to store pre-buffered column chunks (#36649) ### Rationale for this change In https://issues.apache.org/jira/browse/PARQUET-2316 we allow partial buffer in parquet File Reader by storing prebuffered column chunk index in a hash set, and make a copy of this hash set for each rowgroup reader. In extreme conditions where numerous columns are prebuffered and multiple rowgroup readers are created for the same row group, the hash set would incur significant overhead. Using a bitmap instead (with one bit per column chunk indicating whether it's prebuffered or not) would be a reasonable mitigation, taking 4KB for 32K columns. ### What changes are included in this PR? Switch from a hash set to a bitmap buffer. ### Are these changes tested? Yes, passed unit tests on partial prebuffer. ### Are there any user-facing changes? No. Lead-authored-by: jp0317 <[email protected]> Co-authored-by: Jinpeng <[email protected]> Co-authored-by: Gang Wu <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]> --- cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 4 +-- cpp/src/parquet/file_reader.cc | 33 +++++++++++++---------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index 69827d5c46..8585b1ccf1 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -2413,9 +2413,9 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) { ASSERT_EQ(2, reader->num_row_groups()); - // Pre-buffer 3 columns in the 2nd row group. + // Pre-buffer column 0 and column 3 in the 2nd row group. 
const std::vector<int> row_groups = {1}; - const std::vector<int> column_indices = {0, 1, 4}; + const std::vector<int> column_indices = {0, 3}; reader->parquet_reader()->PreBuffer(row_groups, column_indices, ::arrow::io::IOContext(), ::arrow::io::CacheOptions::Defaults()); diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc index fc30ddb43f..adda9a027b 100644 --- a/cpp/src/parquet/file_reader.cc +++ b/cpp/src/parquet/file_reader.cc @@ -29,6 +29,7 @@ #include "arrow/io/caching.h" #include "arrow/io/file.h" #include "arrow/io/memory.h" +#include "arrow/util/bit_util.h" #include "arrow/util/checked_cast.h" #include "arrow/util/future.h" #include "arrow/util/int_util_overflow.h" @@ -179,7 +180,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, int64_t source_size, FileMetaData* file_metadata, int row_group_number, const ReaderProperties& props, - std::unordered_set<int> prebuffered_column_chunks, + std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap, std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr) : source_(std::move(source)), cached_source_(std::move(cached_source)), @@ -187,7 +188,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { file_metadata_(file_metadata), properties_(props), row_group_ordinal_(row_group_number), - prebuffered_column_chunks_(std::move(prebuffered_column_chunks)), + prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)), file_decryptor_(file_decryptor) { row_group_metadata_ = file_metadata->RowGroup(row_group_number); } @@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents { ::arrow::io::ReadRange col_range = ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); std::shared_ptr<ArrowInputStream> stream; - if (cached_source_ && - prebuffered_column_chunks_.find(i) != prebuffered_column_chunks_.end()) { + if (cached_source_ && 
prebuffered_column_chunks_bitmap_ != nullptr && + ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), i)) { // PARQUET-1698: if read coalescing is enabled, read from pre-buffered // segments. PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); @@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents { std::unique_ptr<RowGroupMetaData> row_group_metadata_; ReaderProperties properties_; int row_group_ordinal_; - const std::unordered_set<int> prebuffered_column_chunks_; + const std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap_; std::shared_ptr<InternalFileDecryptor> file_decryptor_; }; @@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents { } std::shared_ptr<RowGroupReader> GetRowGroup(int i) override { - std::unordered_set<int> prebuffered_column_chunks; - // Avoid updating the map as this function can be called concurrently. The map can - // only be updated within Prebuffer(). + std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap; + // Avoid updating the bitmap as this function can be called concurrently. The bitmap + // can only be updated within Prebuffer(). 
auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i); if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) { - prebuffered_column_chunks = prebuffered_column_chunks_iter->second; + prebuffered_column_chunks_bitmap = prebuffered_column_chunks_iter->second; } std::unique_ptr<SerializedRowGroup> contents = std::make_unique<SerializedRowGroup>( source_, cached_source_, source_size_, file_metadata_.get(), i, properties_, - std::move(prebuffered_column_chunks), file_decryptor_); + std::move(prebuffered_column_chunks_bitmap), file_decryptor_); return std::make_shared<RowGroupReader>(std::move(contents)); } @@ -366,9 +367,12 @@ class SerializedFile : public ParquetFileReader::Contents { std::vector<::arrow::io::ReadRange> ranges; prebuffered_column_chunks_.clear(); for (int row : row_groups) { - std::unordered_set<int>& prebuffered = prebuffered_column_chunks_[row]; + std::shared_ptr<Buffer>& col_bitmap = prebuffered_column_chunks_[row]; + int num_cols = file_metadata_->num_columns(); + PARQUET_THROW_NOT_OK( + AllocateEmptyBitmap(num_cols, properties_.memory_pool()).Value(&col_bitmap)); for (int col : column_indices) { - prebuffered.insert(col); + ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col); ranges.push_back( ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col)); } @@ -578,8 +582,9 @@ class SerializedFile : public ParquetFileReader::Contents { ReaderProperties properties_; std::shared_ptr<PageIndexReader> page_index_reader_; std::unique_ptr<BloomFilterReader> bloom_filter_reader_; - // Maps a row group to its column chunks that are cached via Prebuffer(). - std::unordered_map<int, std::unordered_set<int>> prebuffered_column_chunks_; + // Maps row group ordinal and prebuffer status of its column chunks in the form of a + // bitmap buffer. 
+ std::unordered_map<int, std::shared_ptr<Buffer>> prebuffered_column_chunks_; std::shared_ptr<InternalFileDecryptor> file_decryptor_; // \return The true length of the metadata in bytes
