pitrou commented on code in PR #40094:
URL: https://github.com/apache/arrow/pull/40094#discussion_r1494375511


##########
cpp/src/parquet/encoding.cc:
##########
@@ -3559,136 +3618,162 @@ class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
 };
 
 // ----------------------------------------------------------------------
-// BYTE_STREAM_SPLIT
+// BYTE_STREAM_SPLIT decoders
 
 template <typename DType>
-class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
+class ByteStreamSplitDecoderBase : public DecoderImpl,
+                                   virtual public TypedDecoder<DType> {
  public:
   using T = typename DType::c_type;
-  explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
 
-  int Decode(T* buffer, int max_values) override;
+  ByteStreamSplitDecoderBase(const ColumnDescriptor* descr, int byte_width)
+      : DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT), byte_width_(byte_width) {}
 
-  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                  int64_t valid_bits_offset,
-                  typename EncodingTraits<DType>::Accumulator* builder) override;
+  void SetData(int num_values, const uint8_t* data, int len) override {
+    if (static_cast<int64_t>(num_values) * byte_width_ != len) {
+      throw ParquetException(
+          "Data size does not match number of values in BYTE_STREAM_SPLIT");
+    }
+    DecoderImpl::SetData(num_values, data, len);
+    stride_ = num_values_;
+  }
 
   int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
                   int64_t valid_bits_offset,
-                  typename EncodingTraits<DType>::DictAccumulator* builder) override;
-
-  void SetData(int num_values, const uint8_t* data, int len) override;
+                  typename EncodingTraits<DType>::DictAccumulator* builder) override {
+    ParquetException::NYI("DecodeArrow to DictAccumulator for BYTE_STREAM_SPLIT");
+  }
 
-  T* EnsureDecodeBuffer(int64_t min_values) {
-    const int64_t size = sizeof(T) * min_values;
+ protected:
+  int DecodeRaw(uint8_t* out_buffer, int max_values) {
+    const int values_to_decode = std::min(num_values_, max_values);
+    ::arrow::util::internal::ByteStreamSplitDecode(data_, byte_width_, values_to_decode,
+                                                   stride_, out_buffer);
+    data_ += values_to_decode;
+    num_values_ -= values_to_decode;
+    len_ -= byte_width_ * values_to_decode;
+    return values_to_decode;
+  }
+
+  uint8_t* EnsureDecodeBuffer(int64_t min_values) {
+    const int64_t size = byte_width_ * min_values;
     if (!decode_buffer_ || decode_buffer_->size() < size) {
-      PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(size));
+      const auto alloc_size = ::arrow::bit_util::NextPower2(size);
+      PARQUET_ASSIGN_OR_THROW(decode_buffer_, ::arrow::AllocateBuffer(alloc_size));
     }
-    return decode_buffer_->mutable_data_as<T>();
+    return decode_buffer_->mutable_data();
   }
 
- private:
-  int num_values_in_buffer_{0};
+  const int byte_width_;
+  int stride_{0};
   std::shared_ptr<Buffer> decode_buffer_;
-
-  static constexpr int kNumStreams = sizeof(T);
+  const uint8_t* decode_data_;

Review Comment:
   Indeed, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to