emkornfield commented on code in PR #48345:
URL: https://github.com/apache/arrow/pull/48345#discussion_r2722394761
##########
cpp/src/parquet/decoder.cc:
##########
@@ -2323,6 +2327,121 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
}
};
+// ----------------------------------------------------------------------
+// ALP decoder (Adaptive Lossless floating-Point)
+
+template <typename DType>
+class AlpDecoder : public TypedDecoderImpl<DType> {
+ public:
+ using Base = TypedDecoderImpl<DType>;
+ using T = typename DType::c_type;
+
+ explicit AlpDecoder(const ColumnDescriptor* descr)
+ : Base(descr, Encoding::ALP), current_offset_{0}, needs_decode_{false} {
+ static_assert(std::is_same<T, float>::value || std::is_same<T, double>::value,
+ "ALP only supports float and double types");
+ }
+
+ void SetData(int num_values, const uint8_t* data, int len) final {
+ Base::SetData(num_values, data, len);
+ current_offset_ = 0;
+ needs_decode_ = (len > 0 && num_values > 0);
+ decoded_buffer_.clear();
+ }
+
+ int Decode(T* buffer, int max_values) override {
+ // Fast path: decode directly into output buffer if requesting all values
+ if (needs_decode_ && max_values >= this->num_values_) {
+ ::arrow::util::alp::AlpWrapper<T>::Decode(
+ buffer, static_cast<uint64_t>(this->num_values_),
+ reinterpret_cast<const char*>(this->data_), this->len_);
+
+ const int decoded = this->num_values_;
+ this->num_values_ = 0;
+ needs_decode_ = false;
+ return decoded;
+ }
+
+ // Slow path: partial read - decode to intermediate buffer
+ // ALP Bit unpacker needs batches of 64
+ if (needs_decode_) {
+ decoded_buffer_.resize(this->num_values_);
+ ::arrow::util::alp::AlpWrapper<T>::Decode(
+ decoded_buffer_.data(), static_cast<uint64_t>(this->num_values_),
+ reinterpret_cast<const char*>(this->data_), this->len_);
+ needs_decode_ = false;
+ }
+
+ // Copy from intermediate buffer
+ const int values_to_decode = std::min(
+ max_values,
+ static_cast<int>(decoded_buffer_.size() - current_offset_));
+
+ if (values_to_decode > 0) {
+ std::memcpy(buffer, decoded_buffer_.data() + current_offset_,
+ values_to_decode * sizeof(T));
+ current_offset_ += values_to_decode;
+ this->num_values_ -= values_to_decode;
+ }
+
+ return values_to_decode;
+ }
+
+ int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
+ typename EncodingTraits<DType>::Accumulator* builder) override {
+ const int values_to_decode = num_values - null_count;
+ if (ARROW_PREDICT_FALSE(this->num_values_ < values_to_decode)) {
+ ParquetException::EofException("ALP DecodeArrow: Not enough values
available. "
+ "Available: " +
std::to_string(this->num_values_) +
+ ", Requested: " +
std::to_string(values_to_decode));
+ }
+
+ // Decode if needed (DecodeArrow always needs intermediate buffer for nulls)
+ if (needs_decode_) {
+ decoded_buffer_.resize(this->num_values_);
+ ::arrow::util::alp::AlpWrapper<T>::Decode(
+ decoded_buffer_.data(), static_cast<uint64_t>(this->num_values_),
Review Comment:
I think we should be doing incremental decoding here. It will reduce peak memory, and it can also avoid a memory copy as long as the requested batch sizes are a power of two (which is typical) and greater than the ALP vector size.
The way to avoid the memory copy is to call Reserve on the accumulator, decode directly into its underlying buffer, and finally call [UnsafeAdvance](https://github.com/apache/arrow/blob/07c1c44967c564bb399ff1e7ef5341e8bedb279e/cpp/src/arrow/array/builder_primitive.h#L333).
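For reference, a rough sketch of the shape I have in mind for the null-free path. This is not a drop-in patch: `DecodeNext`, `alp_state_`, and `kAlpVectorSize` are hypothetical names for an incremental ALP API, and `mutable_data()` stands in for however the accumulator exposes its underlying value buffer.

```cpp
// Sketch only: incremental, copy-free decode for the no-nulls case,
// following the Reserve + UnsafeAdvance idea above.
int DecodeArrowNonNull(int num_values,
                       typename EncodingTraits<DType>::Accumulator* builder) {
  // Make room for all requested values up front.
  PARQUET_THROW_NOT_OK(builder->Reserve(num_values));
  // Hypothetical accessor: mutable pointer just past the values already built.
  T* out = builder->mutable_data() + builder->length();
  int remaining = num_values;
  while (remaining > 0) {
    const int batch = std::min(remaining, kAlpVectorSize);
    // Hypothetical incremental API: decode the next ALP vector straight into
    // the builder's buffer, skipping decoded_buffer_ and the memcpy.
    ::arrow::util::alp::AlpWrapper<T>::DecodeNext(&alp_state_, out, batch);
    out += batch;
    remaining -= batch;
  }
  // Commit the values that were written directly into the buffer.
  builder->UnsafeAdvance(num_values);
  this->num_values_ -= num_values;
  return num_values;
}
```

When the caller's batch sizes are multiples of the ALP vector size (the typical power-of-two case), every vector lands directly in the builder's buffer and no tail buffering is needed.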