This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 79f97640d0 GH-47112: [Parquet][C++] Rle BitPacked parser (#47294)
79f97640d0 is described below

commit 79f97640d0d79af928a300cb53c5ce6fad3dabe4
Author: Antoine Prouvost <[email protected]>
AuthorDate: Wed Sep 24 12:28:48 2025 +0200

    GH-47112: [Parquet][C++] Rle BitPacked parser (#47294)
    
    ### Rationale for this change
    
    ### What changes are included in this PR?
    New independent abstractions:
    - A `BitPackedRun` to describe the encoded bytes in a bit packed run.
    - A minimal `BitPackedRunDecoder` that can decode this type of run
      (no dict/spaced methods).
    - A `RleRun` to describe the encoded value in an RLE run.
    - A minimal `RleRunDecoder` that can decode this type of run
      (no dict/spaced methods).
    - A `RleBitPackedParser` that reads the encoded headers and emits the different runs.
    
    These new abstractions are then plugged into `RleBitPackedDecoder`
    (formerly `RleDecoder`) to keep compatibility with the rest of Arrow
    (improvements to use the parser independently can come in a follow-up PR).
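
    For illustration, here is a rough usage sketch of the new abstractions.
    It is not part of the PR: the variables `data`, `data_size`, `bit_width`
    and `out` are assumed to be provided, and the authoritative signatures
    live in `cpp/src/arrow/util/rle_encoding_internal.h`.

    ```cpp
    using namespace arrow::util;

    // Decode every value of an RLE/bit-packed stream into `out`.
    struct Handler {
      int32_t* out;
      rle_size_t bit_width;

      RleBitPackedParser::ControlFlow OnRleRun(RleRun run) {
        RleRunDecoder<int32_t> decoder(run, bit_width);
        out += decoder.GetBatch(out, decoder.remaining(), bit_width);
        return RleBitPackedParser::ControlFlow::Continue;
      }
      RleBitPackedParser::ControlFlow OnBitPackedRun(BitPackedRun run) {
        BitPackedRunDecoder<int32_t> decoder(run, bit_width);
        out += decoder.GetBatch(out, decoder.remaining(), bit_width);
        return RleBitPackedParser::ControlFlow::Continue;
      }
    };

    RleBitPackedParser parser(data, data_size, bit_width);
    parser.Parse(Handler{out, bit_width});
    ```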
    
    Misc changes:
    - Separation of LEB128 reading/writing from `BitReader` into free functions,
      and an added check for a special case to avoid undefined-behavior overflow.
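
    A minimal round-trip sketch of the new free functions (not part of the PR;
    the example value and encoding come from the added unit tests):

    ```cpp
    #include <cstdint>

    #include "arrow/util/bit_util.h"

    void Leb128RoundTripExample() {
      uint8_t buf[arrow::bit_util::kMaxLEB128ByteLenFor<uint32_t>] = {};
      // Number of bytes written, or 0 if the buffer is too small or a signed
      // input is negative.
      const int32_t written = arrow::bit_util::WriteLEB128(
          uint32_t{300}, buf, static_cast<int32_t>(sizeof(buf)));
      // written == 2 and buf starts with {0xAC, 0x02}

      uint32_t value = 0;
      // Number of bytes read, or 0 for a truncated or overflowing LEB128.
      const int32_t n_read =
          arrow::bit_util::ParseLeadingLEB128(buf, written, &value);
      // n_read == 2 and value == 300
    }
    ```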
    
    ### Are these changes tested?
    Yes, on top of the existing tests, many more unit tests have been added.
    
    ### Are there any user-facing changes?
    API changes to internal classes.
    
    * GitHub Issue: #47112
    
    Authored-by: AntoinePrv <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 CPPLINT.cfg                                    |    2 +
 cpp/src/arrow/util/bit_run_reader.h            |    4 +
 cpp/src/arrow/util/bit_stream_utils_internal.h |  178 ++--
 cpp/src/arrow/util/bit_util.h                  |  121 +++
 cpp/src/arrow/util/bit_util_test.cc            |  188 +++-
 cpp/src/arrow/util/bitmap_reader.h             |    2 +
 cpp/src/arrow/util/rle_encoding_internal.h     | 1361 +++++++++++++++++-------
 cpp/src/arrow/util/rle_encoding_test.cc        |  674 ++++++++++--
 cpp/src/parquet/column_reader.cc               |    8 +-
 cpp/src/parquet/column_reader.h                |    5 +-
 cpp/src/parquet/column_writer.cc               |    8 +-
 cpp/src/parquet/column_writer.h                |    4 +-
 cpp/src/parquet/decoder.cc                     |   16 +-
 cpp/src/parquet/encoder.cc                     |   11 +-
 14 files changed, 2004 insertions(+), 578 deletions(-)

diff --git a/CPPLINT.cfg b/CPPLINT.cfg
index 2f47b4dbf5..dd1139ac7f 100644
--- a/CPPLINT.cfg
+++ b/CPPLINT.cfg
@@ -26,5 +26,7 @@ filter = -readability/alt_tokens
 filter = -readability/casting
 filter = -readability/todo
 filter = -runtime/references
+# Let the formatter do the job for whitespace
 filter = -whitespace/comments
+filter = -whitespace/braces
 linelength = 90
diff --git a/cpp/src/arrow/util/bit_run_reader.h 
b/cpp/src/arrow/util/bit_run_reader.h
index a2cbad5b29..ed7be940a5 100644
--- a/cpp/src/arrow/util/bit_run_reader.h
+++ b/cpp/src/arrow/util/bit_run_reader.h
@@ -52,6 +52,8 @@ inline bool operator!=(const BitRun& lhs, const BitRun& rhs) {
 
 class BitRunReaderLinear {
  public:
+  BitRunReaderLinear() = default;
+
   BitRunReaderLinear(const uint8_t* bitmap, int64_t start_offset, int64_t 
length)
       : reader_(bitmap, start_offset, length) {}
 
@@ -74,6 +76,8 @@ class BitRunReaderLinear {
 /// in a bitmap.
 class ARROW_EXPORT BitRunReader {
  public:
+  BitRunReader() = default;
+
   /// \brief Constructs new BitRunReader.
   ///
   /// \param[in] bitmap source data
diff --git a/cpp/src/arrow/util/bit_stream_utils_internal.h 
b/cpp/src/arrow/util/bit_stream_utils_internal.h
index 2b5ec3830e..1f3b699e1a 100644
--- a/cpp/src/arrow/util/bit_stream_utils_internal.h
+++ b/cpp/src/arrow/util/bit_stream_utils_internal.h
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <type_traits>
 
 #include "arrow/util/bit_util.h"
 #include "arrow/util/bpacking_internal.h"
@@ -30,8 +31,7 @@
 #include "arrow/util/macros.h"
 #include "arrow/util/ubsan.h"
 
-namespace arrow {
-namespace bit_util {
+namespace arrow::bit_util {
 
 /// Utility class to write bit/byte streams.  This class can write data to 
either be
 /// bit packed or byte aligned (and a single stream that has a mix of both).
@@ -73,19 +73,14 @@ class BitWriter {
   /// room.  The value is written byte aligned.
   /// For more details on vlq:
   /// en.wikipedia.org/wiki/Variable-length_quantity
-  bool PutVlqInt(uint32_t v);
+  template <typename Int>
+  bool PutVlqInt(Int v);
 
-  // Writes an int zigzag encoded.
-  bool PutZigZagVlqInt(int32_t v);
-
-  /// Write a Vlq encoded int64 to the buffer.  Returns false if there was not 
enough
-  /// room.  The value is written byte aligned.
-  /// For more details on vlq:
-  /// en.wikipedia.org/wiki/Variable-length_quantity
-  bool PutVlqInt(uint64_t v);
-
-  // Writes an int64 zigzag encoded.
-  bool PutZigZagVlqInt(int64_t v);
+  /// Writes a zigzag encoded signed integer.
+  /// Zigzag encoding interleaves non-negative and negative values
+  /// (0, -1, 1, -2, ...) so that integers of small magnitude encode to few bytes.
+  template <typename Int>
+  bool PutZigZagVlqInt(Int v);
 
   /// Get a pointer to the next aligned byte and advance the underlying buffer
   /// by num_bytes.
@@ -128,14 +123,14 @@ inline uint64_t ReadLittleEndianWord(const uint8_t* 
buffer, int bytes_remaining)
 /// bytes in one read (e.g. encoded int).
 class BitReader {
  public:
-  BitReader() = default;
+  BitReader() noexcept = default;
 
   /// 'buffer' is the buffer to read from.  The buffer's length is 
'buffer_len'.
   BitReader(const uint8_t* buffer, int buffer_len) : BitReader() {
     Reset(buffer, buffer_len);
   }
 
-  void Reset(const uint8_t* buffer, int buffer_len) {
+  void Reset(const uint8_t* buffer, int buffer_len) noexcept {
     buffer_ = buffer;
     max_bytes_ = buffer_len;
     byte_offset_ = 0;
@@ -169,18 +164,14 @@ class BitReader {
   /// Reads a vlq encoded int from the stream.  The encoded int must start at
   /// the beginning of a byte. Return false if there were not enough bytes in
   /// the buffer.
-  bool GetVlqInt(uint32_t* v);
-
-  // Reads a zigzag encoded int `into` v.
-  bool GetZigZagVlqInt(int32_t* v);
-
-  /// Reads a vlq encoded int64 from the stream.  The encoded int must start at
-  /// the beginning of a byte. Return false if there were not enough bytes in
-  /// the buffer.
-  bool GetVlqInt(uint64_t* v);
+  template <typename Int>
+  bool GetVlqInt(Int* v);
 
-  // Reads a zigzag encoded int64 `into` v.
-  bool GetZigZagVlqInt(int64_t* v);
+  /// Reads a zigzag encoded integer into the signed integer output v.
+  /// Zigzag encoding interleaves non-negative and negative values
+  /// (0, -1, 1, -2, ...) so that integers of small magnitude decode from few bytes.
+  template <typename Int>
+  bool GetZigZagVlqInt(Int* v);
 
   /// Returns the number of bytes left in the stream, not including the current
   /// byte (i.e., there may be an additional fraction of a byte).
@@ -189,12 +180,6 @@ class BitReader {
            (byte_offset_ + 
static_cast<int>(bit_util::BytesForBits(bit_offset_)));
   }
 
-  /// Maximum byte length of a vlq encoded int
-  static constexpr int kMaxVlqByteLength = 5;
-
-  /// Maximum byte length of a vlq encoded int64
-  static constexpr int kMaxVlqByteLengthForInt64 = 10;
-
  private:
   const uint8_t* buffer_;
   int max_bytes_;
@@ -439,91 +424,92 @@ inline bool BitReader::Advance(int64_t num_bits) {
   return true;
 }
 
-inline bool BitWriter::PutVlqInt(uint32_t v) {
-  bool result = true;
-  while ((v & 0xFFFFFF80UL) != 0UL) {
-    result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
-    v >>= 7;
-  }
-  result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
-  return result;
-}
+template <typename Int>
+inline bool BitWriter::PutVlqInt(Int v) {
+  static_assert(std::is_integral_v<Int>);
 
-inline bool BitReader::GetVlqInt(uint32_t* v) {
-  uint32_t tmp = 0;
+  constexpr auto kBufferSize = kMaxLEB128ByteLenFor<Int>;
 
-  for (int i = 0; i < kMaxVlqByteLength; i++) {
-    uint8_t byte = 0;
-    if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) {
+  uint8_t buffer[kBufferSize] = {};
+  const auto bytes_written = WriteLEB128(v, buffer, kBufferSize);
+  ARROW_DCHECK_LE(bytes_written, kBufferSize);
+  if constexpr (std::is_signed_v<Int>) {
+    // Can fail if negative
+    if (ARROW_PREDICT_FALSE(bytes_written == 0)) {
       return false;
     }
-    tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * i);
+  } else {
+    // Cannot fail since we gave max space
+    ARROW_DCHECK_GT(bytes_written, 0);
+  }
 
-    if ((byte & 0x80) == 0) {
-      *v = tmp;
-      return true;
+  for (int i = 0; i < bytes_written; ++i) {
+    const bool success = PutAligned(buffer[i], 1);
+    if (ARROW_PREDICT_FALSE(!success)) {
+      return false;
     }
   }
 
-  return false;
-}
-
-inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
-  uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v);
-  u_v = (u_v << 1) ^ static_cast<uint32_t>(v >> 31);
-  return PutVlqInt(u_v);
-}
-
-inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
-  uint32_t u;
-  if (!GetVlqInt(&u)) return false;
-  u = (u >> 1) ^ (~(u & 1) + 1);
-  *v = ::arrow::util::SafeCopy<int32_t>(u);
   return true;
 }
 
-inline bool BitWriter::PutVlqInt(uint64_t v) {
-  bool result = true;
-  while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) {
-    result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
-    v >>= 7;
+template <typename Int>
+inline bool BitReader::GetVlqInt(Int* v) {
+  static_assert(std::is_integral_v<Int>);
+
+  // The data that we will pass to the LEB128 parser
+  // In all cases, we read a byte-aligned value, skipping any remaining bits
+  const uint8_t* data = NULLPTR;
+  int max_size = 0;
+
+  // Number of bytes left in the buffered values, not including the current
+  // byte (i.e., there may be an additional fraction of a byte).
+  const int bytes_left_in_cache =
+      sizeof(buffered_values_) - 
static_cast<int>(bit_util::BytesForBits(bit_offset_));
+
+  // If there are clearly enough bytes left we can try to parse from the cache
+  if (bytes_left_in_cache >= kMaxLEB128ByteLenFor<Int>) {
+    max_size = bytes_left_in_cache;
+    data = reinterpret_cast<const uint8_t*>(&buffered_values_) +
+           bit_util::BytesForBits(bit_offset_);
+    // Otherwise, we read straight from the buffer (ignoring the few bytes that may be cached)
+  } else {
+    max_size = bytes_left();
+    data = buffer_ + (max_bytes_ - max_size);
   }
-  result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
-  return result;
-}
 
-inline bool BitReader::GetVlqInt(uint64_t* v) {
-  uint64_t tmp = 0;
-
-  for (int i = 0; i < kMaxVlqByteLengthForInt64; i++) {
-    uint8_t byte = 0;
-    if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) {
-      return false;
-    }
-    tmp |= static_cast<uint64_t>(byte & 0x7F) << (7 * i);
-
-    if ((byte & 0x80) == 0) {
-      *v = tmp;
-      return true;
-    }
+  const auto bytes_read = bit_util::ParseLeadingLEB128(data, max_size, v);
+  if (ARROW_PREDICT_FALSE(bytes_read == 0)) {
+    // Corrupt LEB128
+    return false;
   }
 
-  return false;
+  // Advance for the bytes we have read + the bits we skipped
+  return Advance((8 * bytes_read) + (bit_offset_ % 8));
 }
 
-inline bool BitWriter::PutZigZagVlqInt(int64_t v) {
-  uint64_t u_v = ::arrow::util::SafeCopy<uint64_t>(v);
-  u_v = (u_v << 1) ^ static_cast<uint64_t>(v >> 63);
+template <typename Int>
+inline bool BitWriter::PutZigZagVlqInt(Int v) {
+  static_assert(std::is_integral_v<Int>);
+  static_assert(std::is_signed_v<Int>);
+  using UInt = std::make_unsigned_t<Int>;
+  constexpr auto kBitSize = 8 * sizeof(Int);
+
+  UInt u_v = ::arrow::util::SafeCopy<UInt>(v);
+  u_v = (u_v << 1) ^ static_cast<UInt>(v >> (kBitSize - 1));
   return PutVlqInt(u_v);
 }
 
-inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
-  uint64_t u;
+template <typename Int>
+inline bool BitReader::GetZigZagVlqInt(Int* v) {
+  static_assert(std::is_integral_v<Int>);
+  static_assert(std::is_signed_v<Int>);
+
+  std::make_unsigned_t<Int> u;
   if (!GetVlqInt(&u)) return false;
   u = (u >> 1) ^ (~(u & 1) + 1);
-  *v = ::arrow::util::SafeCopy<int64_t>(u);
+  *v = ::arrow::util::SafeCopy<Int>(u);
   return true;
 }
 
-}  // namespace bit_util
-}  // namespace arrow
+}  // namespace arrow::bit_util
diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h
index 13d265f0be..8d4811ede7 100644
--- a/cpp/src/arrow/util/bit_util.h
+++ b/cpp/src/arrow/util/bit_util.h
@@ -365,5 +365,126 @@ void PackBits(const uint32_t* values, uint8_t* out) {
   }
 }
 
+constexpr int64_t MaxLEB128ByteLen(int64_t n_bits) { return CeilDiv(n_bits, 
7); }
+
+template <typename Int>
+constexpr int64_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
+
+/// Write an integer as LEB128
+///
+/// Write the input value as LEB128 into the output buffer and return the number
+/// of bytes written.
+/// If the output buffer size is insufficient, return 0; the output may
+/// nonetheless have been partially written to.
+/// The input value can be a signed integer, but must be non-negative.
+///
+/// \see https://en.wikipedia.org/wiki/LEB128
+/// \see kMaxLEB128ByteLenFor
+template <typename Int>
+constexpr int32_t WriteLEB128(Int value, uint8_t* out, int32_t max_out_size) {
+  constexpr Int kLow7Mask = Int(0x7F);
+  constexpr Int kHigh7Mask = ~kLow7Mask;
+  constexpr uint8_t kContinuationBit = 0x80;
+
+  // This encoding does not work for negative values
+  if constexpr (std::is_signed_v<Int>) {
+    if (ARROW_PREDICT_FALSE(value < 0)) {
+      return 0;
+    }
+  }
+
+  const auto out_first = out;
+
+  // Write as many bytes as needed for the given input
+  while ((value & kHigh7Mask) != Int(0)) {
+    // We do not have enough room to write the LEB128
+    if (ARROW_PREDICT_FALSE(out - out_first >= max_out_size)) {
+      return 0;
+    }
+
+    // Write the encoded byte with continuation bit
+    *out = static_cast<uint8_t>(value & kLow7Mask) | kContinuationBit;
+    ++out;
+    // Shift remaining data
+    value >>= 7;
+  }
+
+  // We do not have enough room to write the LEB128
+  if (ARROW_PREDICT_FALSE(out - out_first >= max_out_size)) {
+    return 0;
+  }
+
+  // Write last non-continuing byte
+  *out = static_cast<uint8_t>(value & kLow7Mask);
+  ++out;
+
+  return static_cast<int32_t>(out - out_first);
+}
+
+/// Parse a leading LEB128
+///
+/// Take as input a data pointer and the maximum number of bytes that can be read
+/// from it (typically the array size).
+/// When a valid LEB128 is found at the start of the data, the function writes it
+/// to the out pointer and returns the number of bytes read.
+/// Otherwise, the out pointer is left unmodified and zero is returned.
+///
+/// \see https://en.wikipedia.org/wiki/LEB128
+/// \see kMaxLEB128ByteLenFor
+template <typename Int>
+constexpr int32_t ParseLeadingLEB128(const uint8_t* data, int32_t 
max_data_size,
+                                     Int* out) {
+  constexpr auto kMaxBytes = static_cast<int32_t>(kMaxLEB128ByteLenFor<Int>);
+  static_assert(kMaxBytes >= 1);
+  constexpr uint8_t kLow7Mask = 0x7F;
+  constexpr uint8_t kContinuationBit = 0x80;
+  constexpr int32_t kSignBitCount = std::is_signed_v<Int> ? 1 : 0;
+  // Number of bits allowed for encoding data on the last byte to avoid 
overflow
+  constexpr uint8_t kHighBitCount = (8 * sizeof(Int) - kSignBitCount) % 7;
+  // kHighBitCount least significant `0` bits and the rest with `1`
+  constexpr uint8_t kHighForbiddenMask = ~((1 << kHighBitCount) - 1);
+
+  // Iteratively building the value
+  std::make_unsigned_t<Int> value = 0;
+
+  // Read as many bytes as could be needed for the given output type.
+  for (int32_t i = 0; i < kMaxBytes - 1; i++) {
+    // We have not finished reading a valid LEB128, yet we have run out of data
+    if (ARROW_PREDICT_FALSE(i >= max_data_size)) {
+      return 0;
+    }
+
+    // Read the byte and merge its 7 low bits into the final value
+    const uint8_t byte = data[i];
+    value |= static_cast<Int>(byte & kLow7Mask) << (7 * i);
+
+    // Check for lack of continuation flag in MSB
+    if ((byte & kContinuationBit) == 0) {
+      *out = value;
+      return i + 1;
+    }
+  }
+
+  // Process the last index, avoiding overflow
+  constexpr int32_t last = kMaxBytes - 1;
+
+  // We have not finished reading a valid LEB128, yet we have run out of data
+  if (ARROW_PREDICT_FALSE(last >= max_data_size)) {
+    return 0;
+  }
+
+  const uint8_t byte = data[last];
+
+  // Need to check if there are bits that would overflow the output.
+  // Also checks that there is no continuation.
+  if (ARROW_PREDICT_FALSE((byte & kHighForbiddenMask) != 0)) {
+    return 0;
+  }
+
+  // No longer need to mask since we ensured the forbidden high bits are zero
+  value |= static_cast<Int>(byte) << (7 * last);
+  *out = value;
+  return last + 1;
+}
 }  // namespace bit_util
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/bit_util_test.cc 
b/cpp/src/arrow/util/bit_util_test.cc
index fcaeb49261..e8cee340de 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -1997,11 +1997,189 @@ TEST(BitUtil, RoundUpToPowerOf2) {
 #undef U64
 #undef S64
 
+/// Test the maximum number of bytes needed to write a LEB128 of a given size.
+TEST(LEB128, MaxLEB128ByteLenFor) {
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int8_t>, 2);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint8_t>, 2);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint16_t>, 3);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint32_t>, 5);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10);
+  EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint64_t>, 10);
+}
+
+/// Utility function to test LEB128 encoding with known input value and 
expected byte
+/// array
+template <typename Int>
+void TestLEB128Encode(Int input_value, const std::vector<uint8_t>& 
expected_data,
+                      std::size_t buffer_size) {
+  std::vector<uint8_t> buffer(buffer_size);
+  auto bytes_written = bit_util::WriteLEB128(input_value, buffer.data(),
+                                             
static_cast<int32_t>(buffer.size()));
+
+  EXPECT_EQ(bytes_written, expected_data.size());
+  // Encoded data
+  for (std::size_t i = 0; i < expected_data.size(); ++i) {
+    EXPECT_EQ(buffer.at(i), expected_data.at(i));
+  }
+
+  // When the value is successfully encoded, the remainder of the buffer is untouched
+  if (bytes_written > 0) {
+    for (std::size_t i = bytes_written; i < buffer.size(); ++i) {
+      EXPECT_EQ(buffer.at(i), 0);
+    }
+  }
+}
+
+/// Test encoding to known LEB128 byte sequences with edge-case parameters.
+/// \see LEB128.KnownSuccessfulValues for other known values tested.
+TEST(LEB128, WriteEdgeCases) {
+  // Single byte value 0
+  TestLEB128Encode(0U, {0x00}, 1);
+  // Single byte value 127
+  TestLEB128Encode(127U, {0x7F}, 1);
+  // Three byte value 16384, encoded in larger buffer
+  TestLEB128Encode(16384U, {0x80, 0x80, 0x01}, 10);
+  // Two byte boundary values
+  TestLEB128Encode(128U, {0x80, 0x01}, 2);
+  TestLEB128Encode(129U, {0x81, 0x01}, 2);
+  TestLEB128Encode(16383U, {0xFF, 0x7F}, 2);
+  // Error case: Buffer too small for value 128 (needs 2 bytes but only 1 
provided)
+  TestLEB128Encode(128U, {}, 1);
+  // Error case: Buffer too small for uint32_t max (needs 5 bytes but only 4 
provided)
+  TestLEB128Encode(4294967295U, {}, 4);
+  // Error case: Zero buffer size
+  TestLEB128Encode(52U, {}, 0);
+  // Error case: Negative value
+  TestLEB128Encode(-3, {}, 1);
+}
+
+/// Utility function to test LEB128 decoding with known byte array and 
expected result
+template <typename Int>
+void TestLEB128Decode(const std::vector<uint8_t>& data, Int expected_value,
+                      int32_t expected_bytes_read) {
+  Int result = 0;
+  auto bytes_read = bit_util::ParseLeadingLEB128(
+      data.data(), static_cast<int32_t>(data.size()), &result);
+  EXPECT_EQ(bytes_read, expected_bytes_read);
+  if (expected_bytes_read > 0) {
+    EXPECT_EQ(result, expected_value);
+  }
+}
+
+/// Test decoding from known LEB128 byte sequences with edge case parameters.
+/// \see LEB128.KnownSuccessfulValues for other known values tested.
+TEST(LEB128, ReadEdgeCases) {
+  // Single byte value 0
+  TestLEB128Decode({0x00}, 0U, 1);
+  // Single byte value 127
+  TestLEB128Decode({0x7F}, 127U, 1);
+  // Three byte value 16384, with remaining data
+  TestLEB128Decode({0x80, 0x80, 0x01, 0x80, 0x00}, 16384U, 3);
+  // Four byte value 268435455
+  TestLEB128Decode({0xFF, 0xFF, 0xFF, 0x7F}, 268435455U, 4);
+  // Error case: Truncated sequence (continuation bit set but no more data)
+  TestLEB128Decode({0x80}, 0U, 0);
+  // Error case: Input has exactly the maximum number of bytes for an int32_t (5),
+  // but the decoded value overflows nonetheless (7 * 5 = 35 bits of data).
+  TestLEB128Decode({0xFF, 0xFF, 0xFF, 0xFF, 0x7F}, int32_t{}, 0);
+  // Error case: Oversized sequence for uint32_t (too many bytes)
+  TestLEB128Decode({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}, 0U, 0);
+}
+
+struct KnownLEB128Encoding {
+  uint64_t value;
+  std::vector<uint8_t> bytes;
+};
+
+static const std::vector<KnownLEB128Encoding> KnownLEB128EncodingValues{
+    {0, {0x00}},
+    {1, {0x01}},
+    {63, {0x3F}},
+    {64, {0x40}},
+    {127U, {0x7F}},
+    {128, {0x80, 0x01}},
+    {300, {0xAC, 0x02}},
+    {16384, {0x80, 0x80, 0x01}},
+    {268435455, {0xFF, 0xFF, 0xFF, 0x7F}},
+    {static_cast<uint64_t>(std::numeric_limits<uint8_t>::max()), {0xFF, 0x01}},
+    {static_cast<uint64_t>(std::numeric_limits<int8_t>::max()), {0x7F}},
+    {static_cast<uint64_t>(std::numeric_limits<uint16_t>::max()), {0xFF, 0xFF, 
0x03}},
+    {static_cast<uint64_t>(std::numeric_limits<int16_t>::max()), {0xFF, 0xFF, 
0x01}},
+    {static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()),
+     {0xFF, 0xFF, 0xFF, 0xFF, 0x0F}},
+    {static_cast<uint64_t>(std::numeric_limits<int32_t>::max()),
+     {0xFF, 0xFF, 0xFF, 0xFF, 0x7}},
+    {static_cast<uint64_t>(std::numeric_limits<uint64_t>::max()),
+     {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}},
+    {static_cast<uint64_t>(std::numeric_limits<int64_t>::max()),
+     {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x7F}},
+};
+
+/// Test encoding and decoding to known LEB128 byte sequences with all possible
+/// integer sizes and signedness.
+TEST(LEB128, KnownSuccessfulValues) {
+  for (const auto& data : KnownLEB128EncodingValues) {
+    SCOPED_TRACE("Testing value " + std::to_string(data.value));
+
+    // 8 bits
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<int8_t>::max())) {
+      const auto val = static_cast<int8_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())) {
+      const auto val = static_cast<uint8_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+
+    // 16 bits
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<int16_t>::max())) {
+      const auto val = static_cast<int16_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())) {
+      const auto val = static_cast<uint16_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+
+    // 32 bits
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<int32_t>::max())) {
+      const auto val = static_cast<int32_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())) {
+      const auto val = static_cast<uint32_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+
+    // 64 bits
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<int64_t>::max())) {
+      const auto val = static_cast<int64_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+    if (data.value <= 
static_cast<uint64_t>(std::numeric_limits<uint64_t>::max())) {
+      const auto val = static_cast<uint64_t>(data.value);
+      TestLEB128Encode(val, data.bytes, data.bytes.size());
+      TestLEB128Decode(data.bytes, val, 
static_cast<int32_t>(data.bytes.size()));
+    }
+  }
+}
+
 static void TestZigZag(int32_t v, std::array<uint8_t, 5> buffer_expect) {
-  uint8_t buffer[bit_util::BitReader::kMaxVlqByteLength] = {};
+  uint8_t buffer[bit_util::kMaxLEB128ByteLenFor<int32_t>] = {};
   bit_util::BitWriter writer(buffer, sizeof(buffer));
-  bit_util::BitReader reader(buffer, sizeof(buffer));
   writer.PutZigZagVlqInt(v);
+  // WARNING: The reader reads and caches the input when created, so it must 
be created
+  // after the data is written in the buffer.
+  bit_util::BitReader reader(buffer, sizeof(buffer));
   EXPECT_THAT(buffer, testing::ElementsAreArray(buffer_expect));
   int32_t result;
   EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
@@ -2020,10 +2198,12 @@ TEST(BitStreamUtil, ZigZag) {
 }
 
 static void TestZigZag64(int64_t v, std::array<uint8_t, 10> buffer_expect) {
-  uint8_t buffer[bit_util::BitReader::kMaxVlqByteLengthForInt64] = {};
+  uint8_t buffer[bit_util::kMaxLEB128ByteLenFor<int64_t>] = {};
   bit_util::BitWriter writer(buffer, sizeof(buffer));
-  bit_util::BitReader reader(buffer, sizeof(buffer));
   writer.PutZigZagVlqInt(v);
+  // WARNING: The reader reads and caches the input when created, so it must 
be created
+  // after the data is written in the buffer.
+  bit_util::BitReader reader(buffer, sizeof(buffer));
   EXPECT_THAT(buffer, testing::ElementsAreArray(buffer_expect));
   int64_t result = 0;
   EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
diff --git a/cpp/src/arrow/util/bitmap_reader.h 
b/cpp/src/arrow/util/bitmap_reader.h
index 5526c87dbc..d95fd921f4 100644
--- a/cpp/src/arrow/util/bitmap_reader.h
+++ b/cpp/src/arrow/util/bitmap_reader.h
@@ -31,6 +31,8 @@ namespace internal {
 
 class BitmapReader {
  public:
+  BitmapReader() = default;
+
   BitmapReader(const uint8_t* bitmap, int64_t start_offset, int64_t length)
       : bitmap_(bitmap), position_(0), length_(length) {
     current_byte_ = 0;
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h 
b/cpp/src/arrow/util/rle_encoding_internal.h
index fab8c50512..c231c9a63e 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -21,18 +21,17 @@
 #pragma once
 
 #include <algorithm>
-#include <cmath>
+#include <array>
 #include <limits>
-#include <vector>
+#include <type_traits>
+#include <variant>
 
-#include "arrow/util/bit_block_counter.h"
 #include "arrow/util/bit_run_reader.h"
 #include "arrow/util/bit_stream_utils_internal.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/macros.h"
 
-namespace arrow {
-namespace util {
+namespace arrow::util {
 
 /// Utility classes to do run length encoding (RLE) for fixed bit width 
values.  If runs
 /// are sufficiently long, RLE is used, otherwise, the values are just 
bit-packed
@@ -82,86 +81,411 @@ namespace util {
 /// 200 ints = 25 groups of 8
 /// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
 /// (total 26 bytes, 1 byte overhead)
-//
 
-/// Decoder class for RLE encoded data.
-class RleDecoder {
+/// The type for an encoded Rle or BitPacked run size, between 1 and 2^31-1 as per
+/// the Parquet spec.
+/// This is also pragmatically used for other integers in the Rle and BitPacked
+/// runs and decoders to avoid conversions.
+/// It can therefore be regarded as the "typical" size type for Rle and BitPacked
+/// logic.
+using rle_size_t = int32_t;
+
+template <typename T>
+class RleRunDecoder;
+
+/// A single Run Length Encoded run.
+///
+/// Consists of a single value repeated multiple times.
+/// A previous version of this class also stored the value bit width to be
+/// self-contained; removing it and passing it explicitly when needed proved to
+/// speed up decoding by up to 10% on some benchmarks.
+class RleRun {
  public:
-  /// Create a decoder object. buffer/buffer_len is the decoded data.
-  /// bit_width is the width of each value (before encoding).
-  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
-      : bit_reader_(buffer, buffer_len),
-        bit_width_(bit_width),
-        current_value_(0),
-        repeat_count_(0),
-        literal_count_(0) {
-    ARROW_DCHECK_GE(bit_width_, 0);
-    ARROW_DCHECK_LE(bit_width_, 64);
+  /// The decoder class used to decode a single run in the given type.
+  template <typename T>
+  using DecoderType = RleRunDecoder<T>;
+
+  constexpr RleRun() noexcept = default;
+
+  explicit RleRun(const uint8_t* data, rle_size_t values_count,
+                  rle_size_t value_bit_width) noexcept
+      : values_count_(values_count) {
+    ARROW_DCHECK_GE(value_bit_width, 0);
+    ARROW_DCHECK_GE(values_count, 0);
+    std::copy(data, data + raw_data_size(value_bit_width), data_.begin());
   }
 
-  RleDecoder() : bit_width_(-1) {}
+  /// The number of repeated values in this run.
+  constexpr rle_size_t values_count() const noexcept { return values_count_; }
 
-  void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
-    ARROW_DCHECK_GE(bit_width, 0);
-    ARROW_DCHECK_LE(bit_width, 64);
-    bit_reader_.Reset(buffer, buffer_len);
-    bit_width_ = bit_width;
-    current_value_ = 0;
-    repeat_count_ = 0;
-    literal_count_ = 0;
+  /// A pointer to the repeated value raw bytes.
+  constexpr const uint8_t* raw_data_ptr() const noexcept { return 
data_.data(); }
+
+  /// The number of bytes used for the raw repeated value.
+  constexpr rle_size_t raw_data_size(rle_size_t value_bit_width) const 
noexcept {
+    auto out = bit_util::BytesForBits(value_bit_width);
+    ARROW_DCHECK_LE(out, std::numeric_limits<rle_size_t>::max());
+    return static_cast<rle_size_t>(out);
   }
 
-  /// Gets the next value.  Returns false if there are no more.
+ private:
+  /// The repeated value raw bytes stored inside the class with enough space 
to store
+  /// up to a 64 bit value.
+  std::array<uint8_t, 8> data_ = {};
+  /// The number of times the value is repeated.
+  rle_size_t values_count_ = 0;
+};
+
+template <typename T>
+class BitPackedRunDecoder;
+
+/// A single bit packed run.
+///
+/// Consists of a view on a buffer of bytes that encode integers on
+/// ``value_bit_width`` bits (that is, the numbers are small enough that the
+/// high order bits are all zeros and can be omitted).
+/// A previous version of this class also stored the value bit width to be
+/// self-contained; removing it and passing it explicitly when needed proved to
+/// speed up decoding by up to 10% on some benchmarks.
+class BitPackedRun {
+ public:
+  /// The decoder class used to decode a single run in the given type.
+  template <typename T>
+  using DecoderType = BitPackedRunDecoder<T>;
+
+  constexpr BitPackedRun() noexcept = default;
+
+  constexpr BitPackedRun(const uint8_t* data, rle_size_t values_count,
+                         rle_size_t value_bit_width) noexcept
+      : data_(data), values_count_(values_count) {
+    ARROW_CHECK_GE(value_bit_width, 0);
+    ARROW_CHECK_GE(values_count_, 0);
+  }
+
+  constexpr rle_size_t values_count() const noexcept { return values_count_; }
+
+  constexpr const uint8_t* raw_data_ptr() const noexcept { return data_; }
+
+  constexpr rle_size_t raw_data_size(rle_size_t value_bit_width) const 
noexcept {
+    auto out = bit_util::BytesForBits(static_cast<int64_t>(value_bit_width) *
+                                      static_cast<int64_t>(values_count_));
+    ARROW_CHECK_LE(out, std::numeric_limits<rle_size_t>::max());
+    return static_cast<rle_size_t>(out);
+  }
+
+ private:
+  /// The pointer to the beginning of the run
+  const uint8_t* data_ = nullptr;
+  /// Number of values in this run.
+  rle_size_t values_count_ = 0;
+};
+
+/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``.
+class RleBitPackedParser {
+ public:
+  /// The different types of runs emitted by the parser
+  using dynamic_run_type = std::variant<RleRun, BitPackedRun>;
+
+  constexpr RleBitPackedParser() noexcept = default;
+
+  constexpr RleBitPackedParser(const uint8_t* data, rle_size_t data_size,
+                               rle_size_t value_bit_width) noexcept
+      : data_(data), data_size_(data_size), value_bit_width_(value_bit_width) 
{}
+
+  constexpr void Reset(const uint8_t* data, rle_size_t data_size,
+                       rle_size_t value_bit_width) noexcept {
+    *this = {data, data_size, value_bit_width};
+  }
+
+  /// Whether there are still runs to iterate over.
+  ///
+  /// WARN: Due to simplistic error handling, iteration with Next and Peek 
could
+  /// fail to return data while the parser is not exhausted.
+  /// This is how one can check for errors.
+  bool exhausted() const { return data_size_ == 0; }
+
+  /// Enum to return from a ``Parse`` handler.
+  ///
+  /// Since a callback has no way to know when to stop, the handler must return
+  /// a value indicating to the ``Parse`` function whether to stop or continue.
+  enum class ControlFlow {
+    Continue,
+    Break,
+  };
+
+  /// A callback approach to parsing.
+  ///
+  /// This approach is used to reduce the number of dynamic lookups involved 
with using a
+  /// variant.
+  ///
+  /// The handler must be of the form
+  /// ```cpp
+  /// struct Handler {
+  ///   ControlFlow OnBitPackedRun(BitPackedRun run);
+  ///
+  ///   ControlFlow OnRleRun(RleRun run);
+  /// };
+  /// ```
+  template <typename Handler>
+  void Parse(Handler&& handler);
+
+ private:
+  /// The pointer to the beginning of the run
+  const uint8_t* data_ = nullptr;
+  /// Size in bytes of the run.
+  rle_size_t data_size_ = 0;
+  /// The size in bit of a packed value in the run
+  rle_size_t value_bit_width_ = 0;
+
+  /// Run the handler on the parsed run and return the number of values read.
+  /// Does not advance the parser.
+  template <typename Handler>
+  std::pair<rle_size_t, ControlFlow> PeekImpl(Handler&&) const;
+};
+
+/// Decoder class for a single run of RLE encoded data.
+template <typename T>
+class RleRunDecoder {
+ public:
+  /// The type in which the data should be decoded.
+  using value_type = T;
+  /// The type of run that can be decoded.
+  using RunType = RleRun;
+
+  constexpr RleRunDecoder() noexcept = default;
+
+  explicit RleRunDecoder(const RunType& run, rle_size_t value_bit_width) 
noexcept {
+    Reset(run, value_bit_width);
+  }
+
+  void Reset(const RunType& run, rle_size_t value_bit_width) noexcept {
+    remaining_count_ = run.values_count();
+    if constexpr (std::is_same_v<value_type, bool>) {
+      // ARROW-18031:  just check the LSB of the next byte and move on.
+      // If we memcpy + FromLittleEndian, we have potential undefined behavior
+      // if the bool value isn't 0 or 1.
+      value_ = *run.raw_data_ptr() & 1;
+    } else {
+      // Memcopy is required to avoid undefined behavior.
+      value_ = {};
+      std::memcpy(&value_, run.raw_data_ptr(), 
run.raw_data_size(value_bit_width));
+      value_ = ::arrow::bit_util::FromLittleEndian(value_);
+    }
+  }
+
+  /// Return the number of values that can be advanced.
+  rle_size_t remaining() const { return remaining_count_; }
+
+  /// Return the repeated value of this decoder.
+  constexpr value_type value() const { return value_; }
+
+  /// Try to advance by as many values as provided.
+  /// Return the number of values skipped.
+  /// May advance by less than asked for if there are not enough values left.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size, rle_size_t 
value_bit_width) {
+    const auto steps = std::min(batch_size, remaining_count_);
+    remaining_count_ -= steps;
+    return steps;
+  }
+
+  /// Get the next value and return false if there are no more.
+  [[nodiscard]] constexpr bool Get(value_type* out_value, rle_size_t 
value_bit_width) {
+    return GetBatch(out_value, 1, value_bit_width) == 1;
+  }
+
+  /// Get a batch of values and return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left.
+  [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size,
+                                    rle_size_t value_bit_width) {
+    if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) {
+      return 0;
+    }
+
+    const auto to_read = std::min(remaining_count_, batch_size);
+    std::fill(out, out + to_read, value_);
+    remaining_count_ -= to_read;
+    return to_read;
+  }
+
+ private:
+  value_type value_ = {};
+  rle_size_t remaining_count_ = 0;
+
+  static_assert(std::is_integral_v<value_type>,
+                "This class is meant to decode positive integers");
+};
+
+/// Decoder class for single run of bit-packed encoded data.
+template <typename T>
+class BitPackedRunDecoder {
+ public:
+  /// The type in which the data should be decoded.
+  using value_type = T;
+  /// The type of run that can be decoded.
+  using RunType = BitPackedRun;
+
+  BitPackedRunDecoder() noexcept = default;
+
+  explicit BitPackedRunDecoder(const RunType& run, rle_size_t value_bit_width) 
noexcept {
+    Reset(run, value_bit_width);
+  }
+
+  void Reset(const RunType& run, rle_size_t value_bit_width) noexcept {
+    remaining_count_ = run.values_count();
+    ARROW_DCHECK_GE(value_bit_width, 0);
+    ARROW_DCHECK_LE(value_bit_width, 64);
+    bit_reader_.Reset(run.raw_data_ptr(), run.raw_data_size(value_bit_width));
+  }
+
+  /// Return the number of values that can be advanced.
+  constexpr rle_size_t remaining() const { return remaining_count_; }
+
+  /// Try to advance by as many values as provided.
+  /// Return the number of values skipped, or 0 if it fails to advance.
+  /// May advance by less than asked for if there are not enough values left.
+  [[nodiscard]] rle_size_t Advance(rle_size_t batch_size, rle_size_t 
value_bit_width) {
+    const auto steps = std::min(batch_size, remaining_count_);
+    if (bit_reader_.Advance(steps * value_bit_width)) {
+      remaining_count_ -= steps;
+      return steps;
+    }
+    return 0;
+  }
+
+  /// Get the next value and return false if there are no more or an error 
occurred.
+  [[nodiscard]] constexpr bool Get(value_type* out_value, rle_size_t 
value_bit_width) {
+    return GetBatch(out_value, 1, value_bit_width) == 1;
+  }
+
+  /// Get a batch of values and return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left or if an error occurred.
+  [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size,
+                                    rle_size_t value_bit_width) {
+    if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) {
+      return 0;
+    }
+
+    const auto to_read = std::min(remaining_count_, batch_size);
+    const auto actual_read = bit_reader_.GetBatch(value_bit_width, out, 
to_read);
+    // There should not be any reason why the actual read would be different
+    // but this is error resistant.
+    remaining_count_ -= actual_read;
+    return actual_read;
+  }
+
+ private:
+  ::arrow::bit_util::BitReader bit_reader_ = {};
+  rle_size_t remaining_count_ = 0;
+
+  static_assert(std::is_integral_v<value_type>,
+                "This class is meant to decode positive integers");
+};
+
+/// Decoder class for Parquet RLE bit-packed data.
+template <typename T>
+class RleBitPackedDecoder {
+ public:
+  /// The type in which the data should be decoded.
+  using value_type = T;
+  using DynamicRun = RleBitPackedParser::dynamic_run_type;
+
+  RleBitPackedDecoder() noexcept = default;
+
+  /// Create a decoder object.
+  ///
+  /// data and data_size are the raw bytes to decode.
+  /// value_bit_width is the size in bits of each encoded value.
+  RleBitPackedDecoder(const uint8_t* data, rle_size_t data_size,
+                      rle_size_t value_bit_width) noexcept {
+    Reset(data, data_size, value_bit_width);
+  }
+
+  void Reset(const uint8_t* data, rle_size_t data_size,
+             rle_size_t value_bit_width) noexcept {
+    ARROW_DCHECK_GE(value_bit_width, 0);
+    ARROW_DCHECK_LE(value_bit_width, 64);
+    parser_.Reset(data, data_size, value_bit_width);
+    decoder_ = {};
+    value_bit_width_ = value_bit_width;
+  }
+
+  /// Whether there are still runs to iterate over.
+  ///
+  /// WARN: Due to lack of proper error handling, iteration with Get methods 
could return
+  /// no data while the parser is not exhausted.
+  /// This is how one can check for errors.
+  bool exhausted() const { return (run_remaining() == 0) && 
parser_.exhausted(); }
+
+  /// Gets the next value or returns false if there are no more or an error 
occurred.
   ///
   /// NB: Because the encoding only supports literal runs with lengths
   /// that are multiples of 8, RleEncoder sometimes pads the end of its
   /// input with zeros. Since the encoding does not differentiate between
   /// input values and padding, Get() returns true even for these padding
   /// values.
-  template <typename T>
-  bool Get(T* val);
+  [[nodiscard]] bool Get(value_type* val);
 
-  /// Gets a batch of values.  Returns the number of decoded elements.
-  template <typename T>
-  int GetBatch(T* values, int batch_size);
+  /// Get a batch of values and return the number of decoded elements.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left or if an error occurred.
+  [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size);
 
-  /// Like GetBatch but add spacing for null entries
-  template <typename T>
-  int GetBatchSpaced(int batch_size, int null_count, const uint8_t* valid_bits,
-                     int64_t valid_bits_offset, T* out);
+  /// Like GetBatch but add spacing for null entries.
+  ///
+  /// Null entries will be set to an arbitrary value to avoid leaking private data.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left or if an error occurred.
+  [[nodiscard]] rle_size_t GetBatchSpaced(rle_size_t batch_size, rle_size_t 
null_count,
+                                          const uint8_t* valid_bits,
+                                          int64_t valid_bits_offset, 
value_type* out);
 
   /// Like GetBatch but the values are then decoded using the provided 
dictionary
-  template <typename T>
-  int GetBatchWithDict(const T* dictionary, int32_t dictionary_length, T* 
values,
-                       int batch_size);
+  ///
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left or if an error occurred.
+  template <typename V>
+  [[nodiscard]] rle_size_t GetBatchWithDict(const V* dictionary,
+                                            int32_t dictionary_length, V* out,
+                                            rle_size_t batch_size);
 
   /// Like GetBatchWithDict but add spacing for null entries
   ///
-  /// Null entries will be zero-initialized in `values` to avoid leaking
-  /// private data.
-  template <typename T>
-  int GetBatchWithDictSpaced(const T* dictionary, int32_t dictionary_length, 
T* values,
-                             int batch_size, int null_count, const uint8_t* 
valid_bits,
-                             int64_t valid_bits_offset);
-
- protected:
-  ::arrow::bit_util::BitReader bit_reader_;
-  /// Number of bits needed to encode the value. Must be between 0 and 64.
-  int bit_width_;
-  uint64_t current_value_;
-  int32_t repeat_count_;
-  int32_t literal_count_;
+  /// Null entries will be set to an arbitrary value to avoid leaking private data.
+  /// May write fewer elements to the output than requested if there are not 
enough values
+  /// left or if an error occurred.
+  template <typename V>
+  [[nodiscard]] rle_size_t GetBatchWithDictSpaced(
+      const V* dictionary, int32_t dictionary_length, V* out, rle_size_t 
batch_size,
+      rle_size_t null_count, const uint8_t* valid_bits, int64_t 
valid_bits_offset);
 
  private:
-  /// Fills literal_count_ and repeat_count_ with next values. Returns false 
if there
-  /// are no more.
-  template <typename T>
-  bool NextCounts();
+  RleBitPackedParser parser_ = {};
+  std::variant<RleRunDecoder<value_type>, BitPackedRunDecoder<value_type>> 
decoder_ = {};
+  rle_size_t value_bit_width_;
+
+  /// Return the number of values that are remaining in the current run.
+  rle_size_t run_remaining() const {
+    return std::visit([](const auto& dec) { return dec.remaining(); }, 
decoder_);
+  }
+
+  /// Get a batch of values from the current run and return the number of elements read.
+  [[nodiscard]] rle_size_t RunGetBatch(value_type* out, rle_size_t batch_size) 
{
+    return std::visit(
+        [&](auto& dec) { return dec.GetBatch(out, batch_size, 
value_bit_width_); },
+        decoder_);
+  }
+
+  /// Call the parser with a single callable for all event types.
+  template <typename Callable>
+  void ParseWithCallable(Callable&& func);
 
   /// Utility methods for retrieving spaced values.
-  template <typename T, typename RunType, typename Converter>
-  int GetSpaced(Converter converter, int batch_size, int null_count,
-                const uint8_t* valid_bits, int64_t valid_bits_offset, T* out);
+  template <typename Converter>
+  [[nodiscard]] rle_size_t GetSpaced(Converter converter,
+                                     typename Converter::out_type* out,
+                                     rle_size_t batch_size, const uint8_t* 
valid_bits,
+                                     int64_t valid_bits_offset, rle_size_t 
null_count);
 };
 
 /// Class to incrementally build the rle data.   This class does not allocate 
any memory.
@@ -170,7 +494,7 @@ class RleDecoder {
 /// This class does so by buffering 8 values at a time.  If they are not all 
the same
 /// they are added to the literal run.  If they are the same, they are added 
to the
 /// repeated run.  When we switch modes, the previous run is flushed out.
-class RleEncoder {
+class RleBitPackedEncoder {
  public:
   /// buffer/buffer_len: preallocated output buffer.
   /// bit_width: max number of bits for value.
@@ -178,7 +502,7 @@ class RleEncoder {
   /// when values should be encoded as repeated runs.  Currently this is 
derived
   /// based on the bit_width, which can determine a storage optimal choice.
   /// TODO: allow 0 bit_width (and have dict encoder use it)
-  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
+  RleBitPackedEncoder(uint8_t* buffer, int buffer_len, int bit_width)
       : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
     ARROW_DCHECK_GE(bit_width_, 0);
     ARROW_DCHECK_LE(bit_width_, 64);
@@ -191,12 +515,12 @@ class RleEncoder {
   /// This is the maximum length of a single run for 'bit_width'.
   /// It is not valid to pass a buffer less than this length.
   static int MinBufferSize(int bit_width) {
-    /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
+    // 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
     int max_literal_run_size = 1 + 
static_cast<int>(::arrow::bit_util::BytesForBits(
                                        MAX_VALUES_PER_LITERAL_RUN * 
bit_width));
-    /// Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
+    // Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
     int max_repeated_run_size =
-        ::arrow::bit_util::BitReader::kMaxVlqByteLength +
+        bit_util::kMaxLEB128ByteLenFor<int32_t> +
         static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
     return std::max(max_literal_run_size, max_repeated_run_size);
   }
@@ -299,385 +623,665 @@ class RleEncoder {
   uint8_t* literal_indicator_byte_;
 };
 
+/************************
+ *  RleBitPackedParser  *
+ ************************/
+
+template <typename Handler>
+void RleBitPackedParser::Parse(Handler&& handler) {
+  while (!exhausted()) {
+    auto [read, control] = PeekImpl(handler);
+    data_ += read;
+    data_size_ -= read;
+    if (ARROW_PREDICT_FALSE(control == ControlFlow::Break)) {
+      break;
+    }
+  }
+}
+
+namespace internal {
+/// The maximal value of the given type, cast to its unsigned counterpart.
+template <typename T>
+constexpr auto max_size_for_v =
+    static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max());
+
+}  // namespace internal
+
+template <typename Handler>
+auto RleBitPackedParser::PeekImpl(Handler&& handler) const
+    -> std::pair<rle_size_t, ControlFlow> {
+  ARROW_DCHECK(!exhausted());
+
+  constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
+  uint32_t run_len_type = 0;
+  const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize, 
&run_len_type);
+
+  if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
+    // Malformed LEB128 data
+    return {0, ControlFlow::Break};
+  }
+
+  const bool is_bit_packed = run_len_type & 1;
+  const uint32_t count = run_len_type >> 1;
+  if (is_bit_packed) {
+    constexpr auto kMaxCount = 
bit_util::CeilDiv(internal::max_size_for_v<rle_size_t>, 8);
+    if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
+      // Illegal number of encoded values
+      return {0, ControlFlow::Break};
+    }
+
+    ARROW_DCHECK_LT(static_cast<uint64_t>(count) * 8,
+                    internal::max_size_for_v<rle_size_t>);
+    const auto values_count = static_cast<rle_size_t>(count * 8);
+    // Count is already divided by 8 (it is the number of 8-value groups)
+    const auto bytes_read =
+        header_bytes + static_cast<rle_size_t>(count) * value_bit_width_;
+
+    auto control = handler.OnBitPackedRun(
+        BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
+
+    return {bytes_read, control};
+  }
+
+  if (ARROW_PREDICT_FALSE(count == 0)) {
+    // Illegal number of encoded values
+    return {0, ControlFlow::Break};
+  }
+
+  // Safe because created from right shift
+  const auto values_count = static_cast<rle_size_t>(count);
+  const auto value_bytes = bit_util::BytesForBits(value_bit_width_);
+  ARROW_DCHECK_LT(value_bytes, internal::max_size_for_v<rle_size_t>);
+  const auto bytes_read = header_bytes + static_cast<rle_size_t>(value_bytes);
+
+  auto control =
+      handler.OnRleRun(RleRun(data_ + header_bytes, values_count, 
value_bit_width_));
+
+  return {bytes_read, control};
+}
+
+/*************************
+ *  RleBitPackedDecoder  *
+ *************************/
+
 template <typename T>
-inline bool RleDecoder::Get(T* val) {
+template <typename Callable>
+void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
+  struct {
+    Callable func;
+    auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
+    auto OnRleRun(RleRun run) { return func(std::move(run)); }
+  } handler{std::move(func)};
+
+  parser_.Parse(std::move(handler));
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Get(value_type* val) {
   return GetBatch(val, 1) == 1;
 }
 
 template <typename T>
-inline int RleDecoder::GetBatch(T* values, int batch_size) {
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-
-  auto* out = values;
-
-  while (values_read < batch_size) {
-    int remaining = batch_size - values_read;
-
-    if (repeat_count_ > 0) {  // Repeated value case.
-      int repeat_batch = std::min(remaining, repeat_count_);
-      std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
-
-      repeat_count_ -= repeat_batch;
-      values_read += repeat_batch;
-      out += repeat_batch;
-    } else if (literal_count_ > 0) {
-      int literal_batch = std::min(remaining, literal_count_);
-      int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
-      if (actual_read != literal_batch) {
-        return values_read;
-      }
+auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
+    -> rle_size_t {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
 
-      literal_count_ -= literal_batch;
-      values_read += literal_batch;
-      out += literal_batch;
-    } else {
-      if (!NextCounts<T>()) return values_read;
+  rle_size_t values_read = 0;
+
+  // Remaining from a previous call that would have left some unread data from 
a run.
+  if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+    const auto read = RunGetBatch(out, batch_size);
+    values_read += read;
+    out += read;
+
+    // Either we fulfilled the whole batch to be read or we finished the remaining run.
+    if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
+      return values_read;
     }
+    ARROW_DCHECK(run_remaining() == 0);
   }
 
+  ParseWithCallable([&](auto run) {
+    using RunDecoder = typename decltype(run)::template 
DecoderType<value_type>;
+
+    ARROW_DCHECK_LT(values_read, batch_size);
+    RunDecoder decoder(run, value_bit_width_);
+    const auto read = decoder.GetBatch(out, batch_size - values_read, 
value_bit_width_);
+    ARROW_DCHECK_LE(read, batch_size - values_read);
+    values_read += read;
+    out += read;
+
+    // Stop reading and store remaining decoder
+    if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {
+      decoder_ = std::move(decoder);
+      return ControlFlow::Break;
+    }
+
+    return ControlFlow::Continue;
+  });
+
   return values_read;
 }
 
-template <typename T, typename RunType, typename Converter>
-inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int 
null_count,
-                                 const uint8_t* valid_bits, int64_t 
valid_bits_offset,
-                                 T* out) {
-  if (ARROW_PREDICT_FALSE(null_count == batch_size)) {
-    converter.FillZero(out, out + batch_size);
-    return batch_size;
-  }
-
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-  int values_remaining = batch_size - null_count;
-
-  // Assume no bits to start.
-  arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
-                                           /*length=*/batch_size);
-  arrow::internal::BitRun valid_run = bit_reader.NextRun();
-  while (values_read < batch_size) {
-    if (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
-      valid_run = bit_reader.NextRun();
+namespace internal {
+
+/// Utility class to safely handle values and null counts without overly
+/// error-prone verbosity.
+class BatchCounter {
+ public:
+  using size_type = rle_size_t;
+
+  static constexpr BatchCounter FromBatchSizeAndNulls(size_type batch_size,
+                                                      size_type null_count) {
+    ARROW_DCHECK_LE(null_count, batch_size);
+    return {batch_size - null_count, null_count};
+  }
+
+  constexpr BatchCounter(size_type values_count, size_type null_count) noexcept
+      : values_count_(values_count), null_count_(null_count) {}
+
+  constexpr size_type values_count() const noexcept { return values_count_; }
+
+  constexpr size_type values_read() const noexcept { return values_read_; }
+
+  constexpr size_type values_remaining() const noexcept {
+    ARROW_DCHECK_LE(values_read_, values_count_);
+    return values_count_ - values_read_;
+  }
+
+  constexpr void AccrueReadValues(size_type to_read) noexcept {
+    ARROW_DCHECK_LE(to_read, values_remaining());
+    values_read_ += to_read;
+  }
+
+  constexpr size_type null_count() const noexcept { return null_count_; }
+
+  constexpr size_type null_read() const noexcept { return null_read_; }
+
+  constexpr size_type null_remaining() const noexcept {
+    ARROW_DCHECK_LE(null_read_, null_count_);
+    return null_count_ - null_read_;
+  }
+
+  constexpr void AccrueReadNulls(size_type to_read) noexcept {
+    ARROW_DCHECK_LE(to_read, null_remaining());
+    null_read_ += to_read;
+  }
+
+  constexpr size_type total_remaining() const noexcept {
+    return values_remaining() + null_remaining();
+  }
+
+  constexpr size_type total_read() const noexcept { return values_read_ + null_read_; }
+
+  constexpr bool is_fully_null() const noexcept { return values_remaining() == 0; }
+
+  constexpr bool is_done() const noexcept { return total_remaining() == 0; }
+
+ private:
+  size_type values_count_ = 0;
+  size_type values_read_ = 0;
+  size_type null_count_ = 0;
+  size_type null_read_ = 0;
+};
+
+template <typename Int>
+struct GetSpacedResult {
+  Int values_read;
+  Int null_read;
+};
+
+/// Overload for GetSpaced for a single run in a RleDecoder
+template <typename Converter, typename BitRunReader, typename BitRun, typename value_type>
+auto RunGetSpaced(Converter* converter, typename Converter::out_type* out,
+                  rle_size_t batch_size, rle_size_t null_count,
+                  rle_size_t value_bit_width, BitRunReader* validity_reader,
+                  BitRun* validity_run, RleRunDecoder<value_type>* decoder)
+    -> GetSpacedResult<rle_size_t> {
+  ARROW_DCHECK_GT(batch_size, 0);
+  // The equality case is handled in the main loop in GetSpaced
+  ARROW_DCHECK_LT(null_count, batch_size);
+
+  auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count);
+
+  const rle_size_t values_available = decoder->remaining();
+  ARROW_DCHECK_GT(values_available, 0);
+  auto values_remaining_run = [&]() {
+    auto out = values_available - batch.values_read();
+    ARROW_DCHECK_GE(out, 0);
+    return out;
+  };
+
+  // Consume as much as possible from the repeated run.
+  // We only need to count the number of nulls and non-nulls because we can fill in the
+  // same value for nulls and non-nulls.
+  // This proves to be a big efficiency win.
+  while (values_remaining_run() > 0 && !batch.is_done()) {
+    ARROW_DCHECK_GE(validity_run->length, 0);
+    ARROW_DCHECK_LT(validity_run->length, max_size_for_v<rle_size_t>);
+    ARROW_DCHECK_LE(validity_run->length, batch.total_remaining());
+    const auto& validity_run_size = static_cast<rle_size_t>(validity_run->length);
+
+    if (validity_run->set) {
+      // We may end the current RLE run in the middle of the validity run
+      auto update_size = std::min(validity_run_size, values_remaining_run());
+      batch.AccrueReadValues(update_size);
+      validity_run->length -= update_size;
+    } else {
+      // We can consume all nulls here because it does not matter whether they are
+      // consumed on this RLE run or on the next encoded run: the value filled in
+      // for nulls does not matter.
+      auto update_size = std::min(validity_run_size, batch.null_remaining());
+      batch.AccrueReadNulls(update_size);
+      validity_run->length -= update_size;
     }
 
-    ARROW_DCHECK_GT(batch_size, 0);
-    ARROW_DCHECK_GT(valid_run.length, 0);
+    if (ARROW_PREDICT_TRUE(validity_run->length == 0)) {
+      *validity_run = validity_reader->NextRun();
+    }
+  }
+
+  const value_type value = decoder->value();
+  if (ARROW_PREDICT_FALSE(!converter->InputIsValid(value))) {
+    return {0, 0};
+  }
+  converter->WriteRepeated(out, out + batch.total_read(), value);
+  const auto actual_values_read = decoder->Advance(batch.values_read(), value_bit_width);
+  // We always crop the number of values read to the values remaining in the run.
+  // What's more, the RLE decoder should not encounter any errors.
+  ARROW_DCHECK_EQ(actual_values_read, batch.values_read());
+
+  return {/* .values_read= */ batch.values_read(), /* .null_read= */ batch.null_read()};
+}
+
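To make the counting trick above concrete (numbers are illustrative): for an RLE run with
at least three values of 7 remaining and the validity pattern 1 0 1 1 0, the helper only
tallies valid and null slots, then writes once:

    RLE run         : value 7, >= 3 values remaining
    validity bits   : 1 0 1 1 0
    output written  : 7 7 7 7 7   (one WriteRepeated over total_read() slots)
    run advanced by : 3           (values_read() only; null slots cost no encoded values)
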
+template <typename Converter, typename BitRunReader, typename BitRun, typename value_type>
+auto RunGetSpaced(Converter* converter, typename Converter::out_type* out,
+                  rle_size_t batch_size, rle_size_t null_count,
+                  rle_size_t value_bit_width, BitRunReader* validity_reader,
+                  BitRun* validity_run, BitPackedRunDecoder<value_type>* decoder)
+    -> GetSpacedResult<rle_size_t> {
+  ARROW_DCHECK_GT(batch_size, 0);
+  // The equality case is handled in the main loop in GetSpaced
+  ARROW_DCHECK_LT(null_count, batch_size);
+
+  auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count);
+
+  const rle_size_t values_available = decoder->remaining();
+  ARROW_DCHECK_GT(values_available, 0);
+  auto run_values_remaining = [&]() {
+    auto out = values_available - batch.values_read();
+    ARROW_DCHECK_GE(out, 0);
+    return out;
+  };
+
+  while (run_values_remaining() > 0 && batch.values_remaining() > 0) {
+    // Pull a batch of values from the bit-packed encoded data and store it in a
+    // local buffer to benefit from unpacking intrinsics and data locality.
+    // Quick benchmarking on a Linux x86-64 cloud instance shows that this
+    // previously hard-coded value is appropriate.
+    static constexpr rle_size_t kBufferCapacity = 1024;
+    std::array<value_type, kBufferCapacity> buffer = {};
+
+    rle_size_t buffer_start = 0;
+    rle_size_t buffer_end = 0;
+    auto buffer_size = [&]() {
+      auto out = buffer_end - buffer_start;
+      ARROW_DCHECK_GE(out, 0);
+      return out;
+    };
+
+    // buffer_start is 0 at this point so size is end
+    buffer_end = std::min(std::min(run_values_remaining(), batch.values_remaining()),
+                          kBufferCapacity);
+    buffer_end = decoder->GetBatch(buffer.data(), buffer_size(), value_bit_width);
+    ARROW_DCHECK_LE(buffer_size(), kBufferCapacity);
+
+    if (ARROW_PREDICT_FALSE(!converter->InputIsValid(buffer.data(), buffer_size()))) {
+      return {batch.values_read(), batch.null_read()};
+    }
 
-    if (valid_run.set) {
-      if ((repeat_count_ == 0) && (literal_count_ == 0)) {
-        if (!NextCounts<RunType>()) return values_read;
-        ARROW_DCHECK((repeat_count_ > 0) ^ (literal_count_ > 0));
+    // Copy chunks of valid values into the output, while adjusting spacing for null
+    // values.
+    while (buffer_size() > 0) {
+      ARROW_DCHECK_GE(validity_run->length, 0);
+      ARROW_DCHECK_LT(validity_run->length, max_size_for_v<rle_size_t>);
+      ARROW_DCHECK_LE(validity_run->length, batch.total_remaining());
+      const auto validity_run_length = static_cast<rle_size_t>(validity_run->length);
+
+      // Copy as much as possible from the buffer into the output while not
+      // exceeding the validity run.
+      if (validity_run->set) {
+        const auto update_size = std::min(validity_run_length, buffer_size());
+        converter->WriteRange(out, buffer.data() + buffer_start, update_size);
+        buffer_start += update_size;
+        batch.AccrueReadValues(update_size);
+        out += update_size;
+        validity_run->length -= update_size;
+        // Simply write zeros in the output
+      } else {
+        const auto update_size = std::min(validity_run_length, batch.null_remaining());
+        converter->WriteZero(out, out + update_size);
+        batch.AccrueReadNulls(update_size);
+        out += update_size;
+        validity_run->length -= update_size;
       }
 
-      if (repeat_count_ > 0) {
-        int repeat_batch = 0;
-        // Consume the entire repeat counts incrementing repeat_batch to
-        // be the total of nulls + values consumed, we only need to
-        // get the total count because we can fill in the same value for
-        // nulls and non-nulls. This proves to be a big efficiency win.
-        while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) 
{
-          ARROW_DCHECK_GT(valid_run.length, 0);
-          if (valid_run.set) {
-            int update_size = std::min(static_cast<int>(valid_run.length), 
repeat_count_);
-            repeat_count_ -= update_size;
-            repeat_batch += update_size;
-            valid_run.length -= update_size;
-            values_remaining -= update_size;
-          } else {
-            // We can consume all nulls here because we would do so on
-            //  the next loop anyways.
-            repeat_batch += static_cast<int>(valid_run.length);
-            valid_run.length = 0;
-          }
-          if (valid_run.length == 0) {
-            valid_run = bit_reader.NextRun();
-          }
-        }
-        RunType current_value = static_cast<RunType>(current_value_);
-        if (ARROW_PREDICT_FALSE(!converter.IsValid(current_value))) {
-          return values_read;
-        }
-        converter.Fill(out, out + repeat_batch, current_value);
-        out += repeat_batch;
-        values_read += repeat_batch;
-      } else if (literal_count_ > 0) {
-        int literal_batch = std::min(values_remaining, literal_count_);
-        ARROW_DCHECK_GT(literal_batch, 0);
-
-        // Decode the literals
-        constexpr int kBufferSize = 1024;
-        RunType indices[kBufferSize];
-        literal_batch = std::min(literal_batch, kBufferSize);
-        int actual_read = bit_reader_.GetBatch(bit_width_, indices, 
literal_batch);
-        if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
-          return values_read;
-        }
-        if (!converter.IsValid(indices, /*length=*/actual_read)) {
-          return values_read;
-        }
-        int skipped = 0;
-        int literals_read = 0;
-        while (literals_read < literal_batch) {
-          if (valid_run.set) {
-            int update_size = std::min(literal_batch - literals_read,
-                                       static_cast<int>(valid_run.length));
-            converter.Copy(out, indices + literals_read, update_size);
-            literals_read += update_size;
-            out += update_size;
-            valid_run.length -= update_size;
-          } else {
-            converter.FillZero(out, out + valid_run.length);
-            out += valid_run.length;
-            skipped += static_cast<int>(valid_run.length);
-            valid_run.length = 0;
-          }
-          if (valid_run.length == 0) {
-            valid_run = bit_reader.NextRun();
-          }
-        }
-        literal_count_ -= literal_batch;
-        values_remaining -= literal_batch;
-        values_read += literal_batch + skipped;
+      if (validity_run->length == 0) {
+        *validity_run = validity_reader->NextRun();
       }
-    } else {
-      converter.FillZero(out, out + valid_run.length);
-      out += valid_run.length;
-      values_read += static_cast<int>(valid_run.length);
-      valid_run.length = 0;
     }
+
+    ARROW_DCHECK_EQ(buffer_size(), 0);
   }
-  ARROW_DCHECK_EQ(valid_run.length, 0);
-  ARROW_DCHECK_EQ(values_remaining, 0);
-  return values_read;
+
+  ARROW_DCHECK_EQ(values_available - decoder->remaining(), batch.values_read());
+  ARROW_DCHECK_LE(batch.total_read(), batch_size);
+  ARROW_DCHECK_LE(batch.null_read(), batch.null_count());
+
+  return {/* .values_read= */ batch.values_read(), /* .null_read= */ batch.null_read()};
 }
 
+/// Overload for GetSpaced for a single run in a decoder variant
+template <typename Converter, typename BitRunReader, typename BitRun, typename value_type>
+auto RunGetSpaced(
+    Converter* converter, typename Converter::out_type* out, rle_size_t batch_size,
+    rle_size_t null_count, rle_size_t value_bit_width, BitRunReader* validity_reader,
+    BitRun* validity_run,
+    std::variant<RleRunDecoder<value_type>, BitPackedRunDecoder<value_type>>* decoder)
+    -> GetSpacedResult<rle_size_t> {
+  return std::visit(
+      [&](auto& dec) {
+        ARROW_DCHECK_GT(dec.remaining(), 0);
+        return RunGetSpaced(converter, out, batch_size, null_count, value_bit_width,
+                            validity_reader, validity_run, &dec);
+      },
+      *decoder);
+}
+
+}  // namespace internal
+
+template <typename T>
+template <typename Converter>
+auto RleBitPackedDecoder<T>::GetSpaced(Converter converter,
+                                       typename Converter::out_type* out,
+                                       rle_size_t batch_size,
+                                       const uint8_t* validity_bits,
+                                       int64_t validity_bits_offset,
+                                       rle_size_t null_count) -> rle_size_t {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
+
+  ARROW_DCHECK_GT(batch_size, 0);
+
+  auto batch = internal::BatchCounter::FromBatchSizeAndNulls(batch_size, null_count);
+
+  if (ARROW_PREDICT_FALSE(batch.is_fully_null())) {
+    converter.WriteZero(out, out + batch.null_remaining());
+    return batch.null_remaining();
+  }
+
+  arrow::internal::BitRunReader validity_reader(validity_bits, validity_bits_offset,
+                                                /*length=*/batch.total_remaining());
+  arrow::internal::BitRun validity_run = validity_reader.NextRun();
+
+  const auto check_and_handle_fully_null_remaining = [&]() {
+    if (batch.is_fully_null()) {
+      ARROW_DCHECK(validity_run.length == 0 || !validity_run.set);
+      ARROW_DCHECK_GE(validity_run.length, batch.null_remaining());
+
+      converter.WriteZero(out, out + batch.null_remaining());
+      out += batch.null_remaining();
+      batch.AccrueReadNulls(batch.null_remaining());
+    }
+  };
+
+  // Handle data remaining from a previous call that left a run partially unread.
+  if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+    const auto read = internal::RunGetSpaced(&converter, out, batch.total_remaining(),
+                                             batch.null_remaining(), value_bit_width_,
+                                             &validity_reader, &validity_run, &decoder_);
+
+    batch.AccrueReadNulls(read.null_read);
+    batch.AccrueReadValues(read.values_read);
+    out += read.values_read + read.null_read;
+
+    // Either we fulfilled all the batch values to be read
+    if (ARROW_PREDICT_FALSE(batch.values_remaining() == 0)) {
+      // There may be remaining nulls if they were not greedily filled
+      check_and_handle_fully_null_remaining();
+      return batch.total_read();
+    }
+
+    // We finished the remaining run
+    ARROW_DCHECK(run_remaining() == 0);
+  }
+
+  ParseWithCallable([&](auto run) {
+    using RunDecoder = typename decltype(run)::template DecoderType<value_type>;
+
+    RunDecoder decoder(run, value_bit_width_);
+
+    const auto read = internal::RunGetSpaced(&converter, out, batch.total_remaining(),
+                                             batch.null_remaining(), value_bit_width_,
+                                             &validity_reader, &validity_run, &decoder);
+
+    batch.AccrueReadNulls(read.null_read);
+    batch.AccrueReadValues(read.values_read);
+    out += read.values_read + read.null_read;
+
+    // Stop reading and store remaining decoder
+    if (ARROW_PREDICT_FALSE(read.values_read == 0 || batch.values_remaining() == 0)) {
+      decoder_ = std::move(decoder);
+      return ControlFlow::Break;
+    }
+
+    return ControlFlow::Continue;
+  });
+
+  // There may be remaining nulls if they were not greedily filled by either of the
+  // decoder calls above.
+  check_and_handle_fully_null_remaining();
+
+  ARROW_DCHECK(batch.is_done() || exhausted());
+  return batch.total_read();
+}
+
+namespace internal {
+
 // Converter for GetSpaced that handles runs that get returned
 // directly as output.
 template <typename T>
-struct PlainRleConverter {
-  T kZero = {};
-  inline bool IsValid(const T& values) const { return true; }
-  inline bool IsValid(const T* values, int32_t length) const { return true; }
-  inline void Fill(T* begin, T* end, const T& run_value) const {
+struct NoOpConverter {
+  using in_type = T;
+  using out_type = T;
+  using size_type = rle_size_t;
+
+  static constexpr bool InputIsValid(const in_type& values) { return true; }
+
+  static constexpr bool InputIsValid(const in_type* values, size_type length) {
+    return true;
+  }
+
+  static void WriteRepeated(out_type* begin, out_type* end, in_type run_value) {
     std::fill(begin, end, run_value);
   }
-  inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
-  inline void Copy(T* out, const T* values, int length) const {
-    std::memcpy(out, values, length * sizeof(T));
+
+  static void WriteZero(out_type* begin, out_type* end) {
+    std::fill(begin, end, out_type{});
+  }
+
+  static void WriteRange(out_type* out, const in_type* values, size_type length) {
+    std::memcpy(out, values, length * sizeof(out_type));
   }
 };
 
+}  // namespace internal
+
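GetSpaced only needs a small, informal interface from its Converter parameter: the three
typedefs and the five methods called from internal::RunGetSpaced, as modelled by
NoOpConverter above and DictionaryConverter below. A sketch of a custom converter (the
int32-to-double scaling is purely illustrative and not part of this patch):

    struct ScaleConverter {
      using in_type = int32_t;
      using out_type = double;
      using size_type = rle_size_t;

      bool InputIsValid(in_type value) const { return value >= 0; }
      bool InputIsValid(const in_type* values, size_type length) const {
        for (size_type i = 0; i < length; ++i) {
          if (values[i] < 0) return false;
        }
        return true;
      }
      void WriteRepeated(out_type* begin, out_type* end, in_type run_value) const {
        std::fill(begin, end, 0.5 * run_value);
      }
      void WriteZero(out_type* begin, out_type* end) const { std::fill(begin, end, 0.0); }
      void WriteRange(out_type* out, const in_type* values, size_type length) const {
        for (size_type i = 0; i < length; ++i) out[i] = 0.5 * values[i];
      }
    };
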
 template <typename T>
-inline int RleDecoder::GetBatchSpaced(int batch_size, int null_count,
-                                      const uint8_t* valid_bits,
-                                      int64_t valid_bits_offset, T* out) {
+auto RleBitPackedDecoder<T>::GetBatchSpaced(rle_size_t batch_size, rle_size_t null_count,
+                                            const uint8_t* valid_bits,
+                                            int64_t valid_bits_offset, value_type* out)
+    -> rle_size_t {
   if (null_count == 0) {
-    return GetBatch<T>(out, batch_size);
+    return GetBatch(out, batch_size);
   }
 
-  PlainRleConverter<T> converter;
-  arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
-                                                 batch_size);
+  internal::NoOpConverter<value_type> converter;
 
-  int total_processed = 0;
-  int processed = 0;
-  arrow::internal::BitBlockCount block;
-
-  do {
-    block = block_counter.NextFourWords();
-    if (block.length == 0) {
-      break;
-    }
-    if (block.AllSet()) {
-      processed = GetBatch<T>(out, block.length);
-    } else if (block.NoneSet()) {
-      converter.FillZero(out, out + block.length);
-      processed = block.length;
-    } else {
-      processed = GetSpaced<T, /*RunType=*/T, PlainRleConverter<T>>(
-          converter, block.length, block.length - block.popcount, valid_bits,
-          valid_bits_offset, out);
-    }
-    total_processed += processed;
-    out += block.length;
-    valid_bits_offset += block.length;
-  } while (processed == block.length);
-  return total_processed;
+  return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset, null_count);
 }
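
The signature mirrors the old RleDecoder::GetBatchSpaced, so call sites stay the same. A
minimal sketch, where `encoded_data`, `encoded_size`, `bit_width`, `valid_bits`,
`valid_bits_offset`, `batch_size` and `null_count` are assumed to come from a valid
column chunk:

    RleBitPackedDecoder<int32_t> decoder(encoded_data, encoded_size, bit_width);
    std::vector<int32_t> out(batch_size);
    // Fills `batch_size` logical slots; null slots consume no encoded values and their
    // contents are not meaningful. Returns the number of slots (values + nulls) filled.
    const rle_size_t filled = decoder.GetBatchSpaced(batch_size, null_count, valid_bits,
                                                     valid_bits_offset, out.data());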
 
-static inline bool IndexInRange(int32_t idx, int32_t dictionary_length) {
-  return idx >= 0 && idx < dictionary_length;
+namespace internal {
+
+template <typename I>
+bool IndexInRange(I idx, int32_t dictionary_length) {
+  ARROW_DCHECK_GT(dictionary_length, 0);
+  using T = std::common_type_t<decltype(idx), decltype(dictionary_length)>;
+  return idx >= 0 && static_cast<T>(idx) < static_cast<T>(dictionary_length);
 }
 
 // Converter for GetSpaced that handles runs of returned dictionary
 // indices.
-template <typename T>
+template <typename V, typename I>
 struct DictionaryConverter {
-  T kZero = {};
-  const T* dictionary;
-  int32_t dictionary_length;
-
-  inline bool IsValid(int32_t value) { return IndexInRange(value, 
dictionary_length); }
-
-  inline bool IsValid(const int32_t* values, int32_t length) const {
-    using IndexType = int32_t;
-    IndexType min_index = std::numeric_limits<IndexType>::max();
-    IndexType max_index = std::numeric_limits<IndexType>::min();
-    for (int x = 0; x < length; x++) {
-      min_index = std::min(values[x], min_index);
-      max_index = std::max(values[x], max_index);
+  using out_type = V;
+  using in_type = I;
+  using size_type = rle_size_t;
+
+  static constexpr bool kIsIdentity = false;
+
+  const out_type* dictionary;
+  size_type dictionary_length;
+
+  bool InputIsValid(in_type idx) const { return IndexInRange(idx, dictionary_length); }
+
+  bool InputIsValid(const in_type* indices, size_type length) const {
+    ARROW_DCHECK(length > 0);
+
+    in_type min_index = std::numeric_limits<in_type>::max();
+    in_type max_index = std::numeric_limits<in_type>::min();
+    for (size_type x = 0; x < length; x++) {
+      min_index = std::min(indices[x], min_index);
+      max_index = std::max(indices[x], max_index);
     }
 
     return IndexInRange(min_index, dictionary_length) &&
            IndexInRange(max_index, dictionary_length);
   }
-  inline void Fill(T* begin, T* end, const int32_t& run_value) const {
+
+  void WriteRepeated(out_type* begin, out_type* end, in_type run_value) const {
     std::fill(begin, end, dictionary[run_value]);
   }
-  inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
 
-  inline void Copy(T* out, const int32_t* values, int length) const {
-    for (int x = 0; x < length; x++) {
+  static void WriteZero(out_type* begin, out_type* end) {
+    std::fill(begin, end, out_type{});
+  }
+
+  void WriteRange(out_type* out, const in_type* values, size_type length) const {
+    for (size_type x = 0; x < length; x++) {
       out[x] = dictionary[values[x]];
     }
   }
 };
 
+/// Dummy imitation of BitRun that is all set.
+struct AllSetBitRun {
+  static constexpr bool set = true;
+  int64_t length = 0;
+};
+
+/// Dummy imitation of BitRunReader that should never be called.
+struct UnreachableBitRunReader {
+  constexpr static AllSetBitRun NextRun() { return {}; }
+};
+
+}  // namespace internal
+
 template <typename T>
-inline int RleDecoder::GetBatchWithDict(const T* dictionary, int32_t 
dictionary_length,
-                                        T* values, int batch_size) {
-  // Per https://github.com/apache/parquet-format/blob/master/Encodings.md,
-  // the maximum dictionary index width in Parquet is 32 bits.
-  using IndexType = int32_t;
-  DictionaryConverter<T> converter;
-  converter.dictionary = dictionary;
-  converter.dictionary_length = dictionary_length;
-
-  ARROW_DCHECK_GE(bit_width_, 0);
-  int values_read = 0;
-
-  auto* out = values;
-
-  while (values_read < batch_size) {
-    int remaining = batch_size - values_read;
-
-    if (repeat_count_ > 0) {
-      auto idx = static_cast<IndexType>(current_value_);
-      if (ARROW_PREDICT_FALSE(!IndexInRange(idx, dictionary_length))) {
-        return values_read;
-      }
-      T val = dictionary[idx];
+template <typename V>
+auto RleBitPackedDecoder<T>::GetBatchWithDict(const V* dictionary,
+                                              int32_t dictionary_length, V* 
out,
+                                              rle_size_t batch_size) -> 
rle_size_t {
+  using ControlFlow = RleBitPackedParser::ControlFlow;
+
+  if (ARROW_PREDICT_FALSE(batch_size <= 0)) {
+    return 0;
+  }
 
-      int repeat_batch = std::min(remaining, repeat_count_);
-      std::fill(out, out + repeat_batch, val);
+  internal::DictionaryConverter<V, value_type> converter{dictionary, dictionary_length};
 
-      /* Upkeep counters */
-      repeat_count_ -= repeat_batch;
-      values_read += repeat_batch;
-      out += repeat_batch;
-    } else if (literal_count_ > 0) {
-      constexpr int kBufferSize = 1024;
-      IndexType indices[kBufferSize];
+  // Use lightweight all-set BitRun stand-ins so the RunGetSpaced helpers can be reused.
+  constexpr internal::UnreachableBitRunReader validity_reader{};
+  internal::AllSetBitRun validity_run = {batch_size};
 
-      int literal_batch = std::min(remaining, literal_count_);
-      literal_batch = std::min(literal_batch, kBufferSize);
+  rle_size_t values_read = 0;
+  auto batch_values_remaining = [&]() {
+    ARROW_DCHECK_LE(values_read, batch_size);
+    return batch_size - values_read;
+  };
 
-      int actual_read = bit_reader_.GetBatch(bit_width_, indices, 
literal_batch);
-      if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
-        return values_read;
-      }
-      if (ARROW_PREDICT_FALSE(!converter.IsValid(indices, 
/*length=*/literal_batch))) {
-        return values_read;
-      }
-      converter.Copy(out, indices, literal_batch);
+  if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+    const auto read = internal::RunGetSpaced(&converter, out, batch_size,
+                                             /* null_count= */ 0, value_bit_width_,
+                                             &validity_reader, &validity_run, &decoder_);
 
-      /* Upkeep counters */
-      literal_count_ -= literal_batch;
-      values_read += literal_batch;
-      out += literal_batch;
-    } else {
-      if (!NextCounts<IndexType>()) return values_read;
+    ARROW_DCHECK_EQ(read.null_read, 0);
+    values_read += read.values_read;
+    out += read.values_read;
+
+    // Either we fulfilled all the batch values to be read
+    if (ARROW_PREDICT_FALSE(values_read >= batch_size)) {
+      // No nulls to handle in this overload, so we are done
+      return values_read;
     }
+
+    // We finished the remaining run
+    ARROW_DCHECK(run_remaining() == 0);
   }
 
-  return values_read;
-}
+  ParseWithCallable([&](auto run) {
+    using RunDecoder = typename decltype(run)::template DecoderType<value_type>;
 
-template <typename T>
-inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary,
-                                              int32_t dictionary_length, T* 
out,
-                                              int batch_size, int null_count,
-                                              const uint8_t* valid_bits,
-                                              int64_t valid_bits_offset) {
-  if (null_count == 0) {
-    return GetBatchWithDict<T>(dictionary, dictionary_length, out, batch_size);
-  }
-  arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
-                                                 batch_size);
-  using IndexType = int32_t;
-  DictionaryConverter<T> converter;
-  converter.dictionary = dictionary;
-  converter.dictionary_length = dictionary_length;
-
-  int total_processed = 0;
-  int processed = 0;
-  arrow::internal::BitBlockCount block;
-  do {
-    block = block_counter.NextFourWords();
-    if (block.length == 0) {
-      break;
-    }
-    if (block.AllSet()) {
-      processed = GetBatchWithDict<T>(dictionary, dictionary_length, out, 
block.length);
-    } else if (block.NoneSet()) {
-      converter.FillZero(out, out + block.length);
-      processed = block.length;
-    } else {
-      processed = GetSpaced<T, /*RunType=*/IndexType, DictionaryConverter<T>>(
-          converter, block.length, block.length - block.popcount, valid_bits,
-          valid_bits_offset, out);
+    RunDecoder decoder(run, value_bit_width_);
+
+    const auto read = internal::RunGetSpaced(&converter, out, batch_values_remaining(),
+                                             /* null_count= */ 0, value_bit_width_,
+                                             &validity_reader, &validity_run, &decoder);
+
+    ARROW_DCHECK_EQ(read.null_read, 0);
+    values_read += read.values_read;
+    out += read.values_read;
+
+    // Stop reading and store remaining decoder
+    if (ARROW_PREDICT_FALSE(read.values_read == 0 || values_read == batch_size)) {
+      decoder_ = std::move(decoder);
+      return ControlFlow::Break;
     }
-    total_processed += processed;
-    out += block.length;
-    valid_bits_offset += block.length;
-  } while (processed == block.length);
-  return total_processed;
+
+    return ControlFlow::Continue;
+  });
+
+  return values_read;
 }
 
 template <typename T>
-bool RleDecoder::NextCounts() {
-  // Read the next run's indicator int, it could be a literal or repeated run.
-  // The int is encoded as a vlq-encoded value.
-  uint32_t indicator_value = 0;
-  if (!bit_reader_.GetVlqInt(&indicator_value)) return false;
-
-  // lsb indicates if it is a literal run or repeated run
-  bool is_literal = indicator_value & 1;
-  uint32_t count = indicator_value >> 1;
-  if (is_literal) {
-    if (ARROW_PREDICT_FALSE(count == 0 || count > 
static_cast<uint32_t>(INT32_MAX) / 8)) {
-      return false;
-    }
-    literal_count_ = count * 8;
-  } else {
-    if (ARROW_PREDICT_FALSE(count == 0 || count > 
static_cast<uint32_t>(INT32_MAX))) {
-      return false;
-    }
-    repeat_count_ = count;
-    T value = {};
-    if (!bit_reader_.GetAligned<T>(
-            static_cast<int>(::arrow::bit_util::CeilDiv(bit_width_, 8)), 
&value)) {
-      return false;
-    }
-    current_value_ = static_cast<uint64_t>(value);
+template <typename V>
+auto RleBitPackedDecoder<T>::GetBatchWithDictSpaced(
+    const V* dictionary, int32_t dictionary_length, V* out, rle_size_t batch_size,
+    rle_size_t null_count, const uint8_t* valid_bits, int64_t valid_bits_offset)
+    -> rle_size_t {
+  if (null_count == 0) {
+    return GetBatchWithDict<V>(dictionary, dictionary_length, out, batch_size);
   }
-  return true;
+  internal::DictionaryConverter<V, value_type> converter{dictionary, dictionary_length};
+
+  return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset, null_count);
 }
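
A sketch of the dictionary path (the dictionary contents, buffer names and sizes are
illustrative):

    const float dictionary[] = {1.5f, 2.5f, 3.5f};
    RleBitPackedDecoder<int32_t> decoder(encoded_indices, encoded_size, bit_width);
    std::vector<float> out(batch_size);
    // Decodes indices and maps them through `dictionary` in one pass; it stops early and
    // returns a short count if an index falls outside [0, dictionary_length).
    const rle_size_t read = decoder.GetBatchWithDict(dictionary, /*dictionary_length=*/3,
                                                     out.data(), batch_size);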
 
+/*************************
+ *  RleBitPackedEncoder  *
+ *************************/
+
 /// This function buffers input values 8 at a time.  After seeing all 8 values,
 /// it decides whether they should be encoded as a literal or repeated run.
-inline bool RleEncoder::Put(uint64_t value) {
+inline bool RleBitPackedEncoder::Put(uint64_t value) {
   ARROW_DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
   if (ARROW_PREDICT_FALSE(buffer_full_)) return false;
 
@@ -708,7 +1312,7 @@ inline bool RleEncoder::Put(uint64_t value) {
   return true;
 }
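
A sketch of a full encode/decode round trip with the renamed classes, following the
pattern of ValidateRleBitPacked in the test changes below (values and width are
illustrative):

    const int bit_width = 3;
    std::vector<uint8_t> buffer(RleBitPackedEncoder::MaxBufferSize(bit_width, 8));
    RleBitPackedEncoder encoder(buffer.data(), static_cast<int>(buffer.size()), bit_width);
    for (uint64_t v : {1, 1, 1, 1, 5, 5, 5, 5}) {
      if (!encoder.Put(v)) break;  // Put() reports false once the buffer is full
    }
    const int encoded_bytes = encoder.Flush();  // closes the last, possibly partial, run
    RleBitPackedDecoder<uint64_t> decoder(buffer.data(), encoded_bytes, bit_width);
    uint64_t value;
    while (decoder.Get(&value)) {
      // consume the values in encoding order
    }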
 
-inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
+inline void RleBitPackedEncoder::FlushLiteralRun(bool update_indicator_byte) {
   if (literal_indicator_byte_ == NULL) {
     // The literal indicator byte has not been reserved yet, get one now.
     literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
@@ -738,7 +1342,7 @@ inline void RleEncoder::FlushLiteralRun(bool 
update_indicator_byte) {
   }
 }
 
-inline void RleEncoder::FlushRepeatedRun() {
+inline void RleBitPackedEncoder::FlushRepeatedRun() {
   ARROW_DCHECK_GT(repeat_count_, 0);
   bool result = true;
   // The lsb of 0 indicates this is a repeated run
@@ -754,7 +1358,7 @@ inline void RleEncoder::FlushRepeatedRun() {
 
 /// Flush the values that have been buffered.  At this point we decide whether
 /// we need to switch between the run types or continue the current one.
-inline void RleEncoder::FlushBufferedValues(bool done) {
+inline void RleBitPackedEncoder::FlushBufferedValues(bool done) {
   if (repeat_count_ >= 8) {
     // Clear the buffered values.  They are part of the repeated run now and we
     // don't want to flush them out as literals.
@@ -784,7 +1388,7 @@ inline void RleEncoder::FlushBufferedValues(bool done) {
   repeat_count_ = 0;
 }
 
-inline int RleEncoder::Flush() {
+inline int RleBitPackedEncoder::Flush() {
   if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
     bool all_repeat = literal_count_ == 0 && (repeat_count_ == 
num_buffered_values_ ||
                                               num_buffered_values_ == 0);
@@ -811,14 +1415,14 @@ inline int RleEncoder::Flush() {
   return bit_writer_.bytes_written();
 }
 
-inline void RleEncoder::CheckBufferFull() {
+inline void RleBitPackedEncoder::CheckBufferFull() {
   int bytes_written = bit_writer_.bytes_written();
   if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
     buffer_full_ = true;
   }
 }
 
-inline void RleEncoder::Clear() {
+inline void RleBitPackedEncoder::Clear() {
   buffer_full_ = false;
   current_value_ = 0;
   repeat_count_ = 0;
@@ -828,5 +1432,4 @@ inline void RleEncoder::Clear() {
   bit_writer_.Clear();
 }
 
-}  // namespace util
-}  // namespace arrow
+}  // namespace arrow::util
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc
index 0cc0a276a2..c7f4878b74 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -25,7 +25,10 @@
 #include <gtest/gtest.h>
 
 #include "arrow/array.h"
-#include "arrow/buffer.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/array/util.h"
+#include "arrow/scalar.h"
+#include "arrow/testing/gtest_util.h"
 #include "arrow/testing/random.h"
 #include "arrow/type.h"
 #include "arrow/util/bit_stream_utils_internal.h"
@@ -33,8 +36,7 @@
 #include "arrow/util/io_util.h"
 #include "arrow/util/rle_encoding_internal.h"
 
-namespace arrow {
-namespace util {
+namespace arrow::util {
 
 const int MAX_WIDTH = 32;
 
@@ -207,12 +209,303 @@ TEST(BitUtil, RoundTripIntValues) {
   }
 }
 
+/// A Rle run is a simple class owning some data and a repetition count.
+/// It does not know how to read such data.
+TEST(Rle, RleRun) {
+  const std::array<uint8_t, 4> value = {21, 2, 0, 0};
+
+  const rle_size_t value_count = 12;
+
+  // 12 times the value 21 fitting over 5 bits
+  const rle_size_t value_bit_width_5 = 5;
+  const auto run_5 = RleRun(value.data(), value_count, value_bit_width_5);
+  EXPECT_EQ(run_5.values_count(), value_count);
+  EXPECT_EQ(run_5.raw_data_size(value_bit_width_5), 1);  // 5 bits fit in one byte
+  EXPECT_EQ(*run_5.raw_data_ptr(), 21);
+
+  // 12 times the value 21 fitting over 8 bits
+  const rle_size_t value_bit_width_8 = 8;
+  const auto run_8 = RleRun(value.data(), value_count, value_bit_width_8);
+  EXPECT_EQ(run_8.values_count(), value_count);
+  EXPECT_EQ(run_8.raw_data_size(value_bit_width_8), 1);  // 8 bits fit in 1 byte
+  EXPECT_EQ(*run_8.raw_data_ptr(), 21);
+
+  // 12 times the value 533 (21 + 2 * 2^8) fitting over 10 bits
+  const rle_size_t value_bit_width_10 = 10;
+  const auto run_10 = RleRun(value.data(), value_count, value_bit_width_10);
+  EXPECT_EQ(run_10.values_count(), value_count);
+  EXPECT_EQ(run_10.raw_data_size(value_bit_width_10), 2);  // 10 bits fit in 2 bytes
+  EXPECT_EQ(*(run_10.raw_data_ptr() + 0), 21);
+  EXPECT_EQ(*(run_10.raw_data_ptr() + 1), 2);
+
+  // 12 times the value 533 (21 + 2 * 2^8) fitting over 32 bits
+  const rle_size_t value_bit_width_32 = 32;
+  const auto run_32 = RleRun(value.data(), value_count, value_bit_width_32);
+  EXPECT_EQ(run_32.values_count(), value_count);
+  EXPECT_EQ(run_32.raw_data_size(value_bit_width_32), 4);  // 32 bits fit in 4 bytes
+  EXPECT_EQ(*(run_32.raw_data_ptr() + 0), 21);
+  EXPECT_EQ(*(run_32.raw_data_ptr() + 1), 2);
+  EXPECT_EQ(*(run_32.raw_data_ptr() + 2), 0);
+  EXPECT_EQ(*(run_32.raw_data_ptr() + 3), 0);
+}
+
+/// A BitPacked run is a simple class owning some data and its size.
+/// It does not know how to read such data.
+TEST(BitPacked, BitPackedRun) {
+  const std::array<uint8_t, 4> value = {0b10101010, 0, 0, 0b1111111};
+
+  // 16 values of 1 bit for a total of 16 bits
+  const rle_size_t value_count_1 = 16;
+  const rle_size_t value_bit_width_1 = 1;
+  const auto run_1 = BitPackedRun(value.data(), value_count_1, 
value_bit_width_1);
+  EXPECT_EQ(run_1.values_count(), value_count_1);
+  EXPECT_EQ(run_1.raw_data_size(value_bit_width_1), 2);  // 16 bits fit in 2 
bytes
+  EXPECT_EQ(run_1.raw_data_ptr(), value.data());
+
+  // 8 values of 3 bits for a total of 24 bits
+  const rle_size_t value_count_3 = 8;
+  const rle_size_t value_bit_width_3 = 3;
+  const auto run_3 = BitPackedRun(value.data(), value_count_3, 
value_bit_width_3);
+  EXPECT_EQ(run_3.values_count(), value_count_3);
+  EXPECT_EQ(run_3.raw_data_size(value_bit_width_3), 3);  // 24 bits fit in 3 
bytes
+  EXPECT_EQ(run_3.raw_data_ptr(), value.data());
+}
+
+template <typename T>
+void TestRleDecoder(std::vector<uint8_t> bytes, rle_size_t value_count,
+                    rle_size_t bit_width, T expected_value) {
+  // Pre-requisite for this test
+  EXPECT_GT(value_count, 6);
+
+  const auto run = RleRun(bytes.data(), value_count, bit_width);
+
+  auto decoder = RleRunDecoder<T>(run, bit_width);
+  std::vector<T> vals = {0, 0};
+
+  EXPECT_EQ(decoder.remaining(), value_count);
+
+  rle_size_t read = 0;
+  EXPECT_EQ(decoder.Get(vals.data(), bit_width), 1);
+  read += 1;
+  EXPECT_EQ(vals.at(0), expected_value);
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  EXPECT_EQ(decoder.Advance(3, bit_width), 3);
+  read += 3;
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  vals = {0, 0};
+  EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+  EXPECT_EQ(vals.at(0), expected_value);
+  EXPECT_EQ(vals.at(1), expected_value);
+  read += static_cast<decltype(read)>(vals.size());
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  // Exhaust iteration
+  EXPECT_EQ(decoder.Advance(value_count - read, bit_width), value_count - 
read);
+  EXPECT_EQ(decoder.remaining(), 0);
+  EXPECT_EQ(decoder.Advance(1, bit_width), 0);
+  vals = {0, 0};
+  EXPECT_EQ(decoder.Get(vals.data(), bit_width), 0);
+  EXPECT_EQ(vals.at(0), 0);
+
+  // Reset the decoder
+  decoder.Reset(run, bit_width);
+  EXPECT_EQ(decoder.remaining(), value_count);
+  vals = {0, 0};
+  EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+  EXPECT_EQ(vals.at(0), expected_value);
+  EXPECT_EQ(vals.at(1), expected_value);
+}
+
+TEST(Rle, RleDecoder) {
+  TestRleDecoder<uint8_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */ 
5,
+                          /* expected_value= */ 21);
+  TestRleDecoder<uint16_t>({1, 0}, /* value_count= */ 13, /* bit_width= */ 1,
+                           /* expected_value= */ 1);
+  TestRleDecoder<uint32_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */ 
5,
+                           /* expected_value= */ 21);
+  TestRleDecoder<int32_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */ 
5,
+                          /* expected_value= */ 21);
+  TestRleDecoder<uint64_t>({21, 2, 0, 1}, /* value_count= */ 20, /* bit_width= 
*/ 30,
+                           /* expected_value= */ 16777749);
+}
+
+template <typename T>
+void TestBitPackedDecoder(std::vector<uint8_t> bytes, rle_size_t value_count,
+                          rle_size_t bit_width, std::vector<T> expected) {
+  // Pre-requisite for this test
+  EXPECT_GT(value_count, 6);
+
+  const auto run = BitPackedRun(bytes.data(), value_count, bit_width);
+
+  auto decoder = BitPackedRunDecoder<T>(run, bit_width);
+  std::vector<T> vals = {0, 0};
+
+  EXPECT_EQ(decoder.remaining(), value_count);
+
+  rle_size_t read = 0;
+  EXPECT_EQ(decoder.Get(vals.data(), bit_width), 1);
+  EXPECT_EQ(vals.at(0), expected.at(0 + read));
+  read += 1;
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  EXPECT_EQ(decoder.Advance(3, bit_width), 3);
+  read += 3;
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  vals = {0, 0};
+  EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+  EXPECT_EQ(vals.at(0), expected.at(0 + read));
+  EXPECT_EQ(vals.at(1), expected.at(1 + read));
+  read += static_cast<decltype(read)>(vals.size());
+  EXPECT_EQ(decoder.remaining(), value_count - read);
+
+  // Exhaust iteration
+  EXPECT_EQ(decoder.Advance(value_count - read, bit_width), value_count - 
read);
+  EXPECT_EQ(decoder.remaining(), 0);
+  EXPECT_EQ(decoder.Advance(1, bit_width), 0);
+  vals = {0, 0};
+  EXPECT_EQ(decoder.Get(vals.data(), bit_width), 0);
+  EXPECT_EQ(vals.at(0), 0);
+
+  // Reset the decoder
+  decoder.Reset(run, bit_width);
+  read = 0;
+  EXPECT_EQ(decoder.remaining(), value_count);
+  vals = {0, 0};
+  EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+  EXPECT_EQ(vals.at(0), expected.at(0 + read));
+  EXPECT_EQ(vals.at(1), expected.at(1 + read));
+}
+
+TEST(BitPacked, BitPackedDecoder) {
+  // See parquet encoding for bytes layout
+  TestBitPackedDecoder<uint16_t>(
+      /* bytes= */ {0x88, 0xc6, 0xfa},
+      /* values_count= */ 8,
+      /* bit_width= */ 3,
+      /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+  TestBitPackedDecoder<uint8_t>(
+      /* bytes= */ {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
+      /* values_count= */ 8,
+      /* bit_width= */ 8,
+      /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+  TestBitPackedDecoder<uint32_t>(
+      /* bytes= */ {0x47, 0xc, 0x10, 0x35},
+      /* values_count= */ 8,
+      /* bit_width= */ 4,
+      /* expected= */ {7, 4, 12, 0, 0, 1, 5, 3});
+  TestBitPackedDecoder<uint64_t>(
+      /* bytes= */ {0xe8, 0x7, 0x20, 0xc0, 0x0, 0x4, 0x14, 0x60, 0xc0, 0x1},
+      /* values_count= */ 8,
+      /* bit_width= */ 10,
+      /* expected= */ {1000, 1, 2, 3, 4, 5, 6, 7});
+}
+
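The first case above is the standard Parquet bit-packing layout: each 3-bit value is
written least-significant bit first, and bytes are filled starting from their
least-significant bit. For the values 0 through 7:

    value bits, LSB first          : 000 100 010 110 001 101 011 111
    regrouped into bytes (MSB first): 10001000 11000110 11111010  ->  0x88 0xC6 0xFA
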
+template <typename T>
+void TestRleBitPackedParser(std::vector<uint8_t> bytes, rle_size_t bit_width,
+                            std::vector<T> expected) {
+  auto parser =
+      RleBitPackedParser(bytes.data(), static_cast<rle_size_t>(bytes.size()), 
bit_width);
+  EXPECT_FALSE(parser.exhausted());
+
+  // Try to decode all data of all runs in the decoded vector
+  decltype(expected) decoded = {};
+  auto rle_decoder = RleRunDecoder<T>();
+  auto bit_packed_decoder = BitPackedRunDecoder<T>();
+
+  struct {
+    decltype(rle_decoder)* rle_decoder_ptr_;
+    decltype(bit_packed_decoder)* bit_packed_decoder_ptr_;
+    decltype(decoded)* decoded_ptr_;
+    decltype(bit_width) bit_width_;
+
+    auto OnRleRun(RleRun run) {
+      rle_decoder_ptr_->Reset(run, bit_width_);
+
+      const auto n_decoded = decoded_ptr_->size();
+      const auto n_to_decode = rle_decoder_ptr_->remaining();
+      decoded_ptr_->resize(n_decoded + n_to_decode);
+      EXPECT_EQ(rle_decoder_ptr_->GetBatch(decoded_ptr_->data() + n_decoded, 
n_to_decode,
+                                           bit_width_),
+                n_to_decode);
+      EXPECT_EQ(rle_decoder_ptr_->remaining(), 0);
+
+      return RleBitPackedParser::ControlFlow::Continue;
+    }
+
+    auto OnBitPackedRun(BitPackedRun run) {
+      bit_packed_decoder_ptr_->Reset(run, bit_width_);
+
+      const auto n_decoded = decoded_ptr_->size();
+      const auto n_to_decode = bit_packed_decoder_ptr_->remaining();
+      decoded_ptr_->resize(n_decoded + n_to_decode);
+      EXPECT_EQ(bit_packed_decoder_ptr_->GetBatch(decoded_ptr_->data() + 
n_decoded,
+                                                  n_to_decode, bit_width_),
+                n_to_decode);
+      EXPECT_EQ(bit_packed_decoder_ptr_->remaining(), 0);
+
+      return RleBitPackedParser::ControlFlow::Continue;
+    }
+  } handler{&rle_decoder, &bit_packed_decoder, &decoded, bit_width};
+
+  // Iterate over all runs
+  parser.Parse(handler);
+
+  EXPECT_TRUE(parser.exhausted());
+  EXPECT_EQ(decoded.size(), expected.size());
+  EXPECT_EQ(decoded, expected);
+}
+
+TEST(RleBitPacked, RleBitPackedParser) {
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x88, 0xc6, 0xfa},
+      /* bit_width= */ 3,
+      /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+
+  {
+    std::vector<uint32_t> expected = {0, 1, 2, 3, 4, 5, 6, 7};
+    expected.resize(expected.size() + 200, 5);
+    TestRleBitPackedParser<uint32_t>(
+        /* bytes= */
+        {/* LEB128 for 8 values bit packed marker */ 0x3,
+         /* Bitpacked run */ 0x88, 0xc6, 0xfa,
+         /* LEB128 for 200 RLE marker */ 0x90, 0x3,
+         /* Value 5 padded to a byte */ 0x5},
+        /* bit_width= */ 3,
+        /* expected= */ expected);
+  }
+
+  {
+    std::vector<uint16_t> expected = {0, 0, 0, 0, 1, 1, 1, 1};
+    expected.resize(expected.size() + 200, 1);
+    expected.resize(expected.size() + 10, 3);
+    std::array<uint16_t, 16> run2 = {1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 
1, 2};
+    expected.insert(expected.end(), run2.begin(), run2.end());
+    TestRleBitPackedParser<uint16_t>(
+        /* bytes= */
+        {/* LEB128 for 8 values bit packed marker */ 0x3,
+         /* Bitpacked run */ 0x0, 0x55,
+         /* LEB128 for 200 RLE marker */ 0x90, 0x3,
+         /* Value 1 padded to a byte */ 0x1,
+         /* LEB128 for 10 RLE marker */ 0x14,
+         /* Value 3 padded to a byte */ 0x3,
+         /* LEB128 for 16 values bit packed marker */ 0x5,
+         /* Bitpacked run */ 0x99, 0x99, 0x99, 0x99},
+        /* bit_width= */ 2,
+        /* expected= */ expected);
+  }
+}
+
 // Validates encoding of values by encoding and decoding them.  If
 // expected_encoding != NULL, also validates that the encoded buffer is
 // exactly 'expected_encoding'.
 // if expected_len is not -1, it will validate the encoded size is correct.
-void ValidateRle(const std::vector<int>& values, int bit_width,
-                 uint8_t* expected_encoding, int expected_len) {
+void ValidateRleBitPacked(const std::vector<int>& values, int bit_width,
+                          uint8_t* expected_encoding, int expected_len) {
   const int len = 64 * 1024;
 #ifdef __EMSCRIPTEN__
   // don't make this on the stack as it is
@@ -224,7 +517,7 @@ void ValidateRle(const std::vector<int>& values, int 
bit_width,
 #endif
   EXPECT_LE(expected_len, len);
 
-  RleEncoder encoder(buffer, len, bit_width);
+  RleBitPackedEncoder encoder(buffer, len, bit_width);
   for (size_t i = 0; i < values.size(); ++i) {
     bool result = encoder.Put(values[i]);
     EXPECT_TRUE(result);
@@ -240,7 +533,7 @@ void ValidateRle(const std::vector<int>& values, int 
bit_width,
 
   // Verify read
   {
-    RleDecoder decoder(buffer, len, bit_width);
+    RleBitPackedDecoder<uint64_t> decoder(buffer, len, bit_width);
     for (size_t i = 0; i < values.size(); ++i) {
       uint64_t val;
       bool result = decoder.Get(&val);
@@ -251,7 +544,7 @@ void ValidateRle(const std::vector<int>& values, int 
bit_width,
 
   // Verify batch read
   {
-    RleDecoder decoder(buffer, len, bit_width);
+    RleBitPackedDecoder<int> decoder(buffer, len, bit_width);
     std::vector<int> values_read(values.size());
     ASSERT_EQ(values.size(),
               decoder.GetBatch(values_read.data(), 
static_cast<int>(values.size())));
@@ -271,7 +564,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int 
bit_width) {
 #else
   uint8_t buffer[len];
 #endif
-  RleEncoder encoder(buffer, len, bit_width);
+  RleBitPackedEncoder encoder(buffer, len, bit_width);
   for (size_t i = 0; i < values.size(); ++i) {
     bool result = encoder.Put(values[i]);
     if (!result) {
@@ -282,7 +575,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int 
bit_width) {
   int out = 0;
 
   {
-    RleDecoder decoder(buffer, encoded_len, bit_width);
+    RleBitPackedDecoder<int> decoder(buffer, encoded_len, bit_width);
     for (size_t i = 0; i < values.size(); ++i) {
       EXPECT_TRUE(decoder.Get(&out));
       if (values[i] != out) {
@@ -293,7 +586,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int 
bit_width) {
 
   // Verify batch read
   {
-    RleDecoder decoder(buffer, encoded_len, bit_width);
+    RleBitPackedDecoder<int> decoder(buffer, encoded_len, bit_width);
     std::vector<int> values_read(values.size());
     if (static_cast<int>(values.size()) !=
         decoder.GetBatch(values_read.data(), static_cast<int>(values.size()))) 
{
@@ -308,7 +601,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int 
bit_width) {
   return true;
 }
 
-TEST(Rle, SpecificSequences) {
+TEST(RleBitPacked, SpecificSequences) {
   const int len = 1024;
   uint8_t expected_buffer[len];
   std::vector<int> values;
@@ -328,12 +621,12 @@ TEST(Rle, SpecificSequences) {
   expected_buffer[2] = (50 << 1);
   expected_buffer[3] = 1;
   for (int width = 1; width <= 8; ++width) {
-    ValidateRle(values, width, expected_buffer, 4);
+    ValidateRleBitPacked(values, width, expected_buffer, 4);
   }
 
   for (int width = 9; width <= MAX_WIDTH; ++width) {
-    ValidateRle(values, width, nullptr,
-                2 * (1 + static_cast<int>(bit_util::CeilDiv(width, 8))));
+    ValidateRleBitPacked(values, width, nullptr,
+                         2 * (1 + static_cast<int>(bit_util::CeilDiv(width, 
8))));
   }
 
   // Test 100 0's and 1's alternating
@@ -349,11 +642,11 @@ TEST(Rle, SpecificSequences) {
   expected_buffer[100 / 8 + 1] = 0x0A /* 0b00001010 */;
 
   // num_groups and expected_buffer only valid for bit width = 1
-  ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+  ValidateRleBitPacked(values, 1, expected_buffer, 1 + num_groups);
   for (int width = 2; width <= MAX_WIDTH; ++width) {
     int num_values = static_cast<int>(bit_util::CeilDiv(100, 8)) * 8;
-    ValidateRle(values, width, nullptr,
-                1 + static_cast<int>(bit_util::CeilDiv(width * num_values, 
8)));
+    ValidateRleBitPacked(values, width, nullptr,
+                         1 + static_cast<int>(bit_util::CeilDiv(width * 
num_values, 8)));
   }
 
   // Test 16-bit values to confirm encoded values are stored in little endian
@@ -371,7 +664,7 @@ TEST(Rle, SpecificSequences) {
   expected_buffer[4] = 0x55;
   expected_buffer[5] = 0xaa;
 
-  ValidateRle(values, 16, expected_buffer, 6);
+  ValidateRleBitPacked(values, 16, expected_buffer, 6);
 
   // Test 32-bit values to confirm encoded values are stored in little endian
   values.resize(28);
@@ -392,7 +685,7 @@ TEST(Rle, SpecificSequences) {
   expected_buffer[8] = 0xaa;
   expected_buffer[9] = 0x5a;
 
-  ValidateRle(values, 32, expected_buffer, 10);
+  ValidateRleBitPacked(values, 32, expected_buffer, 10);
 }
 
 // ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1, 
that value
@@ -403,10 +696,10 @@ void TestRleValues(int bit_width, int num_vals, int value 
= -1) {
   for (int v = 0; v < num_vals; ++v) {
     values.push_back((value != -1) ? value : static_cast<int>(v % mod));
   }
-  ValidateRle(values, bit_width, NULL, -1);
+  ValidateRleBitPacked(values, bit_width, NULL, -1);
 }
 
-TEST(Rle, TestValues) {
+TEST(RleBitPacked, TestValues) {
   for (int width = 1; width <= MAX_WIDTH; ++width) {
     TestRleValues(width, 1);
     TestRleValues(width, 1024);
@@ -415,11 +708,11 @@ TEST(Rle, TestValues) {
   }
 }
 
-TEST(Rle, BitWidthZeroRepeated) {
+TEST(RleBitPacked, BitWidthZeroRepeated) {
   uint8_t buffer[1];
   const int num_values = 15;
   buffer[0] = num_values << 1;  // repeated indicator byte
-  RleDecoder decoder(buffer, sizeof(buffer), 0);
+  RleBitPackedDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
   uint8_t val;
   for (int i = 0; i < num_values; ++i) {
     bool result = decoder.Get(&val);
@@ -429,11 +722,11 @@ TEST(Rle, BitWidthZeroRepeated) {
   EXPECT_FALSE(decoder.Get(&val));
 }
 
-TEST(Rle, BitWidthZeroLiteral) {
+TEST(RleBitPacked, BitWidthZeroLiteral) {
   uint8_t buffer[1];
   const int num_groups = 4;
   buffer[0] = num_groups << 1 | 1;  // literal indicator byte
-  RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0);
+  RleBitPackedDecoder<uint8_t> decoder = {buffer, sizeof(buffer), 0};
   const int num_values = num_groups * 8;
   uint8_t val;
   for (int i = 0; i < num_values; ++i) {
@@ -450,13 +743,13 @@ TEST(BitRle, Flush) {
   std::vector<int> values;
   for (int i = 0; i < 16; ++i) values.push_back(1);
   values.push_back(0);
-  ValidateRle(values, 1, NULL, -1);
+  ValidateRleBitPacked(values, 1, NULL, -1);
   values.push_back(1);
-  ValidateRle(values, 1, NULL, -1);
+  ValidateRleBitPacked(values, 1, NULL, -1);
   values.push_back(1);
-  ValidateRle(values, 1, NULL, -1);
+  ValidateRleBitPacked(values, 1, NULL, -1);
   values.push_back(1);
-  ValidateRle(values, 1, NULL, -1);
+  ValidateRleBitPacked(values, 1, NULL, -1);
 }
 
 // Test some random sequences.
@@ -515,17 +808,17 @@ TEST(BitRle, RepeatedPattern) {
     }
   }
 
-  ValidateRle(values, 1, NULL, -1);
+  ValidateRleBitPacked(values, 1, NULL, -1);
 }
 
 TEST(BitRle, Overflow) {
   for (int bit_width = 1; bit_width < 32; bit_width += 3) {
-    int len = RleEncoder::MinBufferSize(bit_width);
+    int len = RleBitPackedEncoder::MinBufferSize(bit_width);
     std::vector<uint8_t> buffer(len);
     int num_added = 0;
     bool parity = true;
 
-    RleEncoder encoder(buffer.data(), len, bit_width);
+    RleBitPackedEncoder encoder(buffer.data(), len, bit_width);
     // Insert alternating true/false until there is no space left
     while (true) {
       bool result = encoder.Put(parity);
@@ -538,7 +831,7 @@ TEST(BitRle, Overflow) {
     EXPECT_LE(bytes_written, len);
     EXPECT_GT(num_added, 0);
 
-    RleDecoder decoder(buffer.data(), bytes_written, bit_width);
+    RleBitPackedDecoder<uint32_t> decoder(buffer.data(), bytes_written, 
bit_width);
     parity = true;
     uint32_t v;
     for (int i = 0; i < num_added; ++i) {
@@ -553,69 +846,300 @@ TEST(BitRle, Overflow) {
   }
 }
 
+/// Check the RleBitPacked encoding/decoding round trip.
+///
+/// \param spaced If set to false, treat nulls in the input array as regular data.
+/// \param parts The number of parts in which the data will be decoded.
+///         For numbers greater than one, this ensures that the decoder
+///         intermediate state is valid.
 template <typename Type>
-void CheckRoundTripSpaced(const Array& data, int bit_width) {
+void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t 
parts,
+                    std::shared_ptr<FloatArray> dict = {}) {
   using ArrayType = typename TypeTraits<Type>::ArrayType;
-  using T = typename Type::c_type;
+  using value_type = typename Type::c_type;
 
-  int num_values = static_cast<int>(data.length());
-  int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values);
+  const int data_size = static_cast<int>(data.length());
+  const int data_values_count =
+      static_cast<int>(data.length() - spaced * data.null_count());
+  const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, 
data_size);
+  ASSERT_GE(parts, 1);
+  ASSERT_LE(parts, data_size);
 
-  const T* values = static_cast<const ArrayType&>(data).raw_values();
+  const value_type* data_values = static_cast<const 
ArrayType&>(data).raw_values();
 
+  // Encode the data into `buffer` using the encoder.
   std::vector<uint8_t> buffer(buffer_size);
-  RleEncoder encoder(buffer.data(), buffer_size, bit_width);
-  for (int i = 0; i < num_values; ++i) {
-    if (data.IsValid(i)) {
-      if (!encoder.Put(static_cast<uint64_t>(values[i]))) {
-        FAIL() << "Encoding failed";
-      }
+  RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width);
+  int32_t encoded_values_size = 0;
+  for (int i = 0; i < data_size; ++i) {
+    // Depending on `spaced` we treat nulls as regular values.
+    if (data.IsValid(i) || !spaced) {
+      bool success = encoder.Put(static_cast<uint64_t>(data_values[i]));
+      ASSERT_TRUE(success) << "Encoding failed in pos " << i;
+      ++encoded_values_size;
     }
   }
-  int encoded_size = encoder.Flush();
+  int encoded_byte_size = encoder.Flush();
+  ASSERT_EQ(encoded_values_size, data_values_count)
+      << "Not all input values were encoded successfully by the encoder";
+
+  // Now we verify batch read
+  RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size, bit_width);
+  // We will only use one of them depending on whether this is a dictionary test
+  std::vector<float> dict_read;
+  std::vector<value_type> values_read;
+  if (dict) {
+    dict_read.resize(data_size);
+  } else {
+    values_read.resize(data_size);
+  }
 
-  // Verify batch read
-  RleDecoder decoder(buffer.data(), encoded_size, bit_width);
-  std::vector<T> values_read(num_values);
+  // We will read the data in `parts` calls to make sure intermediate states are valid
+  rle_size_t total_read_count = 0;
+  while (total_read_count < data_size) {
+    const auto remaining = data_size - total_read_count;
+    auto to_read = data_size / parts;
+    if (remaining / to_read == 1) {
+      to_read = remaining;
+    }
 
-  if (num_values != decoder.GetBatchSpaced(
-                        num_values, static_cast<int>(data.null_count()),
-                        data.null_bitmap_data(), data.offset(), 
values_read.data())) {
-    FAIL();
-  }
+    rle_size_t read = 0;
+    if (spaced) {
+      // We need to slice the input array to get the proper null count and bitmap
+      auto data_remaining = data.Slice(total_read_count, to_read);
+
+      if (dict) {
+        auto* out = dict_read.data() + total_read_count;
+        read = decoder.GetBatchWithDictSpaced(
+            dict->raw_values(), static_cast<int32_t>(dict->length()), out, 
to_read,
+            static_cast<int32_t>(data_remaining->null_count()),
+            data_remaining->null_bitmap_data(), data_remaining->offset());
+      } else {
+        auto* out = values_read.data() + total_read_count;
+        read = decoder.GetBatchSpaced(
+            to_read, static_cast<int32_t>(data_remaining->null_count()),
+            data_remaining->null_bitmap_data(), data_remaining->offset(), out);
+      }
+    } else {
+      if (dict) {
+        auto* out = dict_read.data() + total_read_count;
+        read = decoder.GetBatchWithDict(
+            dict->raw_values(), static_cast<int32_t>(dict->length()), out, 
to_read);
+      } else {
+        auto* out = values_read.data() + total_read_count;
+        read = decoder.GetBatch(out, to_read);
+      }
+    }
+    ASSERT_EQ(read, to_read) << "Decoder did not read as many values as 
requested";
 
-  for (int64_t i = 0; i < num_values; ++i) {
-    if (data.IsValid(i)) {
-      if (values_read[i] != values[i]) {
-        FAIL() << "Index " << i << " read " << values_read[i] << " but should 
be "
-               << values[i];
+    total_read_count += read;
+  }
+  EXPECT_EQ(total_read_count, data_size) << "Total number of values read is off";
+
+  // Verify the round trip: encoded-decoded values must equal the original ones
+  for (int64_t i = 0; i < data_size; ++i) {
+    if (data.IsValid(i) || !spaced) {
+      if (dict) {
+        EXPECT_EQ(dict_read.at(i), dict->Value(data_values[i]))
+            << "Encoded then decoded and mapped value at position " << i << " 
("
+            << values_read[i] << ") differs from original value (" << 
data_values[i]
+            << " mapped to " << dict->Value(data_values[i]) << ")";
+      } else {
+        EXPECT_EQ(values_read.at(i), data_values[i])
+            << "Encoded then decoded value at position " << i << " (" << 
values_read.at(i)
+            << ") differs from original value (" << data_values[i] << ")";
       }
     }
   }
 }
 
 template <typename T>
-struct GetBatchSpacedTestCase {
-  T max_value;
-  int64_t size;
+struct DataTestRleBitPackedRandomPart {
+  using value_type = T;
+
+  value_type max;
+  int32_t size;
+  double null_probability;
+};
+
+template <typename T>
+struct DataTestRleBitPackedRepeatPart {
+  using value_type = T;
+
+  value_type value;
+  int32_t size;
   double null_probability;
-  int bit_width;
 };
 
-TEST(RleDecoder, GetBatchSpaced) {
-  uint32_t kSeed = 1337;
-  ::arrow::random::RandomArrayGenerator rand(kSeed);
+template <typename T>
+struct DataTestRleBitPackedNullPart {
+  using value_type = T;
 
-  std::vector<GetBatchSpacedTestCase<int32_t>> int32_cases{
-      {1, 100000, 0.01, 1}, {1, 100000, 0.1, 1},    {1, 100000, 0.5, 1},
-      {4, 100000, 0.05, 3}, {100, 100000, 0.05, 7},
+  int32_t size;
+};
+
+template <typename T>
+struct DataTestRleBitPacked {
+  using value_type = T;
+  using ArrowType = typename arrow::CTypeTraits<value_type>::ArrowType;
+  using RandomPart = DataTestRleBitPackedRandomPart<value_type>;
+  using RepeatPart = DataTestRleBitPackedRepeatPart<value_type>;
+  using NullPart = DataTestRleBitPackedNullPart<value_type>;
+  using AnyPart = std::variant<RandomPart, RepeatPart, NullPart>;
+
+  std::vector<AnyPart> parts;
+  int32_t bit_width;
+
+  std::shared_ptr<::arrow::Array> MakeArray(
+      ::arrow::random::RandomArrayGenerator& rand) const {
+    using Traits = arrow::TypeTraits<ArrowType>;
+
+    std::vector<std::shared_ptr<::arrow::Array>> arrays = {};
+
+    for (const auto& dyn_part : parts) {
+      if (auto* part = std::get_if<RandomPart>(&dyn_part)) {
+        auto arr = rand.Numeric<ArrowType>(part->size, /* min= */ value_type(0),
+                                           part->max, part->null_probability);
+        arrays.push_back(std::move(arr));
+
+      } else if (auto* part = std::get_if<RepeatPart>(&dyn_part)) {
+        auto arr =
+            rand.Numeric<ArrowType>(part->size, /* min= */ part->value,
+                                    /* max= */ part->value, part->null_probability);
+        arrays.push_back(std::move(arr));
+
+      } else if (auto* part = std::get_if<NullPart>(&dyn_part)) {
+        EXPECT_OK_AND_ASSIGN(
+            auto arr, ::arrow::MakeArrayOfNull(Traits::type_singleton(), part->size));
+        arrays.push_back(std::move(arr));
+      }
+    }
+    ARROW_DCHECK_EQ(parts.size(), arrays.size());
+
+    return ::arrow::Concatenate(arrays).ValueOrDie();
+  }
+};
+
+template <typename T>
+void DoTestGetBatchSpacedRoundtrip() {
+  using Data = DataTestRleBitPacked<T>;
+  using ArrowType = typename Data::ArrowType;
+  using RandomPart = typename Data::RandomPart;
+  using NullPart = typename Data::NullPart;
+  using RepeatPart = typename Data::RepeatPart;
+
+  std::vector<Data> test_cases = {
+      {
+          {RandomPart{/* max=*/1, /* size=*/400, /* null_proba= */ 0.1}},
+          /* bit_width= */ 1,
+      },
+      {
+          {
+              RandomPart{/* max=*/7, /* size=*/1037, /* null_proba= */ 0.0},
+              NullPart{/* size= */ 1153},
+              RandomPart{/* max=*/7, /* size=*/800, /* null_proba= */ 0.5},
+          },
+          /* bit_width= */ 3,
+      },
+      {
+          {
+              NullPart{/* size= */ 80},
+              RandomPart{/* max=*/static_cast<T>(1023), /* size=*/800,
+                         /* null_proba= */ 0.01},
+              NullPart{/* size= */ 1023},
+          },
+          /* bit_width= */ 11,
+      },
+      {
+          {RepeatPart{/* value=*/13, /* size=*/1024, /* null_proba= */ 0.01}},
+          /* bit_width= */ 10,
+      },
+      {
+          {
+              NullPart{/* size= */ 1024},
+              RepeatPart{/* value=*/static_cast<T>(10000), /* size=*/1025,
+                         /* null_proba= */ 0.1},
+              NullPart{/* size= */ 77},
+          },
+          /* bit_width= */ 23,
+      },
+      {
+          {
+              RepeatPart{/* value=*/13, /* size=*/1023, /* null_proba= */ 0.0},
+              NullPart{/* size= */ 1153},
+              RepeatPart{/* value=*/72, /* size=*/1799, /* null_proba= */ 0.5},
+          },
+          /* bit_width= */ 10,
+      },
+      {
+          {
+              RandomPart{/* max=*/1, /* size=*/1013, /* null_proba= */ 0.01},
+              NullPart{/* size=*/8},
+              RepeatPart{1, /* size= */ 256, /* null_proba= */ 0.1},
+              NullPart{/* size=*/128},
+              RepeatPart{0, /* size= */ 256, /* null_proba= */ 0.0},
+              NullPart{/* size=*/15},
+              RandomPart{/* max=*/1, /* size=*/1024, /* null_proba= */ 0.01},
+          },
+          /* bit_width= */ 1,
+      },
   };
-  for (auto case_ : int32_cases) {
-    auto arr = rand.Int32(case_.size, /*min=*/0, case_.max_value, case_.null_probability);
-    CheckRoundTripSpaced<Int32Type>(*arr, case_.bit_width);
-    CheckRoundTripSpaced<Int32Type>(*arr->Slice(1), case_.bit_width);
+
+  ::arrow::random::RandomArrayGenerator rand(/* seed= */ 12);
+  // FRAGILE: we create a dictionary large enough so that any encoded value from the
+  // previous test cases can be used as an index in the dictionary.
+  // Its size must be increased accordingly if larger values are encoded in the test
+  // cases.
+  auto dict = std::static_pointer_cast<arrow::FloatArray>(rand.Float32(20000, -1.0, 1.0));
+
+  // Number of bits available in T to write a positive integer.
+  constexpr int kBitsAvailable = 8 * sizeof(T) - (std::is_signed_v<T> ? 1 : 0);
+
+  for (auto case_ : test_cases) {
+    if (case_.bit_width > kBitsAvailable) {
+      continue;
+    }
+
+    auto array = case_.MakeArray(rand);
+
+    // Tests for GetBatch
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ false,
+                              /* parts= */ 1);
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ false,
+                              /* parts= */ 3);
+
+    // Tests for GetBatchSpaced
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true,
+                              /* parts= */ 1);
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true,
+                              /* parts= */ 7);
+    CheckRoundTrip<ArrowType>(*array->Slice(1), case_.bit_width, /* spaced= */ true,
+                              /* parts= */ 1);
+
+    // Cannot test GetBatchWithDict with this method since null slots hold unknown values
+
+    // Tests for GetBatchWithDictSpaced
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true, /* parts= */ 1,
+                              dict);
+    CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true, /* parts= */ 5,
+                              dict);
   }
 }
 
-}  // namespace util
-}  // namespace arrow
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint8) {
+  DoTestGetBatchSpacedRoundtrip<uint8_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint16) {
+  DoTestGetBatchSpacedRoundtrip<uint16_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripInt32) {
+  DoTestGetBatchSpacedRoundtrip<int32_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUInt32) {
+  DoTestGetBatchSpacedRoundtrip<uint32_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) {
+  DoTestGetBatchSpacedRoundtrip<uint64_t>();
+}
+
+}  // namespace arrow::util
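
To see the renamed API end to end outside of the test harness, here is a minimal, self-contained round-trip sketch. It relies only on the calls visible in the test above (`Put`/`Flush` on `RleBitPackedEncoder`, `GetBatch` on `RleBitPackedDecoder<T>`); the fixed 1024-byte buffer and the helper name `RoundTripSketch` are illustration-only assumptions, not part of the patch.

    #include <cstdint>
    #include <vector>

    #include "arrow/util/rle_encoding_internal.h"

    // Round-trip a small vector of values, mirroring the Put/Flush/GetBatch calls
    // exercised by the test above. The 1024-byte buffer is an assumption that is
    // large enough for a short input; real code would size it via MaxBufferSize.
    std::vector<int32_t> RoundTripSketch(const std::vector<int32_t>& values, int bit_width) {
      using ::arrow::util::RleBitPackedDecoder;
      using ::arrow::util::RleBitPackedEncoder;

      std::vector<uint8_t> buffer(1024);
      RleBitPackedEncoder encoder(buffer.data(), static_cast<int>(buffer.size()), bit_width);
      for (int32_t v : values) {
        // Put() returns false once the output buffer is exhausted.
        if (!encoder.Put(static_cast<uint64_t>(v))) break;
      }
      const int encoded_bytes = encoder.Flush();

      RleBitPackedDecoder<int32_t> decoder(buffer.data(), encoded_bytes, bit_width);
      std::vector<int32_t> decoded(values.size());
      decoder.GetBatch(decoded.data(), static_cast<int>(values.size()));
      return decoded;
    }
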
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index f40e3cae54..9c314cf818 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -113,8 +113,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
       }
       const uint8_t* decoder_data = data + 4;
       if (!rle_decoder_) {
-        rle_decoder_ = std::make_unique<::arrow::util::RleDecoder>(decoder_data,
-                                                                   num_bytes, bit_width_);
+        rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
+            decoder_data, num_bytes, bit_width_);
       } else {
         rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
       }
@@ -157,8 +157,8 @@ void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t max_level,
   bit_width_ = bit_util::Log2(max_level + 1);
 
   if (!rle_decoder_) {
-    rle_decoder_ =
-        std::make_unique<::arrow::util::RleDecoder>(data, num_bytes, bit_width_);
+    rle_decoder_ = std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
+        data, num_bytes, bit_width_);
   } else {
     rle_decoder_->Reset(data, num_bytes, bit_width_);
   }
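
For orientation, a hedged sketch of what the `LevelDecoder` wiring above amounts to: skip the V1 level payload's 4-byte length prefix, derive the bit width from the maximum level, and hand the rest to `RleBitPackedDecoder<int16_t>`. The helper name, the standalone `GetBatch` call, and taking `num_bytes` as a plain parameter are assumptions for illustration, not the library's actual interface.

    #include <cstdint>
    #include <vector>

    #include "arrow/util/bit_util.h"
    #include "arrow/util/rle_encoding_internal.h"

    // Hypothetical helper: decode RLE definition levels from a V1 data page, where
    // the payload starts after a 4-byte length prefix and the bit width is derived
    // from the maximum level, as in LevelDecoder::SetData above.
    std::vector<int16_t> DecodeDefLevelsSketch(const uint8_t* data, int32_t num_bytes,
                                               int16_t max_level, int num_values) {
      const int bit_width = ::arrow::bit_util::Log2(max_level + 1);
      const uint8_t* payload = data + 4;  // skip the length prefix
      ::arrow::util::RleBitPackedDecoder<int16_t> decoder(payload, num_bytes, bit_width);
      std::vector<int16_t> levels(num_values);
      decoder.GetBatch(levels.data(), num_values);
      return levels;
    }
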
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 0bff52f792..ac4469b190 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -39,7 +39,8 @@ class BitReader;
 }  // namespace bit_util
 
 namespace util {
-class RleDecoder;
+template <typename T>
+class RleBitPackedDecoder;
 }  // namespace util
 
 }  // namespace arrow
@@ -95,7 +96,7 @@ class PARQUET_EXPORT LevelDecoder {
   int bit_width_;
   int num_values_remaining_;
   Encoding::type encoding_;
-  std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
+  std::unique_ptr<::arrow::util::RleBitPackedDecoder<int16_t>> rle_decoder_;
   std::unique_ptr<::arrow::bit_util::BitReader> bit_packed_decoder_;
   int16_t max_level_;
 };
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index f35f84f002..1f3d64f622 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -70,7 +70,7 @@ using arrow::bit_util::BitWriter;
 using arrow::internal::checked_cast;
 using arrow::internal::checked_pointer_cast;
 using arrow::util::Float16;
-using arrow::util::RleEncoder;
+using arrow::util::RleBitPackedEncoder;
 
 namespace bit_util = arrow::bit_util;
 
@@ -168,7 +168,7 @@ void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
   encoding_ = encoding;
   switch (encoding) {
     case Encoding::RLE: {
-      rle_encoder_ = std::make_unique<RleEncoder>(data, data_size, bit_width_);
+      rle_encoder_ = std::make_unique<RleBitPackedEncoder>(data, data_size, bit_width_);
       break;
     }
     case Encoding::BIT_PACKED: {
@@ -190,8 +190,8 @@ int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
     case Encoding::RLE: {
       // TODO: Due to the way we currently check if the buffer is full enough,
       // we need to have MinBufferSize as head room.
-      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
-                  RleEncoder::MinBufferSize(bit_width);
+      num_bytes = RleBitPackedEncoder::MaxBufferSize(bit_width, num_buffered_values) +
+                  RleBitPackedEncoder::MinBufferSize(bit_width);
       break;
     }
     case Encoding::BIT_PACKED: {
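
The TODO above is the reason `MinBufferSize` is added on top of `MaxBufferSize`: the encoder's "buffer full" check needs that much head room. A minimal sketch of the sizing rule (the helper name is hypothetical):

    #include <cstdint>
    #include <vector>

    #include "arrow/util/rle_encoding_internal.h"

    // Hypothetical helper: reserve a level-encoding buffer following the sizing
    // rule above -- MaxBufferSize() for the buffered values plus MinBufferSize()
    // of head room so the encoder never rejects the last run spuriously.
    std::vector<uint8_t> MakeLevelBufferSketch(int bit_width, int num_buffered_values) {
      using ::arrow::util::RleBitPackedEncoder;
      const int num_bytes =
          RleBitPackedEncoder::MaxBufferSize(bit_width, num_buffered_values) +
          RleBitPackedEncoder::MinBufferSize(bit_width);
      return std::vector<uint8_t>(static_cast<size_t>(num_bytes));
    }
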
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index bd329d6105..2a046a0ca5 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -36,7 +36,7 @@ class BitWriter;
 }  // namespace bit_util
 
 namespace util {
-class RleEncoder;
+class RleBitPackedEncoder;
 class CodecOptions;
 }  // namespace util
 
@@ -80,7 +80,7 @@ class PARQUET_EXPORT LevelEncoder {
   int bit_width_;
   int rle_length_;
   Encoding::type encoding_;
-  std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_;
+  std::unique_ptr<::arrow::util::RleBitPackedEncoder> rle_encoder_;
   std::unique_ptr<::arrow::bit_util::BitWriter> bit_packed_encoder_;
 };
 
diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index fc191e8ded..46d1c201e9 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -861,7 +861,8 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
     this->num_values_ = num_values;
     if (len == 0) {
       // Initialize dummy decoder to avoid crashes later on
-      idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1);
+      idx_decoder_ =
+          ::arrow::util::RleBitPackedDecoder<int32_t>(data, len, /*bit_width=*/1);
       return;
     }
     uint8_t bit_width = *data;
@@ -869,7 +870,7 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
       throw ParquetException("Invalid or corrupted bit_width " +
                              std::to_string(bit_width) + ". Maximum allowed is 32.");
     }
-    idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width);
+    idx_decoder_ = ::arrow::util::RleBitPackedDecoder<int32_t>(++data, --len, bit_width);
   }
 
   int Decode(T* buffer, int num_values) override {
@@ -1003,7 +1004,7 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>, public DictDecoder<Type>
   // BinaryDictionary32Builder
   std::shared_ptr<ResizableBuffer> indices_scratch_space_;
 
-  ::arrow::util::RleDecoder idx_decoder_;
+  ::arrow::util::RleBitPackedDecoder<int32_t> idx_decoder_;
 };
 
 template <typename Type>
@@ -1810,8 +1811,9 @@ class RleBooleanDecoder : public TypedDecoderImpl<BooleanType>, public BooleanDe
 
     auto decoder_data = data + 4;
     if (decoder_ == nullptr) {
-      decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data, num_bytes,
-                                                             /*bit_width=*/1);
+      decoder_ = std::make_shared<::arrow::util::RleBitPackedDecoder<bool>>(
+          decoder_data, num_bytes,
+          /*bit_width=*/1);
     } else {
       decoder_->Reset(decoder_data, num_bytes, /*bit_width=*/1);
     }
@@ -1898,7 +1900,7 @@ class RleBooleanDecoder : public TypedDecoderImpl<BooleanType>, public BooleanDe
   }
 
  private:
-  std::shared_ptr<::arrow::util::RleDecoder> decoder_;
+  std::shared_ptr<::arrow::util::RleBitPackedDecoder<bool>> decoder_;
 };
 
 // ----------------------------------------------------------------------
@@ -2123,7 +2125,7 @@ class DeltaByteArrayDecoderImpl : public TypedDecoderImpl<DType> {
   int num_valid_values_{0};
   uint32_t prefix_len_offset_{0};
   std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
-  // buffer for decoded strings, which gurantees the lifetime of the decoded strings
+  // buffer for decoded strings, which guarantees the lifetime of the decoded strings
   // until the next call of Decode.
   std::shared_ptr<ResizableBuffer> buffered_data_;
 };
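
A hedged sketch of the dictionary-index layout handled by `DictDecoderImpl::SetData` above: the first byte of the page data carries the index bit width, and the remaining bytes are the RLE/bit-packed index stream. The helper name and the single `GetBatch` call are illustration-only assumptions.

    #include <cstdint>
    #include <stdexcept>
    #include <vector>

    #include "arrow/util/rle_encoding_internal.h"

    // Hypothetical helper: decode dictionary indices from a data page whose first
    // byte stores the index bit width, as in DictDecoderImpl::SetData above.
    std::vector<int32_t> DecodeDictIndicesSketch(const uint8_t* data, int len,
                                                 int num_values) {
      if (len == 0) {
        return {};  // nothing to decode
      }
      const uint8_t bit_width = *data;  // first byte: bit width of the indices
      if (bit_width > 32) {
        throw std::runtime_error("Invalid or corrupted bit_width");
      }
      ::arrow::util::RleBitPackedDecoder<int32_t> decoder(data + 1, len - 1, bit_width);
      std::vector<int32_t> indices(num_values);
      decoder.GetBatch(indices.data(), num_values);
      return indices;
    }
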
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 112b810a8f..831ddbddab 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -438,8 +438,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) {
   // is called, we have to reserve an extra "RleEncoder::MinBufferSize"
   // bytes. These extra bytes won't be used but not reserving them
   // would cause the encoder to fail.
-  return ::arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) +
-         ::arrow::util::RleEncoder::MinBufferSize(bit_width);
+  return ::arrow::util::RleBitPackedEncoder::MaxBufferSize(bit_width, num_values) +
+         ::arrow::util::RleBitPackedEncoder::MinBufferSize(bit_width);
 }
 
 /// See the dictionary encoding section of
@@ -476,7 +476,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
     ++buffer;
     --buffer_len;
 
-    ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
+    ::arrow::util::RleBitPackedEncoder encoder(buffer, buffer_len, bit_width());
 
     for (int32_t index : buffered_indices_) {
       if (ARROW_PREDICT_FALSE(!encoder.Put(index))) return -1;
@@ -1717,8 +1717,9 @@ std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
   int rle_buffer_size_max = MaxRleBufferSize();
   std::shared_ptr<ResizableBuffer> buffer =
       AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
-  ::arrow::util::RleEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
-                                    rle_buffer_size_max, /*bit_width*/ kBitWidth);
+  ::arrow::util::RleBitPackedEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
+                                             rle_buffer_size_max,
+                                             /*bit_width*/ kBitWidth);
 
   for (bool value : buffered_append_values_) {
     encoder.Put(value ? 1 : 0);

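
Finally, a hedged sketch of the RLE BOOLEAN layout that `RleBooleanEncoder::FlushValues` above produces: a 4-byte length prefix followed by bit_width=1 RLE/bit-packed data. The helper name and the raw `memcpy` of the prefix (assuming a little-endian host) are simplifications for illustration; a real implementation should write the prefix in an explicitly endian-safe way.

    #include <cstdint>
    #include <cstring>
    #include <vector>

    #include "arrow/util/rle_encoding_internal.h"

    // Hypothetical helper: encode booleans as a 4-byte length prefix followed by
    // bit_width=1 RLE/bit-packed data, mirroring the layout in the hunk above.
    std::vector<uint8_t> EncodeBooleanRleSketch(const std::vector<bool>& values) {
      using ::arrow::util::RleBitPackedEncoder;
      constexpr int kBitWidth = 1;
      constexpr int kRleLengthInBytes = 4;
      const int max_rle_bytes =
          RleBitPackedEncoder::MaxBufferSize(kBitWidth, static_cast<int>(values.size())) +
          RleBitPackedEncoder::MinBufferSize(kBitWidth);
      std::vector<uint8_t> out(kRleLengthInBytes + max_rle_bytes);

      RleBitPackedEncoder encoder(out.data() + kRleLengthInBytes, max_rle_bytes, kBitWidth);
      for (bool v : values) {
        encoder.Put(v ? 1 : 0);
      }
      const uint32_t rle_bytes = static_cast<uint32_t>(encoder.Flush());
      // Little-endian prefix assumed here for brevity.
      std::memcpy(out.data(), &rle_bytes, sizeof(rle_bytes));
      out.resize(kRleLengthInBytes + rle_bytes);
      return out;
    }
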