wjones127 commented on code in PR #14353:
URL: https://github.com/apache/arrow/pull/14353#discussion_r1083003326


##########
cpp/src/parquet/column_reader.h:
##########
@@ -370,6 +376,9 @@ class PARQUET_EXPORT RecordReader {
   int64_t values_capacity_;
   int64_t null_count_;
 
+  bool hasCal_average_len_ = false;
+  int64_t binary_per_row_length_ = kDefaultBinaryPerRowSize;

Review Comment:
   First, I think this would be clearer as a `std::optional`, rather than a 
boolean on the side.
   Second, please document the purpose of these fields in the header file.
   ```suggestion
     /// \brief Typical size of single binary value, used for pre-allocating 
value buffer.
     ///
     /// Before this is set, kDefaultBinaryPerRowSize is used. After the first
     /// batch of values, this is set to the size of the values buffer divided 
by
     /// the number of values.
     std::optional<int64_t> binary_per_row_length_ = std::nullopt;
   ```



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1959,6 +1972,138 @@ class ByteArrayChunkedRecordReader : public 
TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
+class ByteArrayChunkedOptRecordReader : public 
TypedRecordReader<ByteArrayType>,
+                                        virtual public BinaryRecordReader {
+ public:
+  ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo 
leaf_info,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+    values_ = AllocateBuffer(pool);
+    offset_ = AllocateBuffer(pool);
+  }
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    if (uses_opt_) {
+      std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), 
ReleaseOffsets(),
+                                                      ReleaseValues()};
+      auto data = std::make_shared<::arrow::ArrayData>(
+          ::arrow::binary(), values_written(), buffers, null_count());
+
+      auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+      return chunks;
+    } else {
+      ::arrow::ArrayVector result = accumulator_.chunks;
+      if (result.size() == 0 || accumulator_.builder->length() > 0) {
+        std::shared_ptr<::arrow::Array> last_chunk;
+        PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+        result.push_back(std::move(last_chunk));
+      }
+      accumulator_.chunks = {};
+      return result;
+    }
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), 0, NULLPTR,
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + 
values_written_),
+          values_, 0, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read);
+      ResetValues();
+    }
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(),
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + 
values_written_),
+          values_, values_written_, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read - null_count);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+      ResetValues();
+    }
+  }
+
+  void ReserveValues(int64_t extra_values) override {
+    const int64_t new_values_capacity =
+        UpdateCapacity(values_capacity_, values_written_, extra_values);
+    if (new_values_capacity > values_capacity_) {
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(new_values_capacity * binary_per_row_length_, 
false));

Review Comment:
   If we make this an optional:
   ```suggestion
         int64_t per_row_length = 
binary_per_row_length_.value_or(kDefaultBinaryPerRowSize);
         PARQUET_THROW_NOT_OK(
             values_->Resize(new_values_capacity * per_row_length, false));
   ```



##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +55,8 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 
1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+static constexpr int32_t kDefaultBinaryPerRowSize = 20;

Review Comment:
   Since this corresponds to `binary_per_row_length_`, could we make the names 
match? I'm thinking "bytes per row" is the best description here:
   
   ```suggestion
   static constexpr int32_t kDefaultBinaryBytesPerRow = 20;
   ```
   (Also change `binary_per_row_length_` to `binary_bytes_per_row_`)



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1959,6 +1972,138 @@ class ByteArrayChunkedRecordReader : public 
TypedRecordReader<ByteArrayType>,
   typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
 };
 
