pitrou commented on code in PR #47294:
URL: https://github.com/apache/arrow/pull/47294#discussion_r2367144653


##########
cpp/src/arrow/util/rle_encoding_internal.h:
##########
@@ -299,385 +612,663 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
+/************************
+ *  RleBitPackedParser  *
+ ************************/
+
+/// Walk the encoded input one run at a time, handing each run to `handler`.
+///
+/// After every run, the read cursor (`data_`, `data_size_`) is moved past the
+/// bytes that `PeekImpl` reports as consumed, including for the run on which
+/// the handler requested a stop. Parsing ends when the input is exhausted or
+/// when `ControlFlow::Break` is returned.
+template <typename Handler>
+void RleBitPackedParser::Parse(Handler&& handler) {
+  for (;;) {
+    if (exhausted()) {
+      return;
+    }
+    const auto [consumed, flow] = PeekImpl(handler);
+    data_size_ -= consumed;
+    data_ += consumed;
+    const bool stop_requested = (flow == ControlFlow::Break);
+    if (ARROW_PREDICT_FALSE(stop_requested)) {
+      return;
+    }
+  }
+}
+
+namespace internal {
+/// Largest value representable by `T`, converted to the unsigned counterpart
+/// of `T` (e.g. `uint32_t{2147483647}` for `int32_t`).
+///
+/// Declared `inline` (C++17) because this variable template lives in a header
+/// that is included by many translation units.
+template <typename T>
+inline constexpr auto max_size_for_v =
+    static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max());
+
+}  // namespace internal
+
+/// Decode the header of the next run and hand the run to `handler`.
+///
+/// Returns the number of input bytes the run occupies (header plus payload)
+/// together with the handler's control-flow decision. On malformed input,
+/// returns `{0, ControlFlow::Break}` so the caller stops without consuming
+/// anything.
+template <typename Handler>
+auto RleBitPackedParser::PeekImpl(Handler&& handler) const
+    -> std::pair<rle_size_t, ControlFlow> {
+  ARROW_DCHECK(!exhausted());
+
+  constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
+  uint32_t run_len_type = 0;
+  const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize, &run_len_type);
+
+  if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
+    // Malformed LEB128 data. Break explicitly (rather than returning a
+    // value-initialized pair) so that `Parse` cannot keep looping on a
+    // zero-byte read.
+    return {0, ControlFlow::Break};
+  }
+
+  // The low bit of the header selects the run kind; the remaining bits hold the count.
+  const bool is_bit_packed = run_len_type & 1;
+  const uint32_t count = run_len_type >> 1;
+  if (is_bit_packed) {
+    // A bit-packed run holds `count` groups of 8 values. Floor division
+    // guarantees `count * 8` fits in `rle_size_t`; `CeilDiv` would admit one
+    // group too many and overflow the value count.
+    constexpr auto kMaxCount = internal::max_size_for_v<rle_size_t> / 8;
+    if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
+      // Illegal number of encoded values
+      return {0, ControlFlow::Break};
+    }
+
+    // Cast before multiplying so the product is computed in `rle_size_t`,
+    // which the bound above guarantees can hold it.
+    const rle_size_t values_count = static_cast<rle_size_t>(count) * 8;
+    ARROW_DCHECK_LT(count, internal::max_size_for_v<rle_size_t>);
+    // Each group of 8 values occupies exactly `value_bit_width_` bytes
+    // (8 * bit_width bits), so the payload is `count * value_bit_width_` bytes.
+    // NOTE(review): this size is checked neither for `rle_size_t` overflow nor
+    // against `data_size_`; confirm callers tolerate a run that extends past
+    // the end of the buffer.
+    const auto bytes_read =
+        header_bytes + static_cast<rle_size_t>(count) * value_bit_width_;
+
+    auto control = handler.OnBitPackedRun(
+        BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
+
+    return {bytes_read, control};
+  }
+
+  // RLE run: `count` copies of a single value.
+  if (ARROW_PREDICT_FALSE(
+          count == 0 ||
+          count > static_cast<uint32_t>(std::numeric_limits<rle_size_t>::max()))) {
+    // Illegal number of encoded values
+    return {0, ControlFlow::Break};
+  }
+
+  const auto values_count = static_cast<rle_size_t>(count);
+  // The repeated value is stored once, padded to a whole number of bytes.
+  const auto value_bytes = bit_util::BytesForBits(value_bit_width_);
+  ARROW_DCHECK_LT(value_bytes, internal::max_size_for_v<rle_size_t>);
+  const auto bytes_read = header_bytes + static_cast<rle_size_t>(value_bytes);
+
+  auto control =
+      handler.OnRleRun(RleRun(data_ + header_bytes, values_count, value_bit_width_));
+
+  return {bytes_read, control};
+}
+
+/*************************
+ *  RleBitPackedDecoder  *
+ *************************/
+
 template <typename T>
