cyb70289 commented on code in PR #14353:
URL: https://github.com/apache/arrow/pull/14353#discussion_r1051561605
##########
cpp/src/parquet/column_reader.h:
##########
@@ -291,6 +293,8 @@ class PARQUET_EXPORT RecordReader {
/// \brief Pre-allocate space for data. Results in better flat read
performance
virtual void Reserve(int64_t num_values) = 0;
+ virtual void ReserveValues(int64_t capacity) {}
Review Comment:
A new interface? Are these added data members and functions necessary for
this base class? I suppose they are only for the new reader implementation.
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -850,12 +850,23 @@ class ColumnReaderImplBase {
current_encoding_ = encoding;
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
+ if (!hasSet_uses_opt_) {
+ if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
+ current_encoding_ == Encoding::PLAIN ||
+ current_encoding_ == Encoding::RLE_DICTIONARY) {
Review Comment:
Are all these cases covered by unit tests?
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -850,12 +850,23 @@ class ColumnReaderImplBase {
current_encoding_ = encoding;
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
+ if (!hasSet_uses_opt_) {
Review Comment:
Stick to snake_case for variables.
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1957,6 +1970,139 @@ class ByteArrayChunkedRecordReader : public
TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};
+class ByteArrayChunkedOptRecordReader : public
TypedRecordReader<ByteArrayType>,
+ virtual public BinaryRecordReader {
+ public:
+ ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo
leaf_info,
+ ::arrow::MemoryPool* pool)
+ : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+ DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+ accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+ values_ = AllocateBuffer(pool);
+ offset_ = AllocateBuffer(pool);
+ }
+
+ ::arrow::ArrayVector GetBuilderChunks() override {
+ if (uses_opt_) {
+ std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(),
ReleaseOffsets(),
+ ReleaseValues()};
+ auto data = std::make_shared<::arrow::ArrayData>(
+ ::arrow::binary(), values_written(), buffers, null_count());
+
+ auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+ return chunks;
+ } else {
+ ::arrow::ArrayVector result = accumulator_.chunks;
+ if (result.size() == 0 || accumulator_.builder->length() > 0) {
+ std::shared_ptr<::arrow::Array> last_chunk;
+ PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+ result.push_back(std::move(last_chunk));
+ }
+ accumulator_.chunks = {};
+ return result;
Review Comment:
Duplicates `ByteArrayChunkedRecordReader`? This doesn't look right.
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -850,12 +850,23 @@ class ColumnReaderImplBase {
current_encoding_ = encoding;
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
+ if (!hasSet_uses_opt_) {
+ if (current_encoding_ == Encoding::PLAIN_DICTIONARY ||
+ current_encoding_ == Encoding::PLAIN ||
+ current_encoding_ == Encoding::RLE_DICTIONARY) {
+ uses_opt_ = true;
+ }
+ hasSet_uses_opt_ = true;
+ }
}
int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}
+ bool hasSet_uses_opt_ = false;
+ bool uses_opt_ = false;
Review Comment:
Are these two flags really necessary?
It looks to me like a trivial helper function would be better.
##########
cpp/src/parquet/encoding.h:
##########
@@ -317,6 +317,13 @@ class TypedDecoder : virtual public Decoder {
int64_t valid_bits_offset,
typename EncodingTraits<DType>::Accumulator* out) =
0;
+ virtual int DecodeArrow_opt(int num_values, int null_count, const uint8_t*
valid_bits,
+ int32_t* offset,
+ std::shared_ptr<::arrow::ResizableBuffer>&
values,
+ int64_t valid_bits_offset, int32_t*
bianry_length) {
+ return 0;
Review Comment:
I saw many dummy implementations like this in the PR. Probably they can be
eliminated.
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1948,6 +2026,79 @@ class DictByteArrayDecoderImpl : public
DictDecoderImpl<ByteArrayType>,
return Status::OK();
}
+ Status DecodeArrowDense_opt(int num_values, int null_count, const uint8_t*
valid_bits,
+ int32_t* offset,
+ std::shared_ptr<::arrow::ResizableBuffer>&
values,
+ int64_t valid_bits_offset, int* out_num_values,
+ int32_t* bianry_length) {
+ constexpr int32_t kBufferSize = 1024;
+ int32_t indices[kBufferSize];
+ auto dst_value = values->mutable_data() + (*bianry_length);
+
+ ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset,
num_values);
Review Comment:
Do we have benchmark results for this bitmap-reader-based implementation?
Is it better than the original code (bitblock counter)?
##########
cpp/src/parquet/encoding.cc:
##########
@@ -1360,6 +1360,17 @@ class PlainByteArrayDecoder : public
PlainDecoder<ByteArrayType>,
return result;
}
+ int DecodeArrow_opt(int num_values, int null_count, const uint8_t*
valid_bits,
Review Comment:
Please stick to CamelCase for function names, snake_case for variables.
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1957,6 +1970,139 @@ class ByteArrayChunkedRecordReader : public
TypedRecordReader<ByteArrayType>,
typename EncodingTraits<ByteArrayType>::Accumulator accumulator_;
};
+class ByteArrayChunkedOptRecordReader : public
TypedRecordReader<ByteArrayType>,
+ virtual public BinaryRecordReader {
+ public:
+ ByteArrayChunkedOptRecordReader(const ColumnDescriptor* descr, LevelInfo
leaf_info,
+ ::arrow::MemoryPool* pool)
+ : TypedRecordReader<ByteArrayType>(descr, leaf_info, pool) {
+ DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+ accumulator_.builder.reset(new ::arrow::BinaryBuilder(pool));
+ values_ = AllocateBuffer(pool);
+ offset_ = AllocateBuffer(pool);
+ }
+
+ ::arrow::ArrayVector GetBuilderChunks() override {
+ if (uses_opt_) {
+ std::vector<std::shared_ptr<Buffer>> buffers = {ReleaseIsValid(),
ReleaseOffsets(),
+ ReleaseValues()};
+ auto data = std::make_shared<::arrow::ArrayData>(
+ ::arrow::binary(), values_written(), buffers, null_count());
+
+ auto chunks = ::arrow::ArrayVector({::arrow::MakeArray(data)});
+ return chunks;
+ } else {
+ ::arrow::ArrayVector result = accumulator_.chunks;
+ if (result.size() == 0 || accumulator_.builder->length() > 0) {
+ std::shared_ptr<::arrow::Array> last_chunk;
+ PARQUET_THROW_NOT_OK(accumulator_.builder->Finish(&last_chunk));
+ result.push_back(std::move(last_chunk));
+ }
+ accumulator_.chunks = {};
+ return result;
+ }
+ }
+
+ void ReadValuesDense(int64_t values_to_read) override {
+ if (uses_opt_) {
+ int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+ static_cast<int>(values_to_read), 0, NULLPTR,
+ (reinterpret_cast<int32_t*>(offset_->mutable_data()) +
values_written_),
+ values_, 0, &bianry_length_);
+ DCHECK_EQ(num_decoded, values_to_read);
+ } else {
+ int64_t num_decoded = this->current_decoder_->DecodeArrowNonNull(
+ static_cast<int>(values_to_read), &accumulator_);
+ CheckNumberDecoded(num_decoded, values_to_read);
+ ResetValues();
+ }
+ }
+
+ void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+ if (uses_opt_) {
+ int64_t num_decoded = this->current_decoder_->DecodeArrow_opt(
+ static_cast<int>(values_to_read), static_cast<int>(null_count),
+ valid_bits_->mutable_data(),
+ (reinterpret_cast<int32_t*>(offset_->mutable_data()) +
values_written_),
+ values_, values_written_, &bianry_length_);
+ DCHECK_EQ(num_decoded, values_to_read - null_count);
+ } else {
+ int64_t num_decoded = this->current_decoder_->DecodeArrow(
+ static_cast<int>(values_to_read), static_cast<int>(null_count),
+ valid_bits_->mutable_data(), values_written_, &accumulator_);
+ CheckNumberDecoded(num_decoded, values_to_read - null_count);
+ ResetValues();
+ }
+ }
+
+ void ReserveValues(int64_t extra_values) override {
+ const int64_t new_values_capacity =
+ UpdateCapacity(values_capacity_, values_written_, extra_values);
+ if (new_values_capacity > values_capacity_) {
+ PARQUET_THROW_NOT_OK(
+ values_->Resize(new_values_capacity * binary_per_row_length_,
false));
+ PARQUET_THROW_NOT_OK(offset_->Resize((new_values_capacity + 1) * 4,
false));
+
+ auto offset = reinterpret_cast<int32_t*>(offset_->mutable_data());
+ offset[0] = 0;
+
+ values_capacity_ = new_values_capacity;
+ }
+ if (leaf_info_.HasNullableValues()) {
+ int64_t valid_bytes_new = bit_util::BytesForBits(values_capacity_);
+ if (valid_bits_->size() < valid_bytes_new) {
+ int64_t valid_bytes_old = bit_util::BytesForBits(values_written_);
+ PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+ // Avoid valgrind warnings
+ memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+ valid_bytes_new - valid_bytes_old);
+ }
+ }
+ }
+ std::shared_ptr<ResizableBuffer> ReleaseValues() override {
+ auto result = values_;
+ values_ = AllocateBuffer(this->pool_);
+ values_capacity_ = 0;
+ return result;
+ }
+ std::shared_ptr<ResizableBuffer> ReleaseOffsets() override {
+ auto result = offset_;
+ if (ARROW_PREDICT_FALSE(!hasCal_average_len_)) {
+ auto offsetArr = reinterpret_cast<int32_t*>(offset_->mutable_data());
+ const auto first_offset = offsetArr[0];
+ const auto last_offset = offsetArr[values_written_];
+ int64_t binary_length = last_offset - first_offset;
+ binary_per_row_length_ = binary_length / values_written_ + 1;
+ // std::cout << "binary_per_row_length_:" << binary_per_row_length_ <<
std::endl;
Review Comment:
Please remove debug code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]