pitrou commented on a change in pull request #10978:
URL: https://github.com/apache/arrow/pull/10978#discussion_r704444525
##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2281,46 +2346,130 @@ class DeltaByteArrayDecoder : public DecoderImpl,
: DecoderImpl(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr, pool),
suffix_decoder_(nullptr, pool),
- last_value_(0, nullptr) {}
+ last_value_in_previous_page_(""),
+ buffered_prefix_length_(AllocateBuffer(pool, 0)),
+ buffered_data_(AllocateBuffer(pool, 0)) {}
- virtual void SetData(int num_values, const uint8_t* data, int len) {
+ void SetData(int num_values, const uint8_t* data, int len) override {
num_values_ = num_values;
- if (len == 0) return;
- int prefix_len_length = ::arrow::util::SafeLoadAs<int32_t>(data);
- data += 4;
- len -= 4;
- prefix_len_decoder_.SetData(num_values, data, prefix_len_length);
- data += prefix_len_length;
- len -= prefix_len_length;
- suffix_decoder_.SetData(num_values, data, len);
- }
-
- // TODO: this doesn't work and requires memory management. We need to
allocate
- // new strings to store the results.
- virtual int Decode(ByteArray* buffer, int max_values) {
- max_values = std::min(max_values, this->num_values_);
- for (int i = 0; i < max_values; ++i) {
- int prefix_len = 0;
- prefix_len_decoder_.Decode(&prefix_len, 1);
- ByteArray suffix = {0, nullptr};
- suffix_decoder_.Decode(&suffix, 1);
- buffer[i].len = prefix_len + suffix.len;
+ decoder_ = std::make_shared<::arrow::BitUtil::BitReader>(data, len);
+ prefix_len_decoder_.SetDecoder(num_values, decoder_);
+
+ int num_prefix = prefix_len_decoder_.ValidValuesCount();
+ PARQUET_THROW_NOT_OK(buffered_prefix_length_->Resize(num_prefix *
sizeof(int32_t)));
+ int ret = prefix_len_decoder_.Decode(
+ reinterpret_cast<int32_t*>(buffered_prefix_length_->mutable_data()),
num_prefix);
+ DCHECK_EQ(ret, num_prefix);
+ prefix_len_offset_ = 0;
+ num_valid_values_ = num_prefix;
+
+ suffix_decoder_.SetData(num_values, decoder_);
Review comment:
I don't know exactly how it should be done, but as I said,
`Decode(T* buffer, int max_values)` can be called incrementally (and you're
already taking care to support that in the `DeltaBitPackDecoder`). You can
compute the total bytes upfront in `Init`, or you can do it lazily...
Ideally, we would have unit tests for incremental decoding...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]