pitrou commented on code in PR #47294:
URL: https://github.com/apache/arrow/pull/47294#discussion_r2319450787


##########
cpp/src/arrow/util/rle_encoding_internal.h:
##########
@@ -299,385 +552,988 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
+/*************************
+ *  RleBitPackedDecoder  *
+ *************************/
+
+template <typename T>
+RleBitPackedDecoder<T>::RleBitPackedDecoder(raw_data_const_pointer data,
+                                            raw_data_size_type data_size,
+                                            bit_size_type value_bit_width) 
noexcept {
+  Reset(data, data_size, value_bit_width);
+}
+
+template <typename T>
+void RleBitPackedDecoder<T>::Reset(raw_data_const_pointer data,
+                                   raw_data_size_type data_size,
+                                   bit_size_type value_bit_width) noexcept {
+  ARROW_DCHECK_GE(value_bit_width, 0);
+  ARROW_DCHECK_LE(value_bit_width, 64);
+  parser_.Reset(data, data_size, value_bit_width);
+  decoder_ = {};
+}
+
+template <typename T>
+auto RleBitPackedDecoder<T>::RunRemaining() const -> values_count_type {
+  return std::visit([](auto const& dec) { return dec.Remaining(); }, decoder_);
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Exhausted() const {
+  return (RunRemaining() == 0) && parser_.Exhausted();
+}
+
 template <typename T>
-inline bool RleDecoder::Get(T* val) {
+bool RleBitPackedDecoder<T>::ParseAndResetDecoder() {
+  auto dyn_run = parser_.Next();
+  if (!dyn_run.has_value()) {
+    return false;
+  }
+
+  if (auto* rle_run = std::get_if<BitPackedRun>(dyn_run.operator->())) {
+    decoder_ = {BitPackedDecoder<value_type>(*rle_run)};
+    return true;
+  }
+
+  auto* bit_packed_run = std::get_if<RleRun>(dyn_run.operator->());
+  ARROW_DCHECK(bit_packed_run);  // Only two possibilities in the variant
+  decoder_ = {RleDecoder<value_type>(*bit_packed_run)};
+  return true;
+}
+
+template <typename T>
+auto RleBitPackedDecoder<T>::RunGetBatch(value_type* out, values_count_type 
batch_size)
+    -> values_count_type {
+  return std::visit([&](auto& dec) { return dec.GetBatch(out, batch_size); }, 
decoder_);
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Get(value_type* val) {
   return GetBatch(val, 1) == 1;
 }
 
+namespace internal {
+
+/// A ``Parse`` handler that calls a single lambda.
+///
+/// This lambda would typically take the input run as ``auto run`` (i.e. the 
lambda is
+/// templated) and deduce other types from it.
+template <typename Lambda>
+struct LambdaHandler {
+  Lambda handlder_;
+
+  auto OnBitPackedRun(BitPackedRun run) { return handlder_(std::move(run)); }
+
+  auto OnRleRun(RleRun run) { return handlder_(std::move(run)); }
+};
+
+template <typename Lambda>
+LambdaHandler(Lambda) -> LambdaHandler<Lambda>;
+
+template <typename value_type, typename Run>
+struct decoder_for;
+
+template <typename value_type>
+struct decoder_for<value_type, BitPackedRun> {
+  using type = BitPackedDecoder<value_type>;
+};
+
+template <typename value_type>
+struct decoder_for<value_type, RleRun> {
+  using type = RleDecoder<value_type>;
+};
+
+template <typename value_type, typename Run>
+using decoder_for_t = typename decoder_for<value_type, Run>::type;
+
+}  // namespace internal
+
 template <typename T>
-inline int RleDecoder::GetBatch(T* values, int batch_size) {
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-
-  auto* out = values;
-
-  while (values_read < batch_size) {
-    int remaining = batch_size - values_read;
-
-    if (repeat_count_ > 0) {  // Repeated value case.
-      int repeat_batch = std::min(remaining, repeat_count_);
-      std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
-
-      repeat_count_ -= repeat_batch;
-      values_read += repeat_batch;
-      out += repeat_batch;
-    } else if (literal_count_ > 0) {
-      int literal_batch = std::min(remaining, literal_count_);
-      int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
-      if (actual_read != literal_batch) {
-        return values_read;
-      }
+auto RleBitPackedDecoder<T>::GetBatch(value_type* out, values_count_type 
batch_size)
+    -> values_count_type {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
 
-      literal_count_ -= literal_batch;
-      values_read += literal_batch;
-      out += literal_batch;
-    } else {
-      if (!NextCounts<T>()) return values_read;
+  values_count_type values_read = 0;
+
+  // Remaining from a previous call that would have left some unread data from 
a run.
+  if (ARROW_PREDICT_FALSE(RunRemaining() > 0)) {
+    auto const read = RunGetBatch(out, batch_size);
+    values_read += read;
+    out += read;
+
+    // Either we fulfilled all the batch to be read or we finished remaining 
run.
+    if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
+      return values_read;
     }
+    ARROW_DCHECK(RunRemaining() == 0);
   }
 
+  auto handler = internal::LambdaHandler{
+      [&](auto run) {
+        ARROW_DCHECK_LT(values_read, batch_size);
+        internal::decoder_for_t<value_type, decltype(run)> decoder(run);
+        auto const read = decoder.GetBatch(out, batch_size - values_read);
+        ARROW_DCHECK_LE(read, batch_size - values_read);
+        values_read += read;
+        out += read;
+
+        // Stop reading and store remaining decoder
+        if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {

Review Comment:
   Not sure about `ARROW_PREDICT_FALSE` either here. Its probability might 
depend on RLE stream and batching patterns in the upper layers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to