+class ByteArrayChunkedOptRecordReader : public 
TypedRecordReader<ByteArrayType>,
+                                        virtual public BinaryRecordReader {
+ public:
+  ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo 
leaf_info,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+    values_ = AllocateBuffer(pool);
+    offset_ = AllocateBuffer(pool);
+  }
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    if (uses_opt_) {
+      std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(), 
ReleaseOffsets(),
+                                                      ReleaseValues()};
+      auto data = std::make_shared<::arrow::ArrayData>(
+          ::arrow::binary(), values_written(), buffers, null_count());
+
+      auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+      return chunks;
+    } else {
+      ::arrow::ArrayVector result = accumulator_.chunks;
+      if (result.size() == 0 || accumulator_.builder->length() > 0) {
+        std::shared_ptr<::arrow::Array> last_chunk;
+        PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+        result.push_back(std::move(last_chunk));
+      }
+      accumulator_.chunks = {};
+      return result;
+    }
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), 0, NULLPTR,
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + 
values_written_),
+          values_, 0, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+          static_cast<int>(values_to_read), &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read);
+      ResetValues();
+    }
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    if (uses_opt_) {
+      int64_t num_decoded = this->current_decoder_->DecodeArrowZeroCopy(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(),
+          (reinterpret_cast<int32_t*>(offset_->mutable_data()) + 
values_written_),
+          values_, values_written_, &binary_length_);
+      DCHECK_EQ(num_decoded, values_to_read - null_count);
+    } else {
+      int64_t num_decoded = this->current_decoder_->DecodeArrow(
+          static_cast<int>(values_to_read), static_cast<int>(null_count),
+          valid_bits_->mutable_data(), values_written_, &accumulator_);
+      CheckNumberDecoded(num_decoded, values_to_read - null_count);
+      ResetValues();
+    }
+  }
+
+  void ReserveValues(int64_t extra_values) override {
+    const int64_t new_values_capacity =
+        UpdateCapacity(values_capacity_, values_written_, extra_values);
+    if (new_values_capacity > values_capacity_) {
+      PARQUET_THROW_NOT_OK(
+          values_->Resize(new_values_capacity * binary_per_row_length_, 
false));
+      PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4, 
false));
+
+      auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
+      offset[0] = 0;
+
+      values_capacity_ = new_values_capacity;
+    }
+    if (leaf_info_.HasNullableValues()) {
+      int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
+      if (valid_bits_->size() < valid_bytes_new) {
+        int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
+        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+        // Avoid valgrind warnings
+        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+               valid_bytes_new - valid_bytes_old);
+      }
+    }
+  }
+  std::shared_ptr<ResizableBuffer> ReleaseValues() override {
+    auto result = values_;
+    values_ = AllocateBuffer(this->pool_);
+    values_capacity_ = 0;
+    return result;
+  }
+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
+    auto result = offset_;
+    if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {

Review Comment:
   If we make it an optional:
   ```suggestion
       if (ARROW_PREDICT_FALSE(!binary_per_row_length_.has_value())) {
   ```



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1594,6 +1605,8 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
     }
   }
 
+  std::shared_ptr<ResizableBuffer> ReleaseOffsets() override { return nullptr; 
}

Review Comment:
   ```suggestion
     virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() { return 
nullptr; }
   ```



##########
cpp/src/parquet/column_reader.h:
##########
@@ -299,6 +303,8 @@ class PARQUET_EXPORT RecordReader {
   /// allocated in subsequent ReadRecords calls
   virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;
 
+  virtual std::shared_ptr<ResizableBuffer> ReleaseOffsets() = 0;
+

Review Comment:
   ```suggestion
   ```



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1697,7 +1710,7 @@ class TypedRecordReader : public 
TypedColumnReaderImpl<DType>,
     }
   }
 
-  void ReserveValues(int64_t extra_values) {
+  void ReserveValues(int64_t extra_values) override {

Review Comment:
   ```suggestion
     virtual void ReserveValues(int64_t extra_values) {
   ```



##########
cpp/src/parquet/column_reader.h:
##########
@@ -291,6 +293,8 @@ class PARQUET_EXPORT RecordReader {
   /// \brief Pre-allocate space for data. Results in better flat read 
performance
   virtual void Reserve(int64_t num_values) = 0;
 
+  virtual void ReserveValues(int64_t capacity) {}

Review Comment:
   Since these are coming from `TypedRecordReader`, which is private, could you 
mark its methods as virtual instead?
   
   ```suggestion
   ```
   



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -850,12 +850,23 @@ class ColumnReaderImplBase {
     current_encoding_ = encoding;
     current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
                               static_cast<int>(data_size));
+    if (!hasSet_uses_opt_) {
+      if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
+          current_encoding_ == Encoding::PLAIN ||
+          current_encoding_ == Encoding::RLE_DICTIONARY) {
+        uses_opt_ = true;
+      }
+      hasSet_uses_opt_ = true;
+    }
   }
 
   int64_t available_values_current_page() const {
     return num_buffered_values_ - num_decoded_values_;
   }
 
+  bool hasSet_uses_opt_ = false;
+  bool uses_opt_ = false;

Review Comment:
   Could you create a separate function for that, as Yibo suggested? If you do 
measure a meaningful performance difference, could you share your results then?
   
   In addition, could you add a comment explaining why the optimization is only 
applicable to those three encodings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to