This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch maint-13.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 7ea78a9a59e86ee00ba7e574bb7ed0c5186b57c9
Author: Jinpeng <[email protected]>
AuthorDate: Wed Jul 19 04:29:27 2023 -0400

    PARQUET-2323: [C++] Use bitmap to store pre-buffered column chunks (#36649)
    
    ### Rationale for this change
    
    In https://issues.apache.org/jira/browse/PARQUET-2316 we allowed partial 
pre-buffering in the Parquet file reader by storing prebuffered column chunk 
indices in a hash set, and making a copy of this hash set for each row group reader.
    
    In extreme conditions where numerous columns are prebuffered and multiple 
row group readers are created for the same row group, the hash set would incur 
significant overhead. 
    
    Using a bitmap instead (with one bit per column chunk indicating whether 
it's prebuffered or not) would be a reasonable mitigation, taking 4KB for 32K 
columns.
    
    ### What changes are included in this PR?
    
    Switch from a hash set to a bitmap buffer.
    
    ### Are these changes tested?
    
    Yes, passed unit tests on partial prebuffer.
    
    ### Are there any user-facing changes?
    
    No.
    
    Lead-authored-by: jp0317 <[email protected]>
    Co-authored-by: Jinpeng <[email protected]>
    Co-authored-by: Gang Wu <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc |  4 +--
 cpp/src/parquet/file_reader.cc                    | 33 +++++++++++++----------
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 69827d5c46..8585b1ccf1 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2413,9 +2413,9 @@ TEST(TestArrowReadWrite, 
CoalescedReadsAndNonCoalescedReads) {
 
   ASSERT_EQ(2, reader->num_row_groups());
 
-  // Pre-buffer 3 columns in the 2nd row group.
+  // Pre-buffer column 0 and column 3 in the 2nd row group.
   const std::vector<int> row_groups = {1};
-  const std::vector<int> column_indices = {0, 1, 4};
+  const std::vector<int> column_indices = {0, 3};
   reader->parquet_reader()->PreBuffer(row_groups, column_indices,
                                       ::arrow::io::IOContext(),
                                       ::arrow::io::CacheOptions::Defaults());
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index fc30ddb43f..adda9a027b 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -29,6 +29,7 @@
 #include "arrow/io/caching.h"
 #include "arrow/io/file.h"
 #include "arrow/io/memory.h"
+#include "arrow/util/bit_util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/future.h"
 #include "arrow/util/int_util_overflow.h"
@@ -179,7 +180,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
                      std::shared_ptr<::arrow::io::internal::ReadRangeCache> 
cached_source,
                      int64_t source_size, FileMetaData* file_metadata,
                      int row_group_number, const ReaderProperties& props,
-                     std::unordered_set<int> prebuffered_column_chunks,
+                     std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap,
                      std::shared_ptr<InternalFileDecryptor> file_decryptor = 
nullptr)
       : source_(std::move(source)),
         cached_source_(std::move(cached_source)),
@@ -187,7 +188,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
         file_metadata_(file_metadata),
         properties_(props),
         row_group_ordinal_(row_group_number),
-        prebuffered_column_chunks_(std::move(prebuffered_column_chunks)),
+        
prebuffered_column_chunks_bitmap_(std::move(prebuffered_column_chunks_bitmap)),
         file_decryptor_(file_decryptor) {
     row_group_metadata_ = file_metadata->RowGroup(row_group_number);
   }
@@ -203,8 +204,8 @@ class SerializedRowGroup : public RowGroupReader::Contents {
     ::arrow::io::ReadRange col_range =
         ComputeColumnChunkRange(file_metadata_, source_size_, 
row_group_ordinal_, i);
     std::shared_ptr<ArrowInputStream> stream;
-    if (cached_source_ &&
-        prebuffered_column_chunks_.find(i) != 
prebuffered_column_chunks_.end()) {
+    if (cached_source_ && prebuffered_column_chunks_bitmap_ != nullptr &&
+        ::arrow::bit_util::GetBit(prebuffered_column_chunks_bitmap_->data(), 
i)) {
       // PARQUET-1698: if read coalescing is enabled, read from pre-buffered
       // segments.
       PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
@@ -272,7 +273,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
   std::unique_ptr<RowGroupMetaData> row_group_metadata_;
   ReaderProperties properties_;
   int row_group_ordinal_;
-  const std::unordered_set<int> prebuffered_column_chunks_;
+  const std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 };
 
@@ -302,17 +303,17 @@ class SerializedFile : public ParquetFileReader::Contents 
{
   }
 
   std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
-    std::unordered_set<int> prebuffered_column_chunks;
-    // Avoid updating the map as this function can be called concurrently. The 
map can
-    // only be updated within Prebuffer().
+    std::shared_ptr<Buffer> prebuffered_column_chunks_bitmap;
+    // Avoid updating the bitmap as this function can be called concurrently. 
The bitmap
+    // can only be updated within Prebuffer().
     auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i);
     if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) {
-      prebuffered_column_chunks = prebuffered_column_chunks_iter->second;
+      prebuffered_column_chunks_bitmap = 
prebuffered_column_chunks_iter->second;
     }
 
     std::unique_ptr<SerializedRowGroup> contents = 
std::make_unique<SerializedRowGroup>(
         source_, cached_source_, source_size_, file_metadata_.get(), i, 
properties_,
-        std::move(prebuffered_column_chunks), file_decryptor_);
+        std::move(prebuffered_column_chunks_bitmap), file_decryptor_);
     return std::make_shared<RowGroupReader>(std::move(contents));
   }
 
@@ -366,9 +367,12 @@ class SerializedFile : public ParquetFileReader::Contents {
     std::vector<::arrow::io::ReadRange> ranges;
     prebuffered_column_chunks_.clear();
     for (int row : row_groups) {
-      std::unordered_set<int>& prebuffered = prebuffered_column_chunks_[row];
+      std::shared_ptr<Buffer>& col_bitmap = prebuffered_column_chunks_[row];
+      int num_cols = file_metadata_->num_columns();
+      PARQUET_THROW_NOT_OK(
+          AllocateEmptyBitmap(num_cols, 
properties_.memory_pool()).Value(&col_bitmap));
       for (int col : column_indices) {
-        prebuffered.insert(col);
+        ::arrow::bit_util::SetBit(col_bitmap->mutable_data(), col);
         ranges.push_back(
             ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, 
col));
       }
@@ -578,8 +582,9 @@ class SerializedFile : public ParquetFileReader::Contents {
   ReaderProperties properties_;
   std::shared_ptr<PageIndexReader> page_index_reader_;
   std::unique_ptr<BloomFilterReader> bloom_filter_reader_;
-  // Maps a row group to its column chunks that are cached via Prebuffer().
-  std::unordered_map<int, std::unordered_set<int>> prebuffered_column_chunks_;
+  // Maps row group ordinal and prebuffer status of its column chunks in the 
form of a
+  // bitmap buffer.
+  std::unordered_map<int, std::shared_ptr<Buffer>> prebuffered_column_chunks_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 
   // \return The true length of the metadata in bytes

Reply via email to