-inline bool RleDecoder::Get(T* val) {
+template <typename Callable>
+void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
+  struct {
+    Callable func;
+    auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
+    auto OnRleRun(RleRun run) { return func(std::move(run)); }
+  } handler{std::move(func)};
+
+  parser_.Parse(std::move(handler));
+}
+
+/// Decode a single value into `*val`.
+///
+/// Thin wrapper over `GetBatch`: returns true if exactly one value was
+/// decoded, false otherwise (e.g. when no further value could be read).
+template <typename T>
+bool RleBitPackedDecoder<T>::Get(value_type* val) {
   return GetBatch(val, 1) == 1;
 }
 
 template <typename T>
-inline int RleDecoder::GetBatch(T* values, int batch_size) {
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-
-  auto* out = values;
-
-  while (values_read < batch_size) {
-    int remaining = batch_size - values_read;
-
-    if (repeat_count_ > 0) {  // Repeated value case.
-      int repeat_batch = std::min(remaining, repeat_count_);
-      std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
-
-      repeat_count_ -= repeat_batch;
-      values_read += repeat_batch;
-      out += repeat_batch;
-    } else if (literal_count_ > 0) {
-      int literal_batch = std::min(remaining, literal_count_);
-      int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
-      if (actual_read != literal_batch) {
-        return values_read;
-      }
+auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
+    -> rle_size_t {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
 
-      literal_count_ -= literal_batch;
-      values_read += literal_batch;
-      out += literal_batch;
-    } else {
-      if (!NextCounts<T>()) return values_read;
+  rle_size_t values_read = 0;
+
+  // Remaining from a previous call that would have left some unread data from 
a run.
+  if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+    const auto read = RunGetBatch(out, batch_size);
+    values_read += read;
+    out += read;
+
+    // Either we fulfilled all the batch to be read or we finished remaining 
run.
+    if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
+      return values_read;
     }
+    ARROW_DCHECK(run_remaining() == 0);
   }
 
+  ParseWithCallable([&](auto run) {
+    using RunDecoder = typename decltype(run)::template 
DecoderType<value_type>;
+
+    ARROW_DCHECK_LT(values_read, batch_size);
+    RunDecoder decoder(run);
+    const auto read = decoder.GetBatch(out, batch_size - values_read);
+    ARROW_DCHECK_LE(read, batch_size - values_read);
+    values_read += read;
+    out += read;
+
+    // Stop reading and store remaining decoder
+    if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {

Review Comment:
   OK, but then when is `read == 0` supposed to occur? The only possibility is that the `decoder`'s length is 0, but `PeekImpl` is careful to avoid that: it rejects runs whose count is 0.
   
   I think we can change that to a debug assertion:
   ```c++
       // Stop reading and store remaining decoder
       if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
         decoder_ = std::move(decoder);
         return ControlFlow::Break;
       }
       // We ensure that `values_read != batch_size` before calling `RunDecoder::GetBatch`
       // and `ParseWithCallable` ensures the run isn't empty.
       ARROW_DCHECK(read != 0);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to