This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 79f97640d0 GH-47112: [Parquet][C++] Rle BitPacked parser (#47294)
79f97640d0 is described below
commit 79f97640d0d79af928a300cb53c5ce6fad3dabe4
Author: Antoine Prouvost <[email protected]>
AuthorDate: Wed Sep 24 12:28:48 2025 +0200
GH-47112: [Parquet][C++] Rle BitPacked parser (#47294)
### Rationale for this change
### What changes are included in this PR?
New independent abstractions:
- A `BitPackedRun` to describe the encoded bytes in a bit packed run.
- A minimal `BitPackedDecoder` that can decode this type of run (no
dict/spaced methods).
- A `RleRun` to describe the encoded value in an RLE run.
- A minimal `RleDecoder` that can decode this type of run (no dict/spaced
methods).
- A `RleBitPackedParser` that reads the encoded headers and emits different
runs.
These new abstractions are then plugged into `RleBitPackedDecoder`
(formerly `RleDecode`) to keep the compatibility with the rest of Arrow
(improvements to using the parser independently can come in follow-up PR).
Misc changes:
- Separation of LEB128 reading/writing from `BitReader` into free
functions, and addition of a check for a special case to avoid undefined
behavior from overflow.
### Are these changes tested?
Yes, on top of the existing tests, many more unit tests have been added.
### Are there any user-facing changes?
API changes to internal classes.
* GitHub Issue: #47112
Authored-by: AntoinePrv <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
CPPLINT.cfg | 2 +
cpp/src/arrow/util/bit_run_reader.h | 4 +
cpp/src/arrow/util/bit_stream_utils_internal.h | 178 ++--
cpp/src/arrow/util/bit_util.h | 121 +++
cpp/src/arrow/util/bit_util_test.cc | 188 +++-
cpp/src/arrow/util/bitmap_reader.h | 2 +
cpp/src/arrow/util/rle_encoding_internal.h | 1361 +++++++++++++++++-------
cpp/src/arrow/util/rle_encoding_test.cc | 674 ++++++++++--
cpp/src/parquet/column_reader.cc | 8 +-
cpp/src/parquet/column_reader.h | 5 +-
cpp/src/parquet/column_writer.cc | 8 +-
cpp/src/parquet/column_writer.h | 4 +-
cpp/src/parquet/decoder.cc | 16 +-
cpp/src/parquet/encoder.cc | 11 +-
14 files changed, 2004 insertions(+), 578 deletions(-)
diff --git a/CPPLINT.cfg b/CPPLINT.cfg
index 2f47b4dbf5..dd1139ac7f 100644
--- a/CPPLINT.cfg
+++ b/CPPLINT.cfg
@@ -26,5 +26,7 @@ filter = -readability/alt_tokens
filter = -readability/casting
filter = -readability/todo
filter = -runtime/references
+# Let the formatter do the job for whitespaces
filter = -whitespace/comments
+filter = -whitespace/braces
linelength = 90
diff --git a/cpp/src/arrow/util/bit_run_reader.h
b/cpp/src/arrow/util/bit_run_reader.h
index a2cbad5b29..ed7be940a5 100644
--- a/cpp/src/arrow/util/bit_run_reader.h
+++ b/cpp/src/arrow/util/bit_run_reader.h
@@ -52,6 +52,8 @@ inline bool operator!=(const BitRun& lhs, const BitRun& rhs) {
class BitRunReaderLinear {
public:
+ BitRunReaderLinear() = default;
+
BitRunReaderLinear(const uint8_t* bitmap, int64_t start_offset, int64_t
length)
: reader_(bitmap, start_offset, length) {}
@@ -74,6 +76,8 @@ class BitRunReaderLinear {
/// in a bitmap.
class ARROW_EXPORT BitRunReader {
public:
+ BitRunReader() = default;
+
/// \brief Constructs new BitRunReader.
///
/// \param[in] bitmap source data
diff --git a/cpp/src/arrow/util/bit_stream_utils_internal.h
b/cpp/src/arrow/util/bit_stream_utils_internal.h
index 2b5ec3830e..1f3b699e1a 100644
--- a/cpp/src/arrow/util/bit_stream_utils_internal.h
+++ b/cpp/src/arrow/util/bit_stream_utils_internal.h
@@ -22,6 +22,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <type_traits>
#include "arrow/util/bit_util.h"
#include "arrow/util/bpacking_internal.h"
@@ -30,8 +31,7 @@
#include "arrow/util/macros.h"
#include "arrow/util/ubsan.h"
-namespace arrow {
-namespace bit_util {
+namespace arrow::bit_util {
/// Utility class to write bit/byte streams. This class can write data to
either be
/// bit packed or byte aligned (and a single stream that has a mix of both).
@@ -73,19 +73,14 @@ class BitWriter {
/// room. The value is written byte aligned.
/// For more details on vlq:
/// en.wikipedia.org/wiki/Variable-length_quantity
- bool PutVlqInt(uint32_t v);
+ template <typename Int>
+ bool PutVlqInt(Int v);
- // Writes an int zigzag encoded.
- bool PutZigZagVlqInt(int32_t v);
-
- /// Write a Vlq encoded int64 to the buffer. Returns false if there was not
enough
- /// room. The value is written byte aligned.
- /// For more details on vlq:
- /// en.wikipedia.org/wiki/Variable-length_quantity
- bool PutVlqInt(uint64_t v);
-
- // Writes an int64 zigzag encoded.
- bool PutZigZagVlqInt(int64_t v);
+ /// Writes a zigzag encoded signed integer.
+ /// Zigzag encoding is used to encode possibly negative numbers by
alternating positive
+ /// and negative ones.
+ template <typename Int>
+ bool PutZigZagVlqInt(Int v);
/// Get a pointer to the next aligned byte and advance the underlying buffer
/// by num_bytes.
@@ -128,14 +123,14 @@ inline uint64_t ReadLittleEndianWord(const uint8_t*
buffer, int bytes_remaining)
/// bytes in one read (e.g. encoded int).
class BitReader {
public:
- BitReader() = default;
+ BitReader() noexcept = default;
/// 'buffer' is the buffer to read from. The buffer's length is
'buffer_len'.
BitReader(const uint8_t* buffer, int buffer_len) : BitReader() {
Reset(buffer, buffer_len);
}
- void Reset(const uint8_t* buffer, int buffer_len) {
+ void Reset(const uint8_t* buffer, int buffer_len) noexcept {
buffer_ = buffer;
max_bytes_ = buffer_len;
byte_offset_ = 0;
@@ -169,18 +164,14 @@ class BitReader {
/// Reads a vlq encoded int from the stream. The encoded int must start at
/// the beginning of a byte. Return false if there were not enough bytes in
/// the buffer.
- bool GetVlqInt(uint32_t* v);
-
- // Reads a zigzag encoded int `into` v.
- bool GetZigZagVlqInt(int32_t* v);
-
- /// Reads a vlq encoded int64 from the stream. The encoded int must start at
- /// the beginning of a byte. Return false if there were not enough bytes in
- /// the buffer.
- bool GetVlqInt(uint64_t* v);
+ template <typename Int>
+ bool GetVlqInt(Int* v);
- // Reads a zigzag encoded int64 `into` v.
- bool GetZigZagVlqInt(int64_t* v);
+ /// Reads a zigzag encoded integer into a signed integer output v.
+ /// Zigzag encoding is used to decode possibly negative numbers by
alternating positive
+ /// and negative ones.
+ template <typename Int>
+ bool GetZigZagVlqInt(Int* v);
/// Returns the number of bytes left in the stream, not including the current
/// byte (i.e., there may be an additional fraction of a byte).
@@ -189,12 +180,6 @@ class BitReader {
(byte_offset_ +
static_cast<int>(bit_util::BytesForBits(bit_offset_)));
}
- /// Maximum byte length of a vlq encoded int
- static constexpr int kMaxVlqByteLength = 5;
-
- /// Maximum byte length of a vlq encoded int64
- static constexpr int kMaxVlqByteLengthForInt64 = 10;
-
private:
const uint8_t* buffer_;
int max_bytes_;
@@ -439,91 +424,92 @@ inline bool BitReader::Advance(int64_t num_bits) {
return true;
}
-inline bool BitWriter::PutVlqInt(uint32_t v) {
- bool result = true;
- while ((v & 0xFFFFFF80UL) != 0UL) {
- result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
- v >>= 7;
- }
- result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
- return result;
-}
+template <typename Int>
+inline bool BitWriter::PutVlqInt(Int v) {
+ static_assert(std::is_integral_v<Int>);
-inline bool BitReader::GetVlqInt(uint32_t* v) {
- uint32_t tmp = 0;
+ constexpr auto kBufferSize = kMaxLEB128ByteLenFor<Int>;
- for (int i = 0; i < kMaxVlqByteLength; i++) {
- uint8_t byte = 0;
- if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) {
+ uint8_t buffer[kBufferSize] = {};
+ const auto bytes_written = WriteLEB128(v, buffer, kBufferSize);
+ ARROW_DCHECK_LE(bytes_written, kBufferSize);
+ if constexpr (std::is_signed_v<Int>) {
+ // Can fail if negative
+ if (ARROW_PREDICT_FALSE(bytes_written == 0)) {
return false;
}
- tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * i);
+ } else {
+ // Cannot fail since we gave max space
+ ARROW_DCHECK_GT(bytes_written, 0);
+ }
- if ((byte & 0x80) == 0) {
- *v = tmp;
- return true;
+ for (int i = 0; i < bytes_written; ++i) {
+ const bool success = PutAligned(buffer[i], 1);
+ if (ARROW_PREDICT_FALSE(!success)) {
+ return false;
}
}
- return false;
-}
-
-inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
- uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v);
- u_v = (u_v << 1) ^ static_cast<uint32_t>(v >> 31);
- return PutVlqInt(u_v);
-}
-
-inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
- uint32_t u;
- if (!GetVlqInt(&u)) return false;
- u = (u >> 1) ^ (~(u & 1) + 1);
- *v = ::arrow::util::SafeCopy<int32_t>(u);
return true;
}
-inline bool BitWriter::PutVlqInt(uint64_t v) {
- bool result = true;
- while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) {
- result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
- v >>= 7;
+template <typename Int>
+inline bool BitReader::GetVlqInt(Int* v) {
+ static_assert(std::is_integral_v<Int>);
+
+ // The data that we will pass to the LEB128 parser
+ // In all cases, we read a byte-aligned value, skipping remaining bits
+ const uint8_t* data = NULLPTR;
+ int max_size = 0;
+
+ // Number of bytes left in the buffered values, not including the current
+ // byte (i.e., there may be an additional fraction of a byte).
+ const int bytes_left_in_cache =
+ sizeof(buffered_values_) -
static_cast<int>(bit_util::BytesForBits(bit_offset_));
+
+ // If there are clearly enough bytes left we can try to parse from the cache
+ if (bytes_left_in_cache >= kMaxLEB128ByteLenFor<Int>) {
+ max_size = bytes_left_in_cache;
+ data = reinterpret_cast<const uint8_t*>(&buffered_values_) +
+ bit_util::BytesForBits(bit_offset_);
+ // Otherwise, we try straight from buffer (ignoring the few bytes that may be
cached)
+ } else {
+ max_size = bytes_left();
+ data = buffer_ + (max_bytes_ - max_size);
}
- result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
- return result;
-}
-inline bool BitReader::GetVlqInt(uint64_t* v) {
- uint64_t tmp = 0;
-
- for (int i = 0; i < kMaxVlqByteLengthForInt64; i++) {
- uint8_t byte = 0;
- if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) {
- return false;
- }
- tmp |= static_cast<uint64_t>(byte & 0x7F) << (7 * i);
-
- if ((byte & 0x80) == 0) {
- *v = tmp;
- return true;
- }
+ const auto bytes_read = bit_util::ParseLeadingLEB128(data, max_size, v);
+ if (ARROW_PREDICT_FALSE(bytes_read == 0)) {
+ // Corrupt LEB128
+ return false;
}
- return false;
+ // Advance for the bytes we have read + the bits we skipped
+ return Advance((8 * bytes_read) + (bit_offset_ % 8));
}
-inline bool BitWriter::PutZigZagVlqInt(int64_t v) {
- uint64_t u_v = ::arrow::util::SafeCopy<uint64_t>(v);
- u_v = (u_v << 1) ^ static_cast<uint64_t>(v >> 63);
+template <typename Int>
+inline bool BitWriter::PutZigZagVlqInt(Int v) {
+ static_assert(std::is_integral_v<Int>);
+ static_assert(std::is_signed_v<Int>);
+ using UInt = std::make_unsigned_t<Int>;
+ constexpr auto kBitSize = 8 * sizeof(Int);
+
+ UInt u_v = ::arrow::util::SafeCopy<UInt>(v);
+ u_v = (u_v << 1) ^ static_cast<UInt>(v >> (kBitSize - 1));
return PutVlqInt(u_v);
}
-inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
- uint64_t u;
+template <typename Int>
+inline bool BitReader::GetZigZagVlqInt(Int* v) {
+ static_assert(std::is_integral_v<Int>);
+ static_assert(std::is_signed_v<Int>);
+
+ std::make_unsigned_t<Int> u;
if (!GetVlqInt(&u)) return false;
u = (u >> 1) ^ (~(u & 1) + 1);
- *v = ::arrow::util::SafeCopy<int64_t>(u);
+ *v = ::arrow::util::SafeCopy<Int>(u);
return true;
}
-} // namespace bit_util
-} // namespace arrow
+} // namespace arrow::bit_util
diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h
index 13d265f0be..8d4811ede7 100644
--- a/cpp/src/arrow/util/bit_util.h
+++ b/cpp/src/arrow/util/bit_util.h
@@ -365,5 +365,126 @@ void PackBits(const uint32_t* values, uint8_t* out) {
}
}
+constexpr int64_t MaxLEB128ByteLen(int64_t n_bits) { return CeilDiv(n_bits,
7); }
+
+template <typename Int>
+constexpr int64_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
+
+/// Write an integer as LEB128
+///
+/// Write the input value as LEB128 into the output buffer and return the
number of bytes
+/// written.
+/// If the output buffer size is insufficient, return 0 but the output may
have been
+/// written to.
+/// The input value can be a signed integer, but must be non-negative.
+///
+/// \see https://en.wikipedia.org/wiki/LEB128
+/// \see MaxLEB128ByteLenFor
+template <typename Int>
+constexpr int32_t WriteLEB128(Int value, uint8_t* out, int32_t max_out_size) {
+ constexpr Int kLow7Mask = Int(0x7F);
+ constexpr Int kHigh7Mask = ~kLow7Mask;
+ constexpr uint8_t kContinuationBit = 0x80;
+
+ // This encoding does not work for negative values
+ if constexpr (std::is_signed_v<Int>) {
+ if (ARROW_PREDICT_FALSE(value < 0)) {
+ return 0;
+ }
+ }
+
+ const auto out_first = out;
+
+ // Write as many bytes as are needed for the given input
+ while ((value & kHigh7Mask) != Int(0)) {
+ // We do not have enough room to write the LEB128
+ if (ARROW_PREDICT_FALSE(out - out_first >= max_out_size)) {
+ return 0;
+ }
+
+ // Write the encoded byte with continuation bit
+ *out = static_cast<uint8_t>(value & kLow7Mask) | kContinuationBit;
+ ++out;
+ // Shift remaining data
+ value >>= 7;
+ }
+
+ // We do not have enough room to write the LEB128
+ if (ARROW_PREDICT_FALSE(out - out_first >= max_out_size)) {
+ return 0;
+ }
+
+ // Write last non-continuing byte
+ *out = static_cast<uint8_t>(value & kLow7Mask);
+ ++out;
+
+ return static_cast<int32_t>(out - out_first);
+}
+
+/// Parse a leading LEB128
+///
+/// Take as input a data pointer and the maximum number of bytes that can be
read from it
+/// (typically the array size).
+/// When a valid LEB128 is found at the start of the data, the function writes
it to the
+/// out pointer and return the number of bytes read.
+/// Otherwise, the out pointer is unmodified and zero is returned.
+///
+/// \see https://en.wikipedia.org/wiki/LEB128
+/// \see MaxLEB128ByteLenFor
+template <typename Int>
+constexpr int32_t ParseLeadingLEB128(const uint8_t* data, int32_t
max_data_size,
+ Int* out) {
+ constexpr auto kMaxBytes = static_cast<int32_t>(kMaxLEB128ByteLenFor<Int>);
+ static_assert(kMaxBytes >= 1);
+ constexpr uint8_t kLow7Mask = 0x7F;
+ constexpr uint8_t kContinuationBit = 0x80;
+ constexpr int32_t kSignBitCount = std::is_signed_v<Int> ? 1 : 0;
+ // Number of bits allowed for encoding data on the last byte to avoid
overflow
+ constexpr uint8_t kHighBitCount = (8 * sizeof(Int) - kSignBitCount) % 7;
+ // kHighBitCount least significant `0` bits and the rest with `1`
+ constexpr uint8_t kHighForbiddenMask = ~((1 << kHighBitCount) - 1);
+
+ // Iteratively building the value
+ std::make_unsigned_t<Int> value = 0;
+
+ // Read as many bytes as could be needed for the given output.
+ for (int32_t i = 0; i < kMaxBytes - 1; i++) {
+ // We have not finished reading a valid LEB128, yet we run out of data
+ if (ARROW_PREDICT_FALSE(i >= max_data_size)) {
+ return 0;
+ }
+
+ // Read the byte and set its 7 LSB to in the final value
+ const uint8_t byte = data[i];
+ value |= static_cast<Int>(byte & kLow7Mask) << (7 * i);
+
+ // Check for lack of continuation flag in MSB
+ if ((byte & kContinuationBit) == 0) {
+ *out = value;
+ return i + 1;
+ }
+ }
+
+ // Process the last index avoiding overflowing
+ constexpr int32_t last = kMaxBytes - 1;
+
+ // We have not finished reading a valid LEB128, yet we run out of data
+ if (ARROW_PREDICT_FALSE(last >= max_data_size)) {
+ return 0;
+ }
+
+ const uint8_t byte = data[last];
+
+ // Need to check if there are bits that would overflow the output.
+ // Also checks that there is no continuation.
+ if (ARROW_PREDICT_FALSE((byte & kHighForbiddenMask) != 0)) {
+ return 0;
+ }
+
+ // No longer need to mask since we ensured the forbidden high bits are zero
+ value |= static_cast<Int>(byte) << (7 * last);
+ *out = value;
+ return last + 1;
+}
} // namespace bit_util
} // namespace arrow
diff --git a/cpp/src/arrow/util/bit_util_test.cc
b/cpp/src/arrow/util/bit_util_test.cc
index fcaeb49261..e8cee340de 100644
--- a/cpp/src/arrow/util/bit_util_test.cc
+++ b/cpp/src/arrow/util/bit_util_test.cc
@@ -1997,11 +1997,189 @@ TEST(BitUtil, RoundUpToPowerOf2) {
#undef U64
#undef S64
+/// Test the maximum number of bytes needed to write a LEB128 of a given size.
+TEST(LEB128, MaxLEB128ByteLenFor) {
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int8_t>, 2);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint8_t>, 2);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint16_t>, 3);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint32_t>, 5);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10);
+ EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<uint64_t>, 10);
+}
+
+/// Utility function to test LEB128 encoding with known input value and
expected byte
+/// array
+template <typename Int>
+void TestLEB128Encode(Int input_value, const std::vector<uint8_t>&
expected_data,
+ std::size_t buffer_size) {
+ std::vector<uint8_t> buffer(buffer_size);
+ auto bytes_written = bit_util::WriteLEB128(input_value, buffer.data(),
+
static_cast<int32_t>(buffer.size()));
+
+ EXPECT_EQ(bytes_written, expected_data.size());
+ // Encoded data
+ for (std::size_t i = 0; i < expected_data.size(); ++i) {
+ EXPECT_EQ(buffer.at(i), expected_data.at(i));
+ }
+
+ // When the value is successfully encoded, the remainder of the buffer is
untouched
+ if (bytes_written > 0) {
+ for (std::size_t i = bytes_written; i < buffer.size(); ++i) {
+ EXPECT_EQ(buffer.at(i), 0);
+ }
+ }
+}
+
+/// Test encoding to known LEB128 byte sequences with edge cases parameters.
+/// \see LEB128.KnownSuccessfulValues for other known values tested.
+TEST(LEB128, WriteEdgeCases) {
+ // Single byte value 0
+ TestLEB128Encode(0U, {0x00}, 1);
+ // Single byte value 127
+ TestLEB128Encode(127U, {0x7F}, 1);
+ // Three byte value 16384, encoded in larger buffer
+ TestLEB128Encode(16384U, {0x80, 0x80, 0x01}, 10);
+ // Two byte boundary values
+ TestLEB128Encode(128U, {0x80, 0x01}, 2);
+ TestLEB128Encode(129U, {0x81, 0x01}, 2);
+ TestLEB128Encode(16383U, {0xFF, 0x7F}, 2);
+ // Error case: Buffer too small for value 128 (needs 2 bytes but only 1
provided)
+ TestLEB128Encode(128U, {}, 1);
+ // Error case: Buffer too small for uint32_t max (needs 5 bytes but only 4
provided)
+ TestLEB128Encode(4294967295U, {}, 4);
+ // Error case: Zero buffer size
+ TestLEB128Encode(52U, {}, 0);
+ // Error case: Negative value
+ TestLEB128Encode(-3, {}, 1);
+}
+
+/// Utility function to test LEB128 decoding with known byte array and
expected result
+template <typename Int>
+void TestLEB128Decode(const std::vector<uint8_t>& data, Int expected_value,
+ int32_t expected_bytes_read) {
+ Int result = 0;
+ auto bytes_read = bit_util::ParseLeadingLEB128(
+ data.data(), static_cast<int32_t>(data.size()), &result);
+ EXPECT_EQ(bytes_read, expected_bytes_read);
+ if (expected_bytes_read > 0) {
+ EXPECT_EQ(result, expected_value);
+ }
+}
+
+/// Test decoding from known LEB128 byte sequences with edge case parameters.
+/// \see LEB128.KnownSuccessfulValues for other known values tested.
+TEST(LEB128, ReadEdgeCases) {
+ // Single byte value 0
+ TestLEB128Decode({0x00}, 0U, 1);
+ // Single byte value 127
+ TestLEB128Decode({0x7F}, 127U, 1);
+ // Three byte value 16384, with remaining data
+ TestLEB128Decode({0x80, 0x80, 0x01, 0x80, 0x00}, 16384U, 3);
+ // Four byte value 268435455
+ TestLEB128Decode({0xFF, 0xFF, 0xFF, 0x7F}, 268435455U, 4);
+ // Error case: Truncated sequence (continuation bit set but no more data)
+ TestLEB128Decode({0x80}, 0U, 0);
+ // Error case: Input has exactly the maximum number of bytes for a int32_t
(5),
+ // but the decoded value overflows nonetheless (7 * 5 = 35 bits of data).
+ TestLEB128Decode({0xFF, 0xFF, 0xFF, 0xFF, 0x7F}, int32_t{}, 0);
+ // Error case: Oversized sequence for uint32_t (too many bytes)
+ TestLEB128Decode({0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}, 0U, 0);
+}
+
+struct KnownLEB128Encoding {
+ uint64_t value;
+ std::vector<uint8_t> bytes;
+};
+
+static const std::vector<KnownLEB128Encoding> KnownLEB128EncodingValues{
+ {0, {0x00}},
+ {1, {0x01}},
+ {63, {0x3F}},
+ {64, {0x40}},
+ {127U, {0x7F}},
+ {128, {0x80, 0x01}},
+ {300, {0xAC, 0x02}},
+ {16384, {0x80, 0x80, 0x01}},
+ {268435455, {0xFF, 0xFF, 0xFF, 0x7F}},
+ {static_cast<uint64_t>(std::numeric_limits<uint8_t>::max()), {0xFF, 0x01}},
+ {static_cast<uint64_t>(std::numeric_limits<int8_t>::max()), {0x7F}},
+ {static_cast<uint64_t>(std::numeric_limits<uint16_t>::max()), {0xFF, 0xFF,
0x03}},
+ {static_cast<uint64_t>(std::numeric_limits<int16_t>::max()), {0xFF, 0xFF,
0x01}},
+ {static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()),
+ {0xFF, 0xFF, 0xFF, 0xFF, 0x0F}},
+ {static_cast<uint64_t>(std::numeric_limits<int32_t>::max()),
+ {0xFF, 0xFF, 0xFF, 0xFF, 0x7}},
+ {static_cast<uint64_t>(std::numeric_limits<uint64_t>::max()),
+ {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01}},
+ {static_cast<uint64_t>(std::numeric_limits<int64_t>::max()),
+ {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x7F}},
+};
+
+/// Test encoding and decoding to known LEB128 byte sequences with all possible
+/// integer sizes and signedness.
+TEST(LEB128, KnownSuccessfulValues) {
+ for (const auto& data : KnownLEB128EncodingValues) {
+ SCOPED_TRACE("Testing value " + std::to_string(data.value));
+
+ // 8 bits
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<int8_t>::max())) {
+ const auto val = static_cast<int8_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<uint8_t>::max())) {
+ const auto val = static_cast<uint8_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+
+ // 16 bits
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<int16_t>::max())) {
+ const auto val = static_cast<int16_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<uint16_t>::max())) {
+ const auto val = static_cast<uint16_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+
+ // 32 bits
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<int32_t>::max())) {
+ const auto val = static_cast<int32_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<uint32_t>::max())) {
+ const auto val = static_cast<uint32_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+
+ // 64 bits
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<int64_t>::max())) {
+ const auto val = static_cast<int64_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+ if (data.value <=
static_cast<uint64_t>(std::numeric_limits<uint64_t>::max())) {
+ const auto val = static_cast<uint64_t>(data.value);
+ TestLEB128Encode(val, data.bytes, data.bytes.size());
+ TestLEB128Decode(data.bytes, val,
static_cast<int32_t>(data.bytes.size()));
+ }
+ }
+}
+
static void TestZigZag(int32_t v, std::array<uint8_t, 5> buffer_expect) {
- uint8_t buffer[bit_util::BitReader::kMaxVlqByteLength] = {};
+ uint8_t buffer[bit_util::kMaxLEB128ByteLenFor<int32_t>] = {};
bit_util::BitWriter writer(buffer, sizeof(buffer));
- bit_util::BitReader reader(buffer, sizeof(buffer));
writer.PutZigZagVlqInt(v);
+ // WARNING: The reader reads and caches the input when created, so it must
be created
+ // after the data is written in the buffer.
+ bit_util::BitReader reader(buffer, sizeof(buffer));
EXPECT_THAT(buffer, testing::ElementsAreArray(buffer_expect));
int32_t result;
EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
@@ -2020,10 +2198,12 @@ TEST(BitStreamUtil, ZigZag) {
}
static void TestZigZag64(int64_t v, std::array<uint8_t, 10> buffer_expect) {
- uint8_t buffer[bit_util::BitReader::kMaxVlqByteLengthForInt64] = {};
+ uint8_t buffer[bit_util::kMaxLEB128ByteLenFor<int64_t>] = {};
bit_util::BitWriter writer(buffer, sizeof(buffer));
- bit_util::BitReader reader(buffer, sizeof(buffer));
writer.PutZigZagVlqInt(v);
+ // WARNING: The reader reads and caches the input when created, so it must
be created
+ // after the data is written in the buffer.
+ bit_util::BitReader reader(buffer, sizeof(buffer));
EXPECT_THAT(buffer, testing::ElementsAreArray(buffer_expect));
int64_t result = 0;
EXPECT_TRUE(reader.GetZigZagVlqInt(&result));
diff --git a/cpp/src/arrow/util/bitmap_reader.h
b/cpp/src/arrow/util/bitmap_reader.h
index 5526c87dbc..d95fd921f4 100644
--- a/cpp/src/arrow/util/bitmap_reader.h
+++ b/cpp/src/arrow/util/bitmap_reader.h
@@ -31,6 +31,8 @@ namespace internal {
class BitmapReader {
public:
+ BitmapReader() = default;
+
BitmapReader(const uint8_t* bitmap, int64_t start_offset, int64_t length)
: bitmap_(bitmap), position_(0), length_(length) {
current_byte_ = 0;
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h
b/cpp/src/arrow/util/rle_encoding_internal.h
index fab8c50512..c231c9a63e 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -21,18 +21,17 @@
#pragma once
#include <algorithm>
-#include <cmath>
+#include <array>
#include <limits>
-#include <vector>
+#include <type_traits>
+#include <variant>
-#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_stream_utils_internal.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
-namespace arrow {
-namespace util {
+namespace arrow::util {
/// Utility classes to do run length encoding (RLE) for fixed bit width
values. If runs
/// are sufficiently long, RLE is used, otherwise, the values are just
bit-packed
@@ -82,86 +81,411 @@ namespace util {
/// 200 ints = 25 groups of 8
/// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
/// (total 26 bytes, 1 byte overhead)
-//
-/// Decoder class for RLE encoded data.
-class RleDecoder {
+/// The type for an encoded Rle of BitPacked run size, between 1 and 2^31-1 as
per Parquet
+/// spec.
+/// This is also pragmatically used for other integer used in the Rle and
BitPacked runs
+/// and decoder to avoid conversions.
+/// It can therefore be referred to as a "typical" size for Rle and BitPacked
logic.
+using rle_size_t = int32_t;
+
+template <typename T>
+class RleRunDecoder;
+
+/// A Single Run Length Encoded run.
+///
+/// Consists of a single value repeated multiple times.
+/// A previous version of this class also stored the value bit width to be
self-contained,
+/// removing it and passing it explicitly when needed proved to speed up
decoding up to
+/// 10 % on some benchmarks.
+class RleRun {
public:
- /// Create a decoder object. buffer/buffer_len is the decoded data.
- /// bit_width is the width of each value (before encoding).
- RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
- : bit_reader_(buffer, buffer_len),
- bit_width_(bit_width),
- current_value_(0),
- repeat_count_(0),
- literal_count_(0) {
- ARROW_DCHECK_GE(bit_width_, 0);
- ARROW_DCHECK_LE(bit_width_, 64);
+ /// The decoder class used to decode a single run in the given type.
+ template <typename T>
+ using DecoderType = RleRunDecoder<T>;
+
+ constexpr RleRun() noexcept = default;
+
+ explicit RleRun(const uint8_t* data, rle_size_t values_count,
+ rle_size_t value_bit_width) noexcept
+ : values_count_(values_count) {
+ ARROW_DCHECK_GE(value_bit_width, 0);
+ ARROW_DCHECK_GE(values_count, 0);
+ std::copy(data, data + raw_data_size(value_bit_width), data_.begin());
}
- RleDecoder() : bit_width_(-1) {}
+ /// The number of repeated values in this run.
+ constexpr rle_size_t values_count() const noexcept { return values_count_; }
- void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
- ARROW_DCHECK_GE(bit_width, 0);
- ARROW_DCHECK_LE(bit_width, 64);
- bit_reader_.Reset(buffer, buffer_len);
- bit_width_ = bit_width;
- current_value_ = 0;
- repeat_count_ = 0;
- literal_count_ = 0;
+ /// A pointer to the repeated value raw bytes.
+ constexpr const uint8_t* raw_data_ptr() const noexcept { return
data_.data(); }
+
+ /// The number of bytes used for the raw repeated value.
+ constexpr rle_size_t raw_data_size(rle_size_t value_bit_width) const
noexcept {
+ auto out = bit_util::BytesForBits(value_bit_width);
+ ARROW_DCHECK_LE(out, std::numeric_limits<rle_size_t>::max());
+ return static_cast<rle_size_t>(out);
}
- /// Gets the next value. Returns false if there are no more.
+ private:
+ /// The repeated value raw bytes stored inside the class with enough space
to store
+ /// up to a 64 bit value.
+ std::array<uint8_t, 8> data_ = {};
+ /// The number of time the value is repeated.
+ rle_size_t values_count_ = 0;
+};
+
+template <typename T>
+class BitPackedRunDecoder;
+
+/// A single bit packed run.
+///
+/// Consists of a view on a buffer of bytes that encode integers on
``value_bit_width``
+/// bits (that is the numbers are small enough that high order bits are all
zeros and can
+/// be omitted).
+/// A previous version of this class also stored the value bit width to be
self-contained,
+/// removing it and passing it explicitly when needed proved to speed up
decoding up to
+/// 10 % on some benchmarks.
+class BitPackedRun {
+ public:
+ /// The decoder class used to decode a single run in the given type.
+ template <typename T>
+ using DecoderType = BitPackedRunDecoder<T>;
+
+ constexpr BitPackedRun() noexcept = default;
+
+ constexpr BitPackedRun(const uint8_t* data, rle_size_t values_count,
+ rle_size_t value_bit_width) noexcept
+ : data_(data), values_count_(values_count) {
+ ARROW_CHECK_GE(value_bit_width, 0);
+ ARROW_CHECK_GE(values_count_, 0);
+ }
+
+ constexpr rle_size_t values_count() const noexcept { return values_count_; }
+
+ constexpr const uint8_t* raw_data_ptr() const noexcept { return data_; }
+
+ constexpr rle_size_t raw_data_size(rle_size_t value_bit_width) const
noexcept {
+ auto out = bit_util::BytesForBits(static_cast<int64_t>(value_bit_width) *
+ static_cast<int64_t>(values_count_));
+ ARROW_CHECK_LE(out, std::numeric_limits<rle_size_t>::max());
+ return static_cast<rle_size_t>(out);
+ }
+
+ private:
+ /// The pointer to the beginning of the run
+ const uint8_t* data_ = nullptr;
+ /// Number of values in this run.
+ rle_size_t values_count_ = 0;
+};
+
+/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``.
+class RleBitPackedParser {
+ public:
+ /// The different types of runs emitted by the parser
+ using dynamic_run_type = std::variant<RleRun, BitPackedRun>;
+
+ constexpr RleBitPackedParser() noexcept = default;
+
+ constexpr RleBitPackedParser(const uint8_t* data, rle_size_t data_size,
+ rle_size_t value_bit_width) noexcept
+ : data_(data), data_size_(data_size), value_bit_width_(value_bit_width)
{}
+
+ constexpr void Reset(const uint8_t* data, rle_size_t data_size,
+ rle_size_t value_bit_width) noexcept {
+ *this = {data, data_size, value_bit_width};
+ }
+
+  /// Whether there are still runs to iterate over.
+ ///
+ /// WARN: Due to simplistic error handling, iteration with Next and Peek
could
+ /// fail to return data while the parser is not exhausted.
+ /// This is how one can check for errors.
+ bool exhausted() const { return data_size_ == 0; }
+
+ /// Enum to return from an ``Parse`` handler.
+ ///
+ /// Since a callback has no way to know when to stop, the handler must return
+ /// a value indicating to the ``Parse`` function whether to stop or continue.
+ enum class ControlFlow {
+ Continue,
+ Break,
+ };
+
+ /// A callback approach to parsing.
+ ///
+ /// This approach is used to reduce the number of dynamic lookups involved
with using a
+ /// variant.
+ ///
+ /// The handler must be of the form
+ /// ```cpp
+ /// struct Handler {
+ /// ControlFlow OnBitPackedRun(BitPackedRun run);
+ ///
+ /// ControlFlow OnRleRun(RleRun run);
+ /// };
+ /// ```
+ template <typename Handler>
+ void Parse(Handler&& handler);
+
+ private:
+ /// The pointer to the beginning of the run
+ const uint8_t* data_ = nullptr;
+ /// Size in bytes of the run.
+ rle_size_t data_size_ = 0;
+ /// The size in bit of a packed value in the run
+ rle_size_t value_bit_width_ = 0;
+
+ /// Run the handler on the run read and return the number of values read.
+ /// Does not advance the parser.
+ template <typename Handler>
+ std::pair<rle_size_t, ControlFlow> PeekImpl(Handler&&) const;
+};
+
+/// Decoder class for a single run of RLE encoded data.
+template <typename T>
+class RleRunDecoder {
+ public:
+ /// The type in which the data should be decoded.
+ using value_type = T;
+ /// The type of run that can be decoded.
+ using RunType = RleRun;
+
+ constexpr RleRunDecoder() noexcept = default;
+
+ explicit RleRunDecoder(const RunType& run, rle_size_t value_bit_width)
noexcept {
+ Reset(run, value_bit_width);
+ }
+
+ void Reset(const RunType& run, rle_size_t value_bit_width) noexcept {
+ remaining_count_ = run.values_count();
+ if constexpr (std::is_same_v<value_type, bool>) {
+ // ARROW-18031: just check the LSB of the next byte and move on.
+ // If we memcpy + FromLittleEndian, we have potential undefined behavior
+ // if the bool value isn't 0 or 1.
+ value_ = *run.raw_data_ptr() & 1;
+ } else {
+ // Memcopy is required to avoid undefined behavior.
+ value_ = {};
+ std::memcpy(&value_, run.raw_data_ptr(),
run.raw_data_size(value_bit_width));
+ value_ = ::arrow::bit_util::FromLittleEndian(value_);
+ }
+ }
+
+ /// Return the number of values that can be advanced.
+ rle_size_t remaining() const { return remaining_count_; }
+
+ /// Return the repeated value of this decoder.
+ constexpr value_type value() const { return value_; }
+
+ /// Try to advance by as many values as provided.
+ /// Return the number of values skipped.
+ /// May advance by less than asked for if there are not enough values left.
+ [[nodiscard]] rle_size_t Advance(rle_size_t batch_size, rle_size_t
value_bit_width) {
+ const auto steps = std::min(batch_size, remaining_count_);
+ remaining_count_ -= steps;
+ return steps;
+ }
+
+ /// Get the next value and return false if there are no more.
+ [[nodiscard]] constexpr bool Get(value_type* out_value, rle_size_t
value_bit_width) {
+ return GetBatch(out_value, 1, value_bit_width) == 1;
+ }
+
+  /// Get a batch of values and return the number of decoded elements.
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left.
+ [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size,
+ rle_size_t value_bit_width) {
+ if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) {
+ return 0;
+ }
+
+ const auto to_read = std::min(remaining_count_, batch_size);
+ std::fill(out, out + to_read, value_);
+ remaining_count_ -= to_read;
+ return to_read;
+ }
+
+ private:
+ value_type value_ = {};
+ rle_size_t remaining_count_ = 0;
+
+ static_assert(std::is_integral_v<value_type>,
+ "This class is meant to decode positive integers");
+};
+
+/// Decoder class for single run of bit-packed encoded data.
+template <typename T>
+class BitPackedRunDecoder {
+ public:
+ /// The type in which the data should be decoded.
+ using value_type = T;
+ /// The type of run that can be decoded.
+ using RunType = BitPackedRun;
+
+ BitPackedRunDecoder() noexcept = default;
+
+ explicit BitPackedRunDecoder(const RunType& run, rle_size_t value_bit_width)
noexcept {
+ Reset(run, value_bit_width);
+ }
+
+ void Reset(const RunType& run, rle_size_t value_bit_width) noexcept {
+ remaining_count_ = run.values_count();
+ ARROW_DCHECK_GE(value_bit_width, 0);
+ ARROW_DCHECK_LE(value_bit_width, 64);
+ bit_reader_.Reset(run.raw_data_ptr(), run.raw_data_size(value_bit_width));
+ }
+
+ /// Return the number of values that can be advanced.
+ constexpr rle_size_t remaining() const { return remaining_count_; }
+
+ /// Try to advance by as many values as provided.
+ /// Return the number of values skipped or 0 if it fail to advance.
+ /// May advance by less than asked for if there are not enough values left.
+ [[nodiscard]] rle_size_t Advance(rle_size_t batch_size, rle_size_t
value_bit_width) {
+ const auto steps = std::min(batch_size, remaining_count_);
+ if (bit_reader_.Advance(steps * value_bit_width)) {
+ remaining_count_ -= steps;
+ return steps;
+ }
+ return 0;
+ }
+
+ /// Get the next value and return false if there are no more or an error
occurred.
+ [[nodiscard]] constexpr bool Get(value_type* out_value, rle_size_t
value_bit_width) {
+ return GetBatch(out_value, 1, value_bit_width) == 1;
+ }
+
+  /// Get a batch of values and return the number of decoded elements.
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left or if an error occurred.
+ [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size,
+ rle_size_t value_bit_width) {
+ if (ARROW_PREDICT_FALSE(remaining_count_ == 0)) {
+ return 0;
+ }
+
+ const auto to_read = std::min(remaining_count_, batch_size);
+ const auto actual_read = bit_reader_.GetBatch(value_bit_width, out,
to_read);
+ // There should not be any reason why the actual read would be different
+ // but this is error resistant.
+ remaining_count_ -= actual_read;
+ return actual_read;
+ }
+
+ private:
+ ::arrow::bit_util::BitReader bit_reader_ = {};
+ rle_size_t remaining_count_ = 0;
+
+ static_assert(std::is_integral_v<value_type>,
+ "This class is meant to decode positive integers");
+};
+
+/// Decoder class for Parquet RLE bit-packed data.
+template <typename T>
+class RleBitPackedDecoder {
+ public:
+ /// The type in which the data should be decoded.
+ using value_type = T;
+ using DynamicRun = RleBitPackedParser::dynamic_run_type;
+
+ RleBitPackedDecoder() noexcept = default;
+
+ /// Create a decoder object.
+ ///
+ /// data and data_size are the raw bytes to decode.
+ /// value_bit_width is the size in bits of each encoded value.
+ RleBitPackedDecoder(const uint8_t* data, rle_size_t data_size,
+ rle_size_t value_bit_width) noexcept {
+ Reset(data, data_size, value_bit_width);
+ }
+
+ void Reset(const uint8_t* data, rle_size_t data_size,
+ rle_size_t value_bit_width) noexcept {
+ ARROW_DCHECK_GE(value_bit_width, 0);
+ ARROW_DCHECK_LE(value_bit_width, 64);
+ parser_.Reset(data, data_size, value_bit_width);
+ decoder_ = {};
+ value_bit_width_ = value_bit_width;
+ }
+
+  /// Whether there are still runs to iterate over.
+ ///
+ /// WARN: Due to lack of proper error handling, iteration with Get methods
could return
+ /// no data while the parser is not exhausted.
+ /// This is how one can check for errors.
+ bool exhausted() const { return (run_remaining() == 0) &&
parser_.exhausted(); }
+
+ /// Gets the next value or returns false if there are no more or an error
occurred.
///
/// NB: Because the encoding only supports literal runs with lengths
/// that are multiples of 8, RleEncoder sometimes pads the end of its
/// input with zeros. Since the encoding does not differentiate between
/// input values and padding, Get() returns true even for these padding
/// values.
- template <typename T>
- bool Get(T* val);
+ [[nodiscard]] bool Get(value_type* val);
- /// Gets a batch of values. Returns the number of decoded elements.
- template <typename T>
- int GetBatch(T* values, int batch_size);
+  /// Get a batch of values and return the number of decoded elements.
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left or if an error occurred.
+ [[nodiscard]] rle_size_t GetBatch(value_type* out, rle_size_t batch_size);
- /// Like GetBatch but add spacing for null entries
- template <typename T>
- int GetBatchSpaced(int batch_size, int null_count, const uint8_t* valid_bits,
- int64_t valid_bits_offset, T* out);
+ /// Like GetBatch but add spacing for null entries.
+ ///
+  /// Null entries will be set to an arbitrary value to avoid leaking private
data.
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left or if an error occurred.
+ [[nodiscard]] rle_size_t GetBatchSpaced(rle_size_t batch_size, rle_size_t
null_count,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
value_type* out);
/// Like GetBatch but the values are then decoded using the provided
dictionary
- template <typename T>
- int GetBatchWithDict(const T* dictionary, int32_t dictionary_length, T*
values,
- int batch_size);
+ ///
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left or if an error occurred.
+ template <typename V>
+ [[nodiscard]] rle_size_t GetBatchWithDict(const V* dictionary,
+ int32_t dictionary_length, V* out,
+ rle_size_t batch_size);
/// Like GetBatchWithDict but add spacing for null entries
///
- /// Null entries will be zero-initialized in `values` to avoid leaking
- /// private data.
- template <typename T>
- int GetBatchWithDictSpaced(const T* dictionary, int32_t dictionary_length,
T* values,
- int batch_size, int null_count, const uint8_t*
valid_bits,
- int64_t valid_bits_offset);
-
- protected:
- ::arrow::bit_util::BitReader bit_reader_;
- /// Number of bits needed to encode the value. Must be between 0 and 64.
- int bit_width_;
- uint64_t current_value_;
- int32_t repeat_count_;
- int32_t literal_count_;
+  /// Null entries will be set to an arbitrary value to avoid leaking private
data.
+ /// May write fewer elements to the output than requested if there are not
enough values
+ /// left or if an error occurred.
+ template <typename V>
+ [[nodiscard]] rle_size_t GetBatchWithDictSpaced(
+ const V* dictionary, int32_t dictionary_length, V* out, rle_size_t
batch_size,
+ rle_size_t null_count, const uint8_t* valid_bits, int64_t
valid_bits_offset);
private:
- /// Fills literal_count_ and repeat_count_ with next values. Returns false
if there
- /// are no more.
- template <typename T>
- bool NextCounts();
+ RleBitPackedParser parser_ = {};
+ std::variant<RleRunDecoder<value_type>, BitPackedRunDecoder<value_type>>
decoder_ = {};
+ rle_size_t value_bit_width_;
+
+ /// Return the number of values that are remaining in the current run.
+ rle_size_t run_remaining() const {
+ return std::visit([](const auto& dec) { return dec.remaining(); },
decoder_);
+ }
+
+  /// Get a batch of values from the current run and return the number of
elements read.
+ [[nodiscard]] rle_size_t RunGetBatch(value_type* out, rle_size_t batch_size)
{
+ return std::visit(
+ [&](auto& dec) { return dec.GetBatch(out, batch_size,
value_bit_width_); },
+ decoder_);
+ }
+
+ /// Call the parser with a single callable for all event types.
+ template <typename Callable>
+ void ParseWithCallable(Callable&& func);
/// Utility methods for retrieving spaced values.
- template <typename T, typename RunType, typename Converter>
- int GetSpaced(Converter converter, int batch_size, int null_count,
- const uint8_t* valid_bits, int64_t valid_bits_offset, T* out);
+ template <typename Converter>
+ [[nodiscard]] rle_size_t GetSpaced(Converter converter,
+ typename Converter::out_type* out,
+ rle_size_t batch_size, const uint8_t*
valid_bits,
+ int64_t valid_bits_offset, rle_size_t
null_count);
};
/// Class to incrementally build the rle data. This class does not allocate
any memory.
@@ -170,7 +494,7 @@ class RleDecoder {
/// This class does so by buffering 8 values at a time. If they are not all
the same
/// they are added to the literal run. If they are the same, they are added
to the
/// repeated run. When we switch modes, the previous run is flushed out.
-class RleEncoder {
+class RleBitPackedEncoder {
public:
/// buffer/buffer_len: preallocated output buffer.
/// bit_width: max number of bits for value.
@@ -178,7 +502,7 @@ class RleEncoder {
/// when values should be encoded as repeated runs. Currently this is
derived
/// based on the bit_width, which can determine a storage optimal choice.
/// TODO: allow 0 bit_width (and have dict encoder use it)
- RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
+ RleBitPackedEncoder(uint8_t* buffer, int buffer_len, int bit_width)
: bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
ARROW_DCHECK_GE(bit_width_, 0);
ARROW_DCHECK_LE(bit_width_, 64);
@@ -191,12 +515,12 @@ class RleEncoder {
/// This is the maximum length of a single run for 'bit_width'.
/// It is not valid to pass a buffer less than this length.
static int MinBufferSize(int bit_width) {
- /// 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
+ // 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
int max_literal_run_size = 1 +
static_cast<int>(::arrow::bit_util::BytesForBits(
MAX_VALUES_PER_LITERAL_RUN *
bit_width));
- /// Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
+ // Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
int max_repeated_run_size =
- ::arrow::bit_util::BitReader::kMaxVlqByteLength +
+ bit_util::kMaxLEB128ByteLenFor<int32_t> +
static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
return std::max(max_literal_run_size, max_repeated_run_size);
}
@@ -299,385 +623,665 @@ class RleEncoder {
uint8_t* literal_indicator_byte_;
};
+/************************
+ * RleBitPackedParser *
+ ************************/
+
+template <typename Handler>
+void RleBitPackedParser::Parse(Handler&& handler) {
+ while (!exhausted()) {
+ auto [read, control] = PeekImpl(handler);
+ data_ += read;
+ data_size_ -= read;
+ if (ARROW_PREDICT_FALSE(control == ControlFlow::Break)) {
+ break;
+ }
+ }
+}
+
+namespace internal {
+/// The maximal unsigned size that a variable can fit.
+template <typename T>
+constexpr auto max_size_for_v =
+ static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max());
+
+} // namespace internal
+
+template <typename Handler>
+auto RleBitPackedParser::PeekImpl(Handler&& handler) const
+ -> std::pair<rle_size_t, ControlFlow> {
+ ARROW_DCHECK(!exhausted());
+
+ constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
+ uint32_t run_len_type = 0;
+ const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize,
&run_len_type);
+
+ if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
+    // Malformed LEB128 data
+ return {0, ControlFlow::Break};
+ }
+
+ const bool is_bit_packed = run_len_type & 1;
+ const uint32_t count = run_len_type >> 1;
+ if (is_bit_packed) {
+ constexpr auto kMaxCount =
bit_util::CeilDiv(internal::max_size_for_v<rle_size_t>, 8);
+ if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
+ // Illegal number of encoded values
+ return {0, ControlFlow::Break};
+ }
+
+ ARROW_DCHECK_LT(static_cast<uint64_t>(count) * 8,
+ internal::max_size_for_v<rle_size_t>);
+ const auto values_count = static_cast<rle_size_t>(count * 8);
+    // Count already divided by 8
+ const auto bytes_read =
+ header_bytes + static_cast<rle_size_t>(count) * value_bit_width_;
+
+ auto control = handler.OnBitPackedRun(
+ BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
+
+ return {bytes_read, control};
+ }
+
+ if (ARROW_PREDICT_FALSE(count == 0)) {
+ // Illegal number of encoded values
+ return {0, ControlFlow::Break};
+ }
+
+ // Safe because created from right shift
+ const auto values_count = static_cast<rle_size_t>(count);
+ const auto value_bytes = bit_util::BytesForBits(value_bit_width_);
+ ARROW_DCHECK_LT(value_bytes, internal::max_size_for_v<rle_size_t>);
+ const auto bytes_read = header_bytes + static_cast<rle_size_t>(value_bytes);
+
+ auto control =
+ handler.OnRleRun(RleRun(data_ + header_bytes, values_count,
value_bit_width_));
+
+ return {bytes_read, control};
+}
+
+/*************************
+ * RleBitPackedDecoder *
+ *************************/
+
template <typename T>
-inline bool RleDecoder::Get(T* val) {
+template <typename Callable>
+void RleBitPackedDecoder<T>::ParseWithCallable(Callable&& func) {
+ struct {
+ Callable func;
+ auto OnBitPackedRun(BitPackedRun run) { return func(std::move(run)); }
+ auto OnRleRun(RleRun run) { return func(std::move(run)); }
+ } handler{std::move(func)};
+
+ parser_.Parse(std::move(handler));
+}
+
+template <typename T>
+bool RleBitPackedDecoder<T>::Get(value_type* val) {
return GetBatch(val, 1) == 1;
}
template <typename T>
-inline int RleDecoder::GetBatch(T* values, int batch_size) {
- ARROW_DCHECK_GE(bit_width_, 0);
- int values_read = 0;
-
- auto* out = values;
-
- while (values_read < batch_size) {
- int remaining = batch_size - values_read;
-
- if (repeat_count_ > 0) { // Repeated value case.
- int repeat_batch = std::min(remaining, repeat_count_);
- std::fill(out, out + repeat_batch, static_cast<T>(current_value_));
-
- repeat_count_ -= repeat_batch;
- values_read += repeat_batch;
- out += repeat_batch;
- } else if (literal_count_ > 0) {
- int literal_batch = std::min(remaining, literal_count_);
- int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch);
- if (actual_read != literal_batch) {
- return values_read;
- }
+auto RleBitPackedDecoder<T>::GetBatch(value_type* out, rle_size_t batch_size)
+ -> rle_size_t {
+ using ControlFlow = RleBitPackedParser::ControlFlow;
- literal_count_ -= literal_batch;
- values_read += literal_batch;
- out += literal_batch;
- } else {
- if (!NextCounts<T>()) return values_read;
+ rle_size_t values_read = 0;
+
+ // Remaining from a previous call that would have left some unread data from
a run.
+ if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+ const auto read = RunGetBatch(out, batch_size);
+ values_read += read;
+ out += read;
+
+ // Either we fulfilled all the batch to be read or we finished remaining
run.
+ if (ARROW_PREDICT_FALSE(values_read == batch_size)) {
+ return values_read;
}
+ ARROW_DCHECK(run_remaining() == 0);
}
+ ParseWithCallable([&](auto run) {
+ using RunDecoder = typename decltype(run)::template
DecoderType<value_type>;
+
+ ARROW_DCHECK_LT(values_read, batch_size);
+ RunDecoder decoder(run, value_bit_width_);
+ const auto read = decoder.GetBatch(out, batch_size - values_read,
value_bit_width_);
+ ARROW_DCHECK_LE(read, batch_size - values_read);
+ values_read += read;
+ out += read;
+
+ // Stop reading and store remaining decoder
+ if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) {
+ decoder_ = std::move(decoder);
+ return ControlFlow::Break;
+ }
+
+ return ControlFlow::Continue;
+ });
+
return values_read;
}
-template <typename T, typename RunType, typename Converter>
-inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int
null_count,
- const uint8_t* valid_bits, int64_t
valid_bits_offset,
- T* out) {
- if (ARROW_PREDICT_FALSE(null_count == batch_size)) {
- converter.FillZero(out, out + batch_size);
- return batch_size;
- }
-
- ARROW_DCHECK_GE(bit_width_, 0);
- int values_read = 0;
- int values_remaining = batch_size - null_count;
-
- // Assume no bits to start.
- arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset,
- /*length=*/batch_size);
- arrow::internal::BitRun valid_run = bit_reader.NextRun();
- while (values_read < batch_size) {
- if (ARROW_PREDICT_FALSE(valid_run.length == 0)) {
- valid_run = bit_reader.NextRun();
+namespace internal {
+
+/// Utility class to safely handle values and null count without too
error-prone
+/// verbosity.
+class BatchCounter {
+ public:
+ using size_type = rle_size_t;
+
+ static constexpr BatchCounter FromBatchSizeAndNulls(size_type batch_size,
+ size_type null_count) {
+ ARROW_DCHECK_LE(null_count, batch_size);
+ return {batch_size - null_count, null_count};
+ }
+
+ constexpr BatchCounter(size_type values_count, size_type null_count) noexcept
+ : values_count_(values_count), null_count_(null_count) {}
+
+ constexpr size_type values_count() const noexcept { return values_count_; }
+
+ constexpr size_type values_read() const noexcept { return values_read_; }
+
+ constexpr size_type values_remaining() const noexcept {
+ ARROW_DCHECK_LE(values_read_, values_count_);
+ return values_count_ - values_read_;
+ }
+
+ constexpr void AccrueReadValues(size_type to_read) noexcept {
+ ARROW_DCHECK_LE(to_read, values_remaining());
+ values_read_ += to_read;
+ }
+
+ constexpr size_type null_count() const noexcept { return null_count_; }
+
+ constexpr size_type null_read() const noexcept { return null_read_; }
+
+ constexpr size_type null_remaining() const noexcept {
+ ARROW_DCHECK_LE(null_read_, null_count_);
+ return null_count_ - null_read_;
+ }
+
+ constexpr void AccrueReadNulls(size_type to_read) noexcept {
+ ARROW_DCHECK_LE(to_read, null_remaining());
+ null_read_ += to_read;
+ }
+
+ constexpr size_type total_remaining() const noexcept {
+ return values_remaining() + null_remaining();
+ }
+
+ constexpr size_type total_read() const noexcept { return values_read_ +
null_read_; }
+
+ constexpr bool is_fully_null() const noexcept { return values_remaining() ==
0; }
+
+ constexpr bool is_done() const noexcept { return total_remaining() == 0; }
+
+ private:
+ size_type values_count_ = 0;
+ size_type values_read_ = 0;
+ size_type null_count_ = 0;
+ size_type null_read_ = 0;
+};
+
+template <typename Int>
+struct GetSpacedResult {
+ Int values_read;
+ Int null_read;
+};
+
+/// Overload for GetSpaced for a single run in a RleDecoder
+template <typename Converter, typename BitRunReader, typename BitRun, typename
value_type>
+auto RunGetSpaced(Converter* converter, typename Converter::out_type* out,
+ rle_size_t batch_size, rle_size_t null_count,
+ rle_size_t value_bit_width, BitRunReader* validity_reader,
+ BitRun* validity_run, RleRunDecoder<value_type>* decoder)
+ -> GetSpacedResult<rle_size_t> {
+ ARROW_DCHECK_GT(batch_size, 0);
+ // The equality case is handled in the main loop in GetSpaced
+ ARROW_DCHECK_LT(null_count, batch_size);
+
+ auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count);
+
+ const rle_size_t values_available = decoder->remaining();
+ ARROW_DCHECK_GT(values_available, 0);
+ auto values_remaining_run = [&]() {
+ auto out = values_available - batch.values_read();
+ ARROW_DCHECK_GE(out, 0);
+ return out;
+ };
+
+ // Consume as much as possible from the repeated run.
+ // We only need to count the number of nulls and non-nulls because we can
fill in the
+ // same value for nulls and non-nulls.
+ // This proves to be a big efficiency win.
+ while (values_remaining_run() > 0 && !batch.is_done()) {
+ ARROW_DCHECK_GE(validity_run->length, 0);
+ ARROW_DCHECK_LT(validity_run->length, max_size_for_v<rle_size_t>);
+ ARROW_DCHECK_LE(validity_run->length, batch.total_remaining());
+ const auto& validity_run_size =
static_cast<rle_size_t>(validity_run->length);
+
+ if (validity_run->set) {
+ // We may end the current RLE run in the middle of the validity run
+ auto update_size = std::min(validity_run_size, values_remaining_run());
+ batch.AccrueReadValues(update_size);
+ validity_run->length -= update_size;
+ } else {
+ // We can consume all nulls here because it does not matter if we
consume on this
+      // RLE run, or on a next encoded run. The value filled does not matter.
+ auto update_size = std::min(validity_run_size, batch.null_remaining());
+ batch.AccrueReadNulls(update_size);
+ validity_run->length -= update_size;
}
- ARROW_DCHECK_GT(batch_size, 0);
- ARROW_DCHECK_GT(valid_run.length, 0);
+ if (ARROW_PREDICT_TRUE(validity_run->length == 0)) {
+ *validity_run = validity_reader->NextRun();
+ }
+ }
+
+ const value_type value = decoder->value();
+ if (ARROW_PREDICT_FALSE(!converter->InputIsValid(value))) {
+ return {0, 0};
+ }
+ converter->WriteRepeated(out, out + batch.total_read(), value);
+ const auto actual_values_read = decoder->Advance(batch.values_read(),
value_bit_width);
+ // We always cropped the number of values_read by the remaining values in
the run.
+ // What's more the RLE decoder should not encounter any errors.
+ ARROW_DCHECK_EQ(actual_values_read, batch.values_read());
+
+ return {/* .values_read= */ batch.values_read(), /* .null_read= */
batch.null_read()};
+}
+
+template <typename Converter, typename BitRunReader, typename BitRun, typename
value_type>
+auto RunGetSpaced(Converter* converter, typename Converter::out_type* out,
+ rle_size_t batch_size, rle_size_t null_count,
+ rle_size_t value_bit_width, BitRunReader* validity_reader,
+ BitRun* validity_run, BitPackedRunDecoder<value_type>*
decoder)
+ -> GetSpacedResult<rle_size_t> {
+ ARROW_DCHECK_GT(batch_size, 0);
+ // The equality case is handled in the main loop in GetSpaced
+ ARROW_DCHECK_LT(null_count, batch_size);
+
+ auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count);
+
+ const rle_size_t values_available = decoder->remaining();
+ ARROW_DCHECK_GT(values_available, 0);
+ auto run_values_remaining = [&]() {
+ auto out = values_available - batch.values_read();
+ ARROW_DCHECK_GE(out, 0);
+ return out;
+ };
+
+ while (run_values_remaining() > 0 && batch.values_remaining() > 0) {
+ // Pull a batch of values from the bit packed encoded data and store it in
a local
+ // buffer to benefit from unpacking intrinsics and data locality.
+ // Quick benchmarking on a linux x86-64 cloud instance show that this
previously
+ // hard-coded value is appropriate.
+ static constexpr rle_size_t kBufferCapacity = 1024;
+ std::array<value_type, kBufferCapacity> buffer = {};
+
+ rle_size_t buffer_start = 0;
+ rle_size_t buffer_end = 0;
+ auto buffer_size = [&]() {
+ auto out = buffer_end - buffer_start;
+ ARROW_DCHECK_GE(out, 0);
+ return out;
+ };
+
+ // buffer_start is 0 at this point so size is end
+ buffer_end = std::min(std::min(run_values_remaining(),
batch.values_remaining()),
+ kBufferCapacity);
+ buffer_end = decoder->GetBatch(buffer.data(), buffer_size(),
value_bit_width);
+ ARROW_DCHECK_LE(buffer_size(), kBufferCapacity);
+
+ if (ARROW_PREDICT_FALSE(!converter->InputIsValid(buffer.data(),
buffer_size()))) {
+ return {batch.values_read(), batch.null_read()};
+ }
- if (valid_run.set) {
- if ((repeat_count_ == 0) && (literal_count_ == 0)) {
- if (!NextCounts<RunType>()) return values_read;
- ARROW_DCHECK((repeat_count_ > 0) ^ (literal_count_ > 0));
+ // Copy chunks of valid values into the output, while adjusting spacing
for null
+ // values.
+ while (buffer_size() > 0) {
+ ARROW_DCHECK_GE(validity_run->length, 0);
+ ARROW_DCHECK_LT(validity_run->length, max_size_for_v<rle_size_t>);
+ ARROW_DCHECK_LE(validity_run->length, batch.total_remaining());
+ const auto validity_run_length =
static_cast<rle_size_t>(validity_run->length);
+
+ // Copy as much as possible from the buffer into the output while not
exceeding
+ // validity run
+ if (validity_run->set) {
+ const auto update_size = std::min(validity_run_length, buffer_size());
+ converter->WriteRange(out, buffer.data() + buffer_start, update_size);
+ buffer_start += update_size;
+ batch.AccrueReadValues(update_size);
+ out += update_size;
+ validity_run->length -= update_size;
+ // Simply write zeros in the output
+ } else {
+ const auto update_size = std::min(validity_run_length,
batch.null_remaining());
+ converter->WriteZero(out, out + update_size);
+ batch.AccrueReadNulls(update_size);
+ out += update_size;
+ validity_run->length -= update_size;
}
- if (repeat_count_ > 0) {
- int repeat_batch = 0;
- // Consume the entire repeat counts incrementing repeat_batch to
- // be the total of nulls + values consumed, we only need to
- // get the total count because we can fill in the same value for
- // nulls and non-nulls. This proves to be a big efficiency win.
- while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size)
{
- ARROW_DCHECK_GT(valid_run.length, 0);
- if (valid_run.set) {
- int update_size = std::min(static_cast<int>(valid_run.length),
repeat_count_);
- repeat_count_ -= update_size;
- repeat_batch += update_size;
- valid_run.length -= update_size;
- values_remaining -= update_size;
- } else {
- // We can consume all nulls here because we would do so on
- // the next loop anyways.
- repeat_batch += static_cast<int>(valid_run.length);
- valid_run.length = 0;
- }
- if (valid_run.length == 0) {
- valid_run = bit_reader.NextRun();
- }
- }
- RunType current_value = static_cast<RunType>(current_value_);
- if (ARROW_PREDICT_FALSE(!converter.IsValid(current_value))) {
- return values_read;
- }
- converter.Fill(out, out + repeat_batch, current_value);
- out += repeat_batch;
- values_read += repeat_batch;
- } else if (literal_count_ > 0) {
- int literal_batch = std::min(values_remaining, literal_count_);
- ARROW_DCHECK_GT(literal_batch, 0);
-
- // Decode the literals
- constexpr int kBufferSize = 1024;
- RunType indices[kBufferSize];
- literal_batch = std::min(literal_batch, kBufferSize);
- int actual_read = bit_reader_.GetBatch(bit_width_, indices,
literal_batch);
- if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
- return values_read;
- }
- if (!converter.IsValid(indices, /*length=*/actual_read)) {
- return values_read;
- }
- int skipped = 0;
- int literals_read = 0;
- while (literals_read < literal_batch) {
- if (valid_run.set) {
- int update_size = std::min(literal_batch - literals_read,
- static_cast<int>(valid_run.length));
- converter.Copy(out, indices + literals_read, update_size);
- literals_read += update_size;
- out += update_size;
- valid_run.length -= update_size;
- } else {
- converter.FillZero(out, out + valid_run.length);
- out += valid_run.length;
- skipped += static_cast<int>(valid_run.length);
- valid_run.length = 0;
- }
- if (valid_run.length == 0) {
- valid_run = bit_reader.NextRun();
- }
- }
- literal_count_ -= literal_batch;
- values_remaining -= literal_batch;
- values_read += literal_batch + skipped;
+ if (validity_run->length == 0) {
+ *validity_run = validity_reader->NextRun();
}
- } else {
- converter.FillZero(out, out + valid_run.length);
- out += valid_run.length;
- values_read += static_cast<int>(valid_run.length);
- valid_run.length = 0;
}
+
+ ARROW_DCHECK_EQ(buffer_size(), 0);
}
- ARROW_DCHECK_EQ(valid_run.length, 0);
- ARROW_DCHECK_EQ(values_remaining, 0);
- return values_read;
+
+ ARROW_DCHECK_EQ(values_available - decoder->remaining(),
batch.values_read());
+ ARROW_DCHECK_LE(batch.total_read(), batch_size);
+ ARROW_DCHECK_LE(batch.null_read(), batch.null_count());
+
+ return {/* .values_read= */ batch.values_read(), /* .null_read= */
batch.null_read()};
}
+/// Overload for GetSpaced for a single run in a decoder variant
+template <typename Converter, typename BitRunReader, typename BitRun, typename
value_type>
+auto RunGetSpaced(
+ Converter* converter, typename Converter::out_type* out, rle_size_t
batch_size,
+ rle_size_t null_count, rle_size_t value_bit_width, BitRunReader*
validity_reader,
+ BitRun* validity_run,
+ std::variant<RleRunDecoder<value_type>, BitPackedRunDecoder<value_type>>*
decoder)
+ -> GetSpacedResult<rle_size_t> {
+ return std::visit(
+ [&](auto& dec) {
+ ARROW_DCHECK_GT(dec.remaining(), 0);
+ return RunGetSpaced(converter, out, batch_size, null_count,
value_bit_width,
+ validity_reader, validity_run, &dec);
+ },
+ *decoder);
+}
+
+} // namespace internal
+
+template <typename T>
+template <typename Converter>
+auto RleBitPackedDecoder<T>::GetSpaced(Converter converter,
+ typename Converter::out_type* out,
+ rle_size_t batch_size,
+ const uint8_t* validity_bits,
+ int64_t validity_bits_offset,
+ rle_size_t null_count) -> rle_size_t {
+ using ControlFlow = RleBitPackedParser::ControlFlow;
+
+ ARROW_DCHECK_GT(batch_size, 0);
+
+ auto batch = internal::BatchCounter::FromBatchSizeAndNulls(batch_size,
null_count);
+
+ if (ARROW_PREDICT_FALSE(batch.is_fully_null())) {
+ converter.WriteZero(out, out + batch.null_remaining());
+ return batch.null_remaining();
+ }
+
+ arrow::internal::BitRunReader validity_reader(validity_bits,
validity_bits_offset,
+
/*length=*/batch.total_remaining());
+ arrow::internal::BitRun validity_run = validity_reader.NextRun();
+
+ const auto check_and_handle_fully_null_remaining = [&]() {
+ if (batch.is_fully_null()) {
+ ARROW_DCHECK(validity_run.length == 0 || !validity_run.set);
+ ARROW_DCHECK_GE(validity_run.length, batch.null_remaining());
+
+ converter.WriteZero(out, out + batch.null_remaining());
+ out += batch.null_remaining();
+ batch.AccrueReadNulls(batch.null_remaining());
+ }
+ };
+
+ // Remaining from a previous call that would have left some unread data from
a run.
+ if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+ const auto read = internal::RunGetSpaced(&converter, out,
batch.total_remaining(),
+ batch.null_remaining(),
value_bit_width_,
+ &validity_reader, &validity_run,
&decoder_);
+
+ batch.AccrueReadNulls(read.null_read);
+ batch.AccrueReadValues(read.values_read);
+ out += read.values_read + read.null_read;
+
+ // Either we fulfilled all the batch values to be read
+ if (ARROW_PREDICT_FALSE(batch.values_remaining() == 0)) {
+      // There may be remaining nulls if they are not greedily filled
+ check_and_handle_fully_null_remaining();
+ return batch.total_read();
+ }
+
+ // We finished the remaining run
+ ARROW_DCHECK(run_remaining() == 0);
+ }
+
+ ParseWithCallable([&](auto run) {
+ using RunDecoder = typename decltype(run)::template
DecoderType<value_type>;
+
+ RunDecoder decoder(run, value_bit_width_);
+
+ const auto read = internal::RunGetSpaced(&converter, out,
batch.total_remaining(),
+ batch.null_remaining(),
value_bit_width_,
+ &validity_reader, &validity_run,
&decoder);
+
+ batch.AccrueReadNulls(read.null_read);
+ batch.AccrueReadValues(read.values_read);
+ out += read.values_read + read.null_read;
+
+ // Stop reading and store remaining decoder
+ if (ARROW_PREDICT_FALSE(read.values_read == 0 || batch.values_remaining()
== 0)) {
+ decoder_ = std::move(decoder);
+ return ControlFlow::Break;
+ }
+
+ return ControlFlow::Continue;
+ });
+
+  // There may be remaining nulls if they are not greedily filled by either
decoder calls
+ check_and_handle_fully_null_remaining();
+
+ ARROW_DCHECK(batch.is_done() || exhausted());
+ return batch.total_read();
+}
+
+namespace internal {
+
// Converter for GetSpaced that handles runs that get returned
// directly as output.
template <typename T>
-struct PlainRleConverter {
- T kZero = {};
- inline bool IsValid(const T& values) const { return true; }
- inline bool IsValid(const T* values, int32_t length) const { return true; }
- inline void Fill(T* begin, T* end, const T& run_value) const {
+struct NoOpConverter {
+ using in_type = T;
+ using out_type = T;
+ using size_type = rle_size_t;
+
+ static constexpr bool InputIsValid(const in_type& values) { return true; }
+
+ static constexpr bool InputIsValid(const in_type* values, size_type length) {
+ return true;
+ }
+
+ static void WriteRepeated(out_type* begin, out_type* end, in_type run_value)
{
std::fill(begin, end, run_value);
}
- inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
- inline void Copy(T* out, const T* values, int length) const {
- std::memcpy(out, values, length * sizeof(T));
+
+ static void WriteZero(out_type* begin, out_type* end) {
+ std::fill(begin, end, out_type{});
+ }
+
+ static void WriteRange(out_type* out, const in_type* values, size_type
length) {
+ std::memcpy(out, values, length * sizeof(out_type));
}
};
+} // namespace internal
+
template <typename T>
-inline int RleDecoder::GetBatchSpaced(int batch_size, int null_count,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset, T* out) {
+auto RleBitPackedDecoder<T>::GetBatchSpaced(rle_size_t batch_size, rle_size_t
null_count,
+ const uint8_t* valid_bits,
+ int64_t valid_bits_offset,
value_type* out)
+ -> rle_size_t {
if (null_count == 0) {
- return GetBatch<T>(out, batch_size);
+ return GetBatch(out, batch_size);
}
- PlainRleConverter<T> converter;
- arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
- batch_size);
+ internal::NoOpConverter<value_type> converter;
- int total_processed = 0;
- int processed = 0;
- arrow::internal::BitBlockCount block;
-
- do {
- block = block_counter.NextFourWords();
- if (block.length == 0) {
- break;
- }
- if (block.AllSet()) {
- processed = GetBatch<T>(out, block.length);
- } else if (block.NoneSet()) {
- converter.FillZero(out, out + block.length);
- processed = block.length;
- } else {
- processed = GetSpaced<T, /*RunType=*/T, PlainRleConverter<T>>(
- converter, block.length, block.length - block.popcount, valid_bits,
- valid_bits_offset, out);
- }
- total_processed += processed;
- out += block.length;
- valid_bits_offset += block.length;
- } while (processed == block.length);
- return total_processed;
+ return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset,
null_count);
}
-static inline bool IndexInRange(int32_t idx, int32_t dictionary_length) {
- return idx >= 0 && idx < dictionary_length;
+namespace internal {
+
+template <typename I>
+bool IndexInRange(I idx, int32_t dictionary_length) {
+ ARROW_DCHECK_GT(dictionary_length, 0);
+ using T = std::common_type_t<decltype(idx), decltype(dictionary_length)>;
+ return idx >= 0 && static_cast<T>(idx) < static_cast<T>(dictionary_length);
}
// Converter for GetSpaced that handles runs of returned dictionary
// indices.
-template <typename T>
+template <typename V, typename I>
struct DictionaryConverter {
- T kZero = {};
- const T* dictionary;
- int32_t dictionary_length;
-
- inline bool IsValid(int32_t value) { return IndexInRange(value,
dictionary_length); }
-
- inline bool IsValid(const int32_t* values, int32_t length) const {
- using IndexType = int32_t;
- IndexType min_index = std::numeric_limits<IndexType>::max();
- IndexType max_index = std::numeric_limits<IndexType>::min();
- for (int x = 0; x < length; x++) {
- min_index = std::min(values[x], min_index);
- max_index = std::max(values[x], max_index);
+ using out_type = V;
+ using in_type = I;
+ using size_type = rle_size_t;
+
+ static constexpr bool kIsIdentity = false;
+
+ const out_type* dictionary;
+ size_type dictionary_length;
+
+ bool InputIsValid(in_type idx) const { return IndexInRange(idx,
dictionary_length); }
+
+ bool InputIsValid(const in_type* indices, size_type length) const {
+ ARROW_DCHECK(length > 0);
+
+ in_type min_index = std::numeric_limits<in_type>::max();
+ in_type max_index = std::numeric_limits<in_type>::min();
+ for (size_type x = 0; x < length; x++) {
+ min_index = std::min(indices[x], min_index);
+ max_index = std::max(indices[x], max_index);
}
return IndexInRange(min_index, dictionary_length) &&
IndexInRange(max_index, dictionary_length);
}
- inline void Fill(T* begin, T* end, const int32_t& run_value) const {
+
+ void WriteRepeated(out_type* begin, out_type* end, in_type run_value) const {
std::fill(begin, end, dictionary[run_value]);
}
- inline void FillZero(T* begin, T* end) { std::fill(begin, end, kZero); }
- inline void Copy(T* out, const int32_t* values, int length) const {
- for (int x = 0; x < length; x++) {
+ static void WriteZero(out_type* begin, out_type* end) {
+ std::fill(begin, end, out_type{});
+ }
+
+ void WriteRange(out_type* out, const in_type* values, size_type length)
const {
+ for (size_type x = 0; x < length; x++) {
out[x] = dictionary[values[x]];
}
}
};
+/// Dummy imitation of BitRun that is all set.
+struct AllSetBitRun {
+ static constexpr bool set = true;
+ int64_t length = 0;
+};
+
+/// Dummy imitation of BitRunReader that should never be called.
+struct UnreachableBitRunReader {
+ constexpr static AllSetBitRun NextRun() { return {}; }
+};
+
+} // namespace internal
+
template <typename T>
-inline int RleDecoder::GetBatchWithDict(const T* dictionary, int32_t
dictionary_length,
- T* values, int batch_size) {
- // Per https://github.com/apache/parquet-format/blob/master/Encodings.md,
- // the maximum dictionary index width in Parquet is 32 bits.
- using IndexType = int32_t;
- DictionaryConverter<T> converter;
- converter.dictionary = dictionary;
- converter.dictionary_length = dictionary_length;
-
- ARROW_DCHECK_GE(bit_width_, 0);
- int values_read = 0;
-
- auto* out = values;
-
- while (values_read < batch_size) {
- int remaining = batch_size - values_read;
-
- if (repeat_count_ > 0) {
- auto idx = static_cast<IndexType>(current_value_);
- if (ARROW_PREDICT_FALSE(!IndexInRange(idx, dictionary_length))) {
- return values_read;
- }
- T val = dictionary[idx];
+template <typename V>
+auto RleBitPackedDecoder<T>::GetBatchWithDict(const V* dictionary,
+ int32_t dictionary_length, V*
out,
+ rle_size_t batch_size) ->
rle_size_t {
+ using ControlFlow = RleBitPackedParser::ControlFlow;
+
+ if (ARROW_PREDICT_FALSE(batch_size <= 0)) {
+ return 0;
+ }
- int repeat_batch = std::min(remaining, repeat_count_);
- std::fill(out, out + repeat_batch, val);
+ internal::DictionaryConverter<V, value_type> converter{dictionary,
dictionary_length};
- /* Upkeep counters */
- repeat_count_ -= repeat_batch;
- values_read += repeat_batch;
- out += repeat_batch;
- } else if (literal_count_ > 0) {
- constexpr int kBufferSize = 1024;
- IndexType indices[kBufferSize];
+ // Make lightweight BitRun class to reuse previous methods.
+ constexpr internal::UnreachableBitRunReader validity_reader{};
+ internal::AllSetBitRun validity_run = {batch_size};
- int literal_batch = std::min(remaining, literal_count_);
- literal_batch = std::min(literal_batch, kBufferSize);
+ rle_size_t values_read = 0;
+ auto batch_values_remaining = [&]() {
+ ARROW_DCHECK_LE(values_read, batch_size);
+ return batch_size - values_read;
+ };
- int actual_read = bit_reader_.GetBatch(bit_width_, indices,
literal_batch);
- if (ARROW_PREDICT_FALSE(actual_read != literal_batch)) {
- return values_read;
- }
- if (ARROW_PREDICT_FALSE(!converter.IsValid(indices,
/*length=*/literal_batch))) {
- return values_read;
- }
- converter.Copy(out, indices, literal_batch);
+ if (ARROW_PREDICT_FALSE(run_remaining() > 0)) {
+ const auto read = internal::RunGetSpaced(&converter, out, batch_size,
+ /* null_count= */ 0,
value_bit_width_,
+ &validity_reader, &validity_run,
&decoder_);
- /* Upkeep counters */
- literal_count_ -= literal_batch;
- values_read += literal_batch;
- out += literal_batch;
- } else {
- if (!NextCounts<IndexType>()) return values_read;
+ ARROW_DCHECK_EQ(read.null_read, 0);
+ values_read += read.values_read;
+ out += read.values_read;
+
+ // Either we fulfilled all the batch values to be read
+ if (ARROW_PREDICT_FALSE(values_read >= batch_size)) {
+      // There may be remaining nulls if they are not greedily filled
+ return values_read;
}
+
+ // We finished the remaining run
+ ARROW_DCHECK(run_remaining() == 0);
}
- return values_read;
-}
+ ParseWithCallable([&](auto run) {
+ using RunDecoder = typename decltype(run)::template
DecoderType<value_type>;
-template <typename T>
-inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary,
- int32_t dictionary_length, T*
out,
- int batch_size, int null_count,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset) {
- if (null_count == 0) {
- return GetBatchWithDict<T>(dictionary, dictionary_length, out, batch_size);
- }
- arrow::internal::BitBlockCounter block_counter(valid_bits, valid_bits_offset,
- batch_size);
- using IndexType = int32_t;
- DictionaryConverter<T> converter;
- converter.dictionary = dictionary;
- converter.dictionary_length = dictionary_length;
-
- int total_processed = 0;
- int processed = 0;
- arrow::internal::BitBlockCount block;
- do {
- block = block_counter.NextFourWords();
- if (block.length == 0) {
- break;
- }
- if (block.AllSet()) {
- processed = GetBatchWithDict<T>(dictionary, dictionary_length, out,
block.length);
- } else if (block.NoneSet()) {
- converter.FillZero(out, out + block.length);
- processed = block.length;
- } else {
- processed = GetSpaced<T, /*RunType=*/IndexType, DictionaryConverter<T>>(
- converter, block.length, block.length - block.popcount, valid_bits,
- valid_bits_offset, out);
+ RunDecoder decoder(run, value_bit_width_);
+
+ const auto read = internal::RunGetSpaced(&converter, out,
batch_values_remaining(),
+ /* null_count= */ 0,
value_bit_width_,
+ &validity_reader, &validity_run,
&decoder);
+
+ ARROW_DCHECK_EQ(read.null_read, 0);
+ values_read += read.values_read;
+ out += read.values_read;
+
+ // Stop reading and store remaining decoder
+ if (ARROW_PREDICT_FALSE(read.values_read == 0 || values_read ==
batch_size)) {
+ decoder_ = std::move(decoder);
+ return ControlFlow::Break;
}
- total_processed += processed;
- out += block.length;
- valid_bits_offset += block.length;
- } while (processed == block.length);
- return total_processed;
+
+ return ControlFlow::Continue;
+ });
+
+ return values_read;
}
template <typename T>
-bool RleDecoder::NextCounts() {
- // Read the next run's indicator int, it could be a literal or repeated run.
- // The int is encoded as a vlq-encoded value.
- uint32_t indicator_value = 0;
- if (!bit_reader_.GetVlqInt(&indicator_value)) return false;
-
- // lsb indicates if it is a literal run or repeated run
- bool is_literal = indicator_value & 1;
- uint32_t count = indicator_value >> 1;
- if (is_literal) {
- if (ARROW_PREDICT_FALSE(count == 0 || count >
static_cast<uint32_t>(INT32_MAX) / 8)) {
- return false;
- }
- literal_count_ = count * 8;
- } else {
- if (ARROW_PREDICT_FALSE(count == 0 || count >
static_cast<uint32_t>(INT32_MAX))) {
- return false;
- }
- repeat_count_ = count;
- T value = {};
- if (!bit_reader_.GetAligned<T>(
- static_cast<int>(::arrow::bit_util::CeilDiv(bit_width_, 8)),
&value)) {
- return false;
- }
- current_value_ = static_cast<uint64_t>(value);
+template <typename V>
+auto RleBitPackedDecoder<T>::GetBatchWithDictSpaced(
+ const V* dictionary, int32_t dictionary_length, V* out, rle_size_t
batch_size,
+ rle_size_t null_count, const uint8_t* valid_bits, int64_t
valid_bits_offset)
+ -> rle_size_t {
+ if (null_count == 0) {
+ return GetBatchWithDict<V>(dictionary, dictionary_length, out, batch_size);
}
- return true;
+ internal::DictionaryConverter<V, value_type> converter{dictionary,
dictionary_length};
+
+ return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset,
null_count);
}
+/*************************
+ * RleBitPackedEncoder *
+ *************************/
+
/// This function buffers input values 8 at a time. After seeing all 8 values,
/// it decides whether they should be encoded as a literal or repeated run.
-inline bool RleEncoder::Put(uint64_t value) {
+inline bool RleBitPackedEncoder::Put(uint64_t value) {
ARROW_DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
if (ARROW_PREDICT_FALSE(buffer_full_)) return false;
@@ -708,7 +1312,7 @@ inline bool RleEncoder::Put(uint64_t value) {
return true;
}
-inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
+inline void RleBitPackedEncoder::FlushLiteralRun(bool update_indicator_byte) {
if (literal_indicator_byte_ == NULL) {
// The literal indicator byte has not been reserved yet, get one now.
literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
@@ -738,7 +1342,7 @@ inline void RleEncoder::FlushLiteralRun(bool
update_indicator_byte) {
}
}
-inline void RleEncoder::FlushRepeatedRun() {
+inline void RleBitPackedEncoder::FlushRepeatedRun() {
ARROW_DCHECK_GT(repeat_count_, 0);
bool result = true;
// The lsb of 0 indicates this is a repeated run
@@ -754,7 +1358,7 @@ inline void RleEncoder::FlushRepeatedRun() {
/// Flush the values that have been buffered. At this point we decide whether
/// we need to switch between the run types or continue the current one.
-inline void RleEncoder::FlushBufferedValues(bool done) {
+inline void RleBitPackedEncoder::FlushBufferedValues(bool done) {
if (repeat_count_ >= 8) {
// Clear the buffered values. They are part of the repeated run now and we
// don't want to flush them out as literals.
@@ -784,7 +1388,7 @@ inline void RleEncoder::FlushBufferedValues(bool done) {
repeat_count_ = 0;
}
-inline int RleEncoder::Flush() {
+inline int RleBitPackedEncoder::Flush() {
if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
bool all_repeat = literal_count_ == 0 && (repeat_count_ ==
num_buffered_values_ ||
num_buffered_values_ == 0);
@@ -811,14 +1415,14 @@ inline int RleEncoder::Flush() {
return bit_writer_.bytes_written();
}
-inline void RleEncoder::CheckBufferFull() {
+inline void RleBitPackedEncoder::CheckBufferFull() {
int bytes_written = bit_writer_.bytes_written();
if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
buffer_full_ = true;
}
}
-inline void RleEncoder::Clear() {
+inline void RleBitPackedEncoder::Clear() {
buffer_full_ = false;
current_value_ = 0;
repeat_count_ = 0;
@@ -828,5 +1432,4 @@ inline void RleEncoder::Clear() {
bit_writer_.Clear();
}
-} // namespace util
-} // namespace arrow
+} // namespace arrow::util
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc
b/cpp/src/arrow/util/rle_encoding_test.cc
index 0cc0a276a2..c7f4878b74 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -25,7 +25,10 @@
#include <gtest/gtest.h>
#include "arrow/array.h"
-#include "arrow/buffer.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/array/util.h"
+#include "arrow/scalar.h"
+#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/bit_stream_utils_internal.h"
@@ -33,8 +36,7 @@
#include "arrow/util/io_util.h"
#include "arrow/util/rle_encoding_internal.h"
-namespace arrow {
-namespace util {
+namespace arrow::util {
const int MAX_WIDTH = 32;
@@ -207,12 +209,303 @@ TEST(BitUtil, RoundTripIntValues) {
}
}
+/// A Rle run is a simple class owning some data and a repetition count.
+/// It does not know how to read such data.
+TEST(Rle, RleRun) {
+ const std::array<uint8_t, 4> value = {21, 2, 0, 0};
+
+ const rle_size_t value_count = 12;
+
+ // 12 times the value 21 fitting over 5 bits
+ const rle_size_t value_bit_width_5 = 5;
+ const auto run_5 = RleRun(value.data(), value_count, value_bit_width_5);
+ EXPECT_EQ(run_5.values_count(), value_count);
+ EXPECT_EQ(run_5.raw_data_size(value_bit_width_5), 1); // 5 bits fit in one
byte
+ EXPECT_EQ(*run_5.raw_data_ptr(), 21);
+
+ // 12 times the value 21 fitting over 8 bits
+ const rle_size_t value_bit_width_8 = 8;
+ const auto run_8 = RleRun(value.data(), value_count, value_bit_width_8);
+ EXPECT_EQ(run_8.values_count(), value_count);
+ EXPECT_EQ(run_8.raw_data_size(value_bit_width_8), 1); // 8 bits fit in 1
byte
+ EXPECT_EQ(*run_8.raw_data_ptr(), 21);
+
+ // 12 times the value 533 (21 + 2 * 2^8) fitting over 10 bits
+ const rle_size_t value_bit_width_10 = 10;
+ const auto run_10 = RleRun(value.data(), value_count, value_bit_width_10);
+ EXPECT_EQ(run_10.values_count(), value_count);
+ EXPECT_EQ(run_10.raw_data_size(value_bit_width_10), 2); // 10 bits fit in 2
bytes
+ EXPECT_EQ(*(run_10.raw_data_ptr() + 0), 21);
+ EXPECT_EQ(*(run_10.raw_data_ptr() + 1), 2);
+
+ // 12 times the value 533 (21 + 2 * 2^8) fitting over 32 bits
+ const rle_size_t value_bit_width_32 = 32;
+ const auto run_32 = RleRun(value.data(), value_count, value_bit_width_32);
+ EXPECT_EQ(run_32.values_count(), value_count);
+ EXPECT_EQ(run_32.raw_data_size(value_bit_width_32), 4); // 32 bits fit in 4
bytes
+ EXPECT_EQ(*(run_32.raw_data_ptr() + 0), 21);
+ EXPECT_EQ(*(run_32.raw_data_ptr() + 1), 2);
+ EXPECT_EQ(*(run_32.raw_data_ptr() + 2), 0);
+ EXPECT_EQ(*(run_32.raw_data_ptr() + 3), 0);
+}
+
+/// A BitPacked run is a simple class owning some data and its size.
+/// It does not know how to read such data.
+TEST(BitPacked, BitPackedRun) {
+ const std::array<uint8_t, 4> value = {0b10101010, 0, 0, 0b1111111};
+
+ // 16 values of 1 bit for a total of 16 bits
+ const rle_size_t value_count_1 = 16;
+ const rle_size_t value_bit_width_1 = 1;
+ const auto run_1 = BitPackedRun(value.data(), value_count_1,
value_bit_width_1);
+ EXPECT_EQ(run_1.values_count(), value_count_1);
+ EXPECT_EQ(run_1.raw_data_size(value_bit_width_1), 2); // 16 bits fit in 2
bytes
+ EXPECT_EQ(run_1.raw_data_ptr(), value.data());
+
+ // 8 values of 3 bits for a total of 24 bits
+ const rle_size_t value_count_3 = 8;
+ const rle_size_t value_bit_width_3 = 3;
+ const auto run_3 = BitPackedRun(value.data(), value_count_3,
value_bit_width_3);
+ EXPECT_EQ(run_3.values_count(), value_count_3);
+ EXPECT_EQ(run_3.raw_data_size(value_bit_width_3), 3); // 24 bits fit in 3
bytes
+ EXPECT_EQ(run_3.raw_data_ptr(), value.data());
+}
+
+template <typename T>
+void TestRleDecoder(std::vector<uint8_t> bytes, rle_size_t value_count,
+ rle_size_t bit_width, T expected_value) {
+ // Pre-requisite for this test
+ EXPECT_GT(value_count, 6);
+
+ const auto run = RleRun(bytes.data(), value_count, bit_width);
+
+ auto decoder = RleRunDecoder<T>(run, bit_width);
+ std::vector<T> vals = {0, 0};
+
+ EXPECT_EQ(decoder.remaining(), value_count);
+
+ rle_size_t read = 0;
+ EXPECT_EQ(decoder.Get(vals.data(), bit_width), 1);
+ read += 1;
+ EXPECT_EQ(vals.at(0), expected_value);
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ EXPECT_EQ(decoder.Advance(3, bit_width), 3);
+ read += 3;
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ vals = {0, 0};
+ EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+ EXPECT_EQ(vals.at(0), expected_value);
+ EXPECT_EQ(vals.at(1), expected_value);
+ read += static_cast<decltype(read)>(vals.size());
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ // Exhaust iteration
+ EXPECT_EQ(decoder.Advance(value_count - read, bit_width), value_count -
read);
+ EXPECT_EQ(decoder.remaining(), 0);
+ EXPECT_EQ(decoder.Advance(1, bit_width), 0);
+ vals = {0, 0};
+ EXPECT_EQ(decoder.Get(vals.data(), bit_width), 0);
+ EXPECT_EQ(vals.at(0), 0);
+
+ // Reset the decoder
+ decoder.Reset(run, bit_width);
+ EXPECT_EQ(decoder.remaining(), value_count);
+ vals = {0, 0};
+ EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+ EXPECT_EQ(vals.at(0), expected_value);
+ EXPECT_EQ(vals.at(1), expected_value);
+}
+
+TEST(Rle, RleDecoder) {
+ TestRleDecoder<uint8_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */
5,
+ /* expected_value= */ 21);
+ TestRleDecoder<uint16_t>({1, 0}, /* value_count= */ 13, /* bit_width= */ 1,
+ /* expected_value= */ 1);
+ TestRleDecoder<uint32_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */
5,
+ /* expected_value= */ 21);
+ TestRleDecoder<int32_t>({21, 0, 0}, /* value_count= */ 23, /* bit_width= */
5,
+ /* expected_value= */ 21);
+ TestRleDecoder<uint64_t>({21, 2, 0, 1}, /* value_count= */ 20, /* bit_width=
*/ 30,
+ /* expected_value= */ 16777749);
+}
+
+template <typename T>
+void TestBitPackedDecoder(std::vector<uint8_t> bytes, rle_size_t value_count,
+ rle_size_t bit_width, std::vector<T> expected) {
+ // Pre-requisite for this test
+ EXPECT_GT(value_count, 6);
+
+ const auto run = BitPackedRun(bytes.data(), value_count, bit_width);
+
+ auto decoder = BitPackedRunDecoder<T>(run, bit_width);
+ std::vector<T> vals = {0, 0};
+
+ EXPECT_EQ(decoder.remaining(), value_count);
+
+ rle_size_t read = 0;
+ EXPECT_EQ(decoder.Get(vals.data(), bit_width), 1);
+ EXPECT_EQ(vals.at(0), expected.at(0 + read));
+ read += 1;
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ EXPECT_EQ(decoder.Advance(3, bit_width), 3);
+ read += 3;
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ vals = {0, 0};
+ EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+ EXPECT_EQ(vals.at(0), expected.at(0 + read));
+ EXPECT_EQ(vals.at(1), expected.at(1 + read));
+ read += static_cast<decltype(read)>(vals.size());
+ EXPECT_EQ(decoder.remaining(), value_count - read);
+
+ // Exhaust iteration
+ EXPECT_EQ(decoder.Advance(value_count - read, bit_width), value_count -
read);
+ EXPECT_EQ(decoder.remaining(), 0);
+ EXPECT_EQ(decoder.Advance(1, bit_width), 0);
+ vals = {0, 0};
+ EXPECT_EQ(decoder.Get(vals.data(), bit_width), 0);
+ EXPECT_EQ(vals.at(0), 0);
+
+ // Reset the decoder
+ decoder.Reset(run, bit_width);
+ read = 0;
+ EXPECT_EQ(decoder.remaining(), value_count);
+ vals = {0, 0};
+ EXPECT_EQ(decoder.GetBatch(vals.data(), 2, bit_width), vals.size());
+ EXPECT_EQ(vals.at(0), expected.at(0 + read));
+ EXPECT_EQ(vals.at(1), expected.at(1 + read));
+}
+
+TEST(BitPacked, BitPackedDecoder) {
+ // See parquet encoding for bytes layout
+ TestBitPackedDecoder<uint16_t>(
+ /* bytes= */ {0x88, 0xc6, 0xfa},
+ /* values_count= */ 8,
+ /* bit_width= */ 3,
+ /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+ TestBitPackedDecoder<uint8_t>(
+ /* bytes= */ {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
+ /* values_count= */ 8,
+ /* bit_width= */ 8,
+ /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+ TestBitPackedDecoder<uint32_t>(
+ /* bytes= */ {0x47, 0xc, 0x10, 0x35},
+ /* values_count= */ 8,
+ /* bit_width= */ 4,
+ /* expected= */ {7, 4, 12, 0, 0, 1, 5, 3});
+ TestBitPackedDecoder<uint64_t>(
+ /* bytes= */ {0xe8, 0x7, 0x20, 0xc0, 0x0, 0x4, 0x14, 0x60, 0xc0, 0x1},
+ /* values_count= */ 8,
+ /* bit_width= */ 10,
+ /* expected= */ {1000, 1, 2, 3, 4, 5, 6, 7});
+}
+
+template <typename T>
+void TestRleBitPackedParser(std::vector<uint8_t> bytes, rle_size_t bit_width,
+ std::vector<T> expected) {
+ auto parser =
+ RleBitPackedParser(bytes.data(), static_cast<rle_size_t>(bytes.size()),
bit_width);
+ EXPECT_FALSE(parser.exhausted());
+
+ // Try to decode all data of all runs in the decoded vector
+ decltype(expected) decoded = {};
+ auto rle_decoder = RleRunDecoder<T>();
+ auto bit_packed_decoder = BitPackedRunDecoder<T>();
+
+ struct {
+ decltype(rle_decoder)* rle_decoder_ptr_;
+ decltype(bit_packed_decoder)* bit_packed_decoder_ptr_;
+ decltype(decoded)* decoded_ptr_;
+ decltype(bit_width) bit_width_;
+
+ auto OnRleRun(RleRun run) {
+ rle_decoder_ptr_->Reset(run, bit_width_);
+
+ const auto n_decoded = decoded_ptr_->size();
+ const auto n_to_decode = rle_decoder_ptr_->remaining();
+ decoded_ptr_->resize(n_decoded + n_to_decode);
+ EXPECT_EQ(rle_decoder_ptr_->GetBatch(decoded_ptr_->data() + n_decoded,
n_to_decode,
+ bit_width_),
+ n_to_decode);
+ EXPECT_EQ(rle_decoder_ptr_->remaining(), 0);
+
+ return RleBitPackedParser::ControlFlow::Continue;
+ }
+
+ auto OnBitPackedRun(BitPackedRun run) {
+ bit_packed_decoder_ptr_->Reset(run, bit_width_);
+
+ const auto n_decoded = decoded_ptr_->size();
+ const auto n_to_decode = bit_packed_decoder_ptr_->remaining();
+ decoded_ptr_->resize(n_decoded + n_to_decode);
+ EXPECT_EQ(bit_packed_decoder_ptr_->GetBatch(decoded_ptr_->data() +
n_decoded,
+ n_to_decode, bit_width_),
+ n_to_decode);
+ EXPECT_EQ(bit_packed_decoder_ptr_->remaining(), 0);
+
+ return RleBitPackedParser::ControlFlow::Continue;
+ }
+ } handler{&rle_decoder, &bit_packed_decoder, &decoded, bit_width};
+
+ // Iterate over all runs
+ parser.Parse(handler);
+
+ EXPECT_TRUE(parser.exhausted());
+ EXPECT_EQ(decoded.size(), expected.size());
+ EXPECT_EQ(decoded, expected);
+}
+
+TEST(RleBitPacked, RleBitPackedParser) {
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x88, 0xc6, 0xfa},
+ /* bit_width= */ 3,
+ /* expected= */ {0, 1, 2, 3, 4, 5, 6, 7});
+
+ {
+ std::vector<uint32_t> expected = {0, 1, 2, 3, 4, 5, 6, 7};
+ expected.resize(expected.size() + 200, 5);
+ TestRleBitPackedParser<uint32_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x88, 0xc6, 0xfa,
+ /* LEB128 for 200 RLE marker */ 0x90, 0x3,
+         /* Value 5 padded to a byte */ 0x5},
+ /* bit_width= */ 3,
+ /* expected= */ expected);
+ }
+
+ {
+ std::vector<uint16_t> expected = {0, 0, 0, 0, 1, 1, 1, 1};
+ expected.resize(expected.size() + 200, 1);
+ expected.resize(expected.size() + 10, 3);
+ std::array<uint16_t, 16> run2 = {1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2,
1, 2};
+ expected.insert(expected.end(), run2.begin(), run2.end());
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x0, 0x55,
+ /* LEB128 for 200 RLE marker */ 0x90, 0x3,
+         /* Value 1 padded to a byte */ 0x1,
+ /* LEB128 for 10 RLE marker */ 0x14,
+         /* Value 3 padded to a byte */ 0x3,
+ /* LEB128 for 16 values bit packed marker */ 0x5,
+ /* Bitpacked run */ 0x99, 0x99, 0x99, 0x99},
+ /* bit_width= */ 2,
+ /* expected= */ expected);
+ }
+}
+
// Validates encoding of values by encoding and decoding them. If
// expected_encoding != NULL, also validates that the encoded buffer is
// exactly 'expected_encoding'.
// if expected_len is not -1, it will validate the encoded size is correct.
-void ValidateRle(const std::vector<int>& values, int bit_width,
- uint8_t* expected_encoding, int expected_len) {
+void ValidateRleBitPacked(const std::vector<int>& values, int bit_width,
+ uint8_t* expected_encoding, int expected_len) {
const int len = 64 * 1024;
#ifdef __EMSCRIPTEN__
// don't make this on the stack as it is
@@ -224,7 +517,7 @@ void ValidateRle(const std::vector<int>& values, int
bit_width,
#endif
EXPECT_LE(expected_len, len);
- RleEncoder encoder(buffer, len, bit_width);
+ RleBitPackedEncoder encoder(buffer, len, bit_width);
for (size_t i = 0; i < values.size(); ++i) {
bool result = encoder.Put(values[i]);
EXPECT_TRUE(result);
@@ -240,7 +533,7 @@ void ValidateRle(const std::vector<int>& values, int
bit_width,
// Verify read
{
- RleDecoder decoder(buffer, len, bit_width);
+ RleBitPackedDecoder<uint64_t> decoder(buffer, len, bit_width);
for (size_t i = 0; i < values.size(); ++i) {
uint64_t val;
bool result = decoder.Get(&val);
@@ -251,7 +544,7 @@ void ValidateRle(const std::vector<int>& values, int
bit_width,
// Verify batch read
{
- RleDecoder decoder(buffer, len, bit_width);
+ RleBitPackedDecoder<int> decoder(buffer, len, bit_width);
std::vector<int> values_read(values.size());
ASSERT_EQ(values.size(),
decoder.GetBatch(values_read.data(),
static_cast<int>(values.size())));
@@ -271,7 +564,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int
bit_width) {
#else
uint8_t buffer[len];
#endif
- RleEncoder encoder(buffer, len, bit_width);
+ RleBitPackedEncoder encoder(buffer, len, bit_width);
for (size_t i = 0; i < values.size(); ++i) {
bool result = encoder.Put(values[i]);
if (!result) {
@@ -282,7 +575,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int
bit_width) {
int out = 0;
{
- RleDecoder decoder(buffer, encoded_len, bit_width);
+ RleBitPackedDecoder<int> decoder(buffer, encoded_len, bit_width);
for (size_t i = 0; i < values.size(); ++i) {
EXPECT_TRUE(decoder.Get(&out));
if (values[i] != out) {
@@ -293,7 +586,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int
bit_width) {
// Verify batch read
{
- RleDecoder decoder(buffer, encoded_len, bit_width);
+ RleBitPackedDecoder<int> decoder(buffer, encoded_len, bit_width);
std::vector<int> values_read(values.size());
if (static_cast<int>(values.size()) !=
decoder.GetBatch(values_read.data(), static_cast<int>(values.size())))
{
@@ -308,7 +601,7 @@ bool CheckRoundTrip(const std::vector<int>& values, int
bit_width) {
return true;
}
-TEST(Rle, SpecificSequences) {
+TEST(RleBitPacked, SpecificSequences) {
const int len = 1024;
uint8_t expected_buffer[len];
std::vector<int> values;
@@ -328,12 +621,12 @@ TEST(Rle, SpecificSequences) {
expected_buffer[2] = (50 << 1);
expected_buffer[3] = 1;
for (int width = 1; width <= 8; ++width) {
- ValidateRle(values, width, expected_buffer, 4);
+ ValidateRleBitPacked(values, width, expected_buffer, 4);
}
for (int width = 9; width <= MAX_WIDTH; ++width) {
- ValidateRle(values, width, nullptr,
- 2 * (1 + static_cast<int>(bit_util::CeilDiv(width, 8))));
+ ValidateRleBitPacked(values, width, nullptr,
+ 2 * (1 + static_cast<int>(bit_util::CeilDiv(width,
8))));
}
// Test 100 0's and 1's alternating
@@ -349,11 +642,11 @@ TEST(Rle, SpecificSequences) {
expected_buffer[100 / 8 + 1] = 0x0A /* 0b00001010 */;
// num_groups and expected_buffer only valid for bit width = 1
- ValidateRle(values, 1, expected_buffer, 1 + num_groups);
+ ValidateRleBitPacked(values, 1, expected_buffer, 1 + num_groups);
for (int width = 2; width <= MAX_WIDTH; ++width) {
int num_values = static_cast<int>(bit_util::CeilDiv(100, 8)) * 8;
- ValidateRle(values, width, nullptr,
- 1 + static_cast<int>(bit_util::CeilDiv(width * num_values,
8)));
+ ValidateRleBitPacked(values, width, nullptr,
+ 1 + static_cast<int>(bit_util::CeilDiv(width *
num_values, 8)));
}
// Test 16-bit values to confirm encoded values are stored in little endian
@@ -371,7 +664,7 @@ TEST(Rle, SpecificSequences) {
expected_buffer[4] = 0x55;
expected_buffer[5] = 0xaa;
- ValidateRle(values, 16, expected_buffer, 6);
+ ValidateRleBitPacked(values, 16, expected_buffer, 6);
// Test 32-bit values to confirm encoded values are stored in little endian
values.resize(28);
@@ -392,7 +685,7 @@ TEST(Rle, SpecificSequences) {
expected_buffer[8] = 0xaa;
expected_buffer[9] = 0x5a;
- ValidateRle(values, 32, expected_buffer, 10);
+ ValidateRleBitPacked(values, 32, expected_buffer, 10);
}
// ValidateRle on 'num_vals' values with width 'bit_width'. If 'value' != -1,
that value
@@ -403,10 +696,10 @@ void TestRleValues(int bit_width, int num_vals, int value
= -1) {
for (int v = 0; v < num_vals; ++v) {
values.push_back((value != -1) ? value : static_cast<int>(v % mod));
}
- ValidateRle(values, bit_width, NULL, -1);
+ ValidateRleBitPacked(values, bit_width, NULL, -1);
}
-TEST(Rle, TestValues) {
+TEST(RleBitPacked, TestValues) {
for (int width = 1; width <= MAX_WIDTH; ++width) {
TestRleValues(width, 1);
TestRleValues(width, 1024);
@@ -415,11 +708,11 @@ TEST(Rle, TestValues) {
}
}
-TEST(Rle, BitWidthZeroRepeated) {
+TEST(RleBitPacked, BitWidthZeroRepeated) {
uint8_t buffer[1];
const int num_values = 15;
buffer[0] = num_values << 1; // repeated indicator byte
- RleDecoder decoder(buffer, sizeof(buffer), 0);
+ RleBitPackedDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
uint8_t val;
for (int i = 0; i < num_values; ++i) {
bool result = decoder.Get(&val);
@@ -429,11 +722,11 @@ TEST(Rle, BitWidthZeroRepeated) {
EXPECT_FALSE(decoder.Get(&val));
}
-TEST(Rle, BitWidthZeroLiteral) {
+TEST(RleBitPacked, BitWidthZeroLiteral) {
uint8_t buffer[1];
const int num_groups = 4;
buffer[0] = num_groups << 1 | 1; // literal indicator byte
- RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0);
+ RleBitPackedDecoder<uint8_t> decoder = {buffer, sizeof(buffer), 0};
const int num_values = num_groups * 8;
uint8_t val;
for (int i = 0; i < num_values; ++i) {
@@ -450,13 +743,13 @@ TEST(BitRle, Flush) {
std::vector<int> values;
for (int i = 0; i < 16; ++i) values.push_back(1);
values.push_back(0);
- ValidateRle(values, 1, NULL, -1);
+ ValidateRleBitPacked(values, 1, NULL, -1);
values.push_back(1);
- ValidateRle(values, 1, NULL, -1);
+ ValidateRleBitPacked(values, 1, NULL, -1);
values.push_back(1);
- ValidateRle(values, 1, NULL, -1);
+ ValidateRleBitPacked(values, 1, NULL, -1);
values.push_back(1);
- ValidateRle(values, 1, NULL, -1);
+ ValidateRleBitPacked(values, 1, NULL, -1);
}
// Test some random sequences.
@@ -515,17 +808,17 @@ TEST(BitRle, RepeatedPattern) {
}
}
- ValidateRle(values, 1, NULL, -1);
+ ValidateRleBitPacked(values, 1, NULL, -1);
}
TEST(BitRle, Overflow) {
for (int bit_width = 1; bit_width < 32; bit_width += 3) {
- int len = RleEncoder::MinBufferSize(bit_width);
+ int len = RleBitPackedEncoder::MinBufferSize(bit_width);
std::vector<uint8_t> buffer(len);
int num_added = 0;
bool parity = true;
- RleEncoder encoder(buffer.data(), len, bit_width);
+ RleBitPackedEncoder encoder(buffer.data(), len, bit_width);
// Insert alternating true/false until there is no space left
while (true) {
bool result = encoder.Put(parity);
@@ -538,7 +831,7 @@ TEST(BitRle, Overflow) {
EXPECT_LE(bytes_written, len);
EXPECT_GT(num_added, 0);
- RleDecoder decoder(buffer.data(), bytes_written, bit_width);
+ RleBitPackedDecoder<uint32_t> decoder(buffer.data(), bytes_written,
bit_width);
parity = true;
uint32_t v;
for (int i = 0; i < num_added; ++i) {
@@ -553,69 +846,300 @@ TEST(BitRle, Overflow) {
}
}
+/// Check RleBitPacked encoding/decoding round trip.
+///
+/// \param spaced If set to false, treat Nulls in the input array as regular
data.
+/// \param parts The number of parts in which the data will be decoded.
+/// For numbers greater than one, this ensures that the decoder
intermediate state
+/// is valid.
template <typename Type>
-void CheckRoundTripSpaced(const Array& data, int bit_width) {
+void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t
parts,
+ std::shared_ptr<FloatArray> dict = {}) {
using ArrayType = typename TypeTraits<Type>::ArrayType;
- using T = typename Type::c_type;
+ using value_type = typename Type::c_type;
- int num_values = static_cast<int>(data.length());
- int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values);
+ const int data_size = static_cast<int>(data.length());
+ const int data_values_count =
+ static_cast<int>(data.length() - spaced * data.null_count());
+ const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width,
data_size);
+ ASSERT_GE(parts, 1);
+ ASSERT_LE(parts, data_size);
- const T* values = static_cast<const ArrayType&>(data).raw_values();
+ const value_type* data_values = static_cast<const
ArrayType&>(data).raw_values();
+ // Encode the data into `buffer` using the encoder.
std::vector<uint8_t> buffer(buffer_size);
- RleEncoder encoder(buffer.data(), buffer_size, bit_width);
- for (int i = 0; i < num_values; ++i) {
- if (data.IsValid(i)) {
- if (!encoder.Put(static_cast<uint64_t>(values[i]))) {
- FAIL() << "Encoding failed";
- }
+ RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width);
+ int32_t encoded_values_size = 0;
+ for (int i = 0; i < data_size; ++i) {
+ // Depending on `spaced` we treat nulls as regular values.
+ if (data.IsValid(i) || !spaced) {
+ bool success = encoder.Put(static_cast<uint64_t>(data_values[i]));
+ ASSERT_TRUE(success) << "Encoding failed in pos " << i;
+ ++encoded_values_size;
}
}
- int encoded_size = encoder.Flush();
+ int encoded_byte_size = encoder.Flush();
+ ASSERT_EQ(encoded_values_size, data_values_count)
+ << "All values input were not encoded successfully by the encoder";
+
+ // Now we verify batch read
+ RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size,
bit_width);
+  // We will only use one of them depending on whether this is a dictionary
test
+ std::vector<float> dict_read;
+ std::vector<value_type> values_read;
+ if (dict) {
+ dict_read.resize(data_size);
+ } else {
+ values_read.resize(data_size);
+ }
- // Verify batch read
- RleDecoder decoder(buffer.data(), encoded_size, bit_width);
- std::vector<T> values_read(num_values);
+ // We will read the data in `parts` calls to make sure intermediate states
are valid
+ rle_size_t total_read_count = 0;
+ while (total_read_count < data_size) {
+ const auto remaining = data_size - total_read_count;
+ auto to_read = data_size / parts;
+ if (remaining / to_read == 1) {
+ to_read = remaining;
+ }
- if (num_values != decoder.GetBatchSpaced(
- num_values, static_cast<int>(data.null_count()),
- data.null_bitmap_data(), data.offset(),
values_read.data())) {
- FAIL();
- }
+ rle_size_t read = 0;
+ if (spaced) {
+ // We need to slice the input array get the proper null count and bitmap
+ auto data_remaining = data.Slice(total_read_count, to_read);
+
+ if (dict) {
+ auto* out = dict_read.data() + total_read_count;
+ read = decoder.GetBatchWithDictSpaced(
+ dict->raw_values(), static_cast<int32_t>(dict->length()), out,
to_read,
+ static_cast<int32_t>(data_remaining->null_count()),
+ data_remaining->null_bitmap_data(), data_remaining->offset());
+ } else {
+ auto* out = values_read.data() + total_read_count;
+ read = decoder.GetBatchSpaced(
+ to_read, static_cast<int32_t>(data_remaining->null_count()),
+ data_remaining->null_bitmap_data(), data_remaining->offset(), out);
+ }
+ } else {
+ if (dict) {
+ auto* out = dict_read.data() + total_read_count;
+ read = decoder.GetBatchWithDict(
+ dict->raw_values(), static_cast<int32_t>(dict->length()), out,
to_read);
+ } else {
+ auto* out = values_read.data() + total_read_count;
+ read = decoder.GetBatch(out, to_read);
+ }
+ }
+ ASSERT_EQ(read, to_read) << "Decoder did not read as many values as
requested";
- for (int64_t i = 0; i < num_values; ++i) {
- if (data.IsValid(i)) {
- if (values_read[i] != values[i]) {
- FAIL() << "Index " << i << " read " << values_read[i] << " but should
be "
- << values[i];
+ total_read_count += read;
+ }
+ EXPECT_EQ(total_read_count, data_size) << "Total number of values read is
off";
+
+ // Verify the round trip: encoded-decoded values must equal the original one
+ for (int64_t i = 0; i < data_size; ++i) {
+ if (data.IsValid(i) || !spaced) {
+ if (dict) {
+ EXPECT_EQ(dict_read.at(i), dict->Value(data_values[i]))
+ << "Encoded then decoded and mapped value at position " << i << "
("
+ << values_read[i] << ") differs from original value (" <<
data_values[i]
+ << " mapped to " << dict->Value(data_values[i]) << ")";
+ } else {
+ EXPECT_EQ(values_read.at(i), data_values[i])
+ << "Encoded then decoded value at position " << i << " (" <<
values_read.at(i)
+ << ") differs from original value (" << data_values[i] << ")";
}
}
}
}
template <typename T>
-struct GetBatchSpacedTestCase {
- T max_value;
- int64_t size;
+struct DataTestRleBitPackedRandomPart {
+ using value_type = T;
+
+ value_type max;
+ int32_t size;
+ double null_probability;
+};
+
+template <typename T>
+struct DataTestRleBitPackedRepeatPart {
+ using value_type = T;
+
+ value_type value;
+ int32_t size;
double null_probability;
- int bit_width;
};
-TEST(RleDecoder, GetBatchSpaced) {
- uint32_t kSeed = 1337;
- ::arrow::random::RandomArrayGenerator rand(kSeed);
+template <typename T>
+struct DataTestRleBitPackedNullPart {
+ using value_type = T;
- std::vector<GetBatchSpacedTestCase<int32_t>> int32_cases{
- {1, 100000, 0.01, 1}, {1, 100000, 0.1, 1}, {1, 100000, 0.5, 1},
- {4, 100000, 0.05, 3}, {100, 100000, 0.05, 7},
+ int32_t size;
+};
+
+template <typename T>
+struct DataTestRleBitPacked {
+ using value_type = T;
+ using ArrowType = typename arrow::CTypeTraits<value_type>::ArrowType;
+ using RandomPart = DataTestRleBitPackedRandomPart<value_type>;
+ using RepeatPart = DataTestRleBitPackedRepeatPart<value_type>;
+ using NullPart = DataTestRleBitPackedNullPart<value_type>;
+ using AnyPart = std::variant<RandomPart, RepeatPart, NullPart>;
+
+ std::vector<AnyPart> parts;
+ int32_t bit_width;
+
+ std::shared_ptr<::arrow::Array> MakeArray(
+ ::arrow::random::RandomArrayGenerator& rand) const {
+ using Traits = arrow::TypeTraits<ArrowType>;
+
+ std::vector<std::shared_ptr<::arrow::Array>> arrays = {};
+
+ for (const auto& dyn_part : parts) {
+ if (auto* part = std::get_if<RandomPart>(&dyn_part)) {
+ auto arr = rand.Numeric<ArrowType>(part->size, /* min= */
value_type(0),
+ part->max, part->null_probability);
+ arrays.push_back(std::move(arr));
+
+ } else if (auto* part = std::get_if<RepeatPart>(&dyn_part)) {
+ auto arr =
+ rand.Numeric<ArrowType>(part->size, /* min= */ part->value,
+ /* max= */ part->value,
part->null_probability);
+ arrays.push_back(std::move(arr));
+
+ } else if (auto* part = std::get_if<NullPart>(&dyn_part)) {
+ EXPECT_OK_AND_ASSIGN(
+ auto arr, ::arrow::MakeArrayOfNull(Traits::type_singleton(),
part->size));
+ arrays.push_back(std::move(arr));
+ }
+ }
+ ARROW_DCHECK_EQ(parts.size(), arrays.size());
+
+ return ::arrow::Concatenate(arrays).ValueOrDie();
+ }
+};
+
+template <typename T>
+void DoTestGetBatchSpacedRoundtrip() {
+ using Data = DataTestRleBitPacked<T>;
+ using ArrowType = typename Data::ArrowType;
+ using RandomPart = typename Data::RandomPart;
+ using NullPart = typename Data::NullPart;
+ using RepeatPart = typename Data::RepeatPart;
+
+ std::vector<Data> test_cases = {
+ {
+ {RandomPart{/* max=*/1, /* size=*/400, /* null_proba= */ 0.1}},
+ /* bit_width= */ 1,
+ },
+ {
+ {
+ RandomPart{/* max=*/7, /* size=*/1037, /* null_proba= */ 0.0},
+ NullPart{/* size= */ 1153},
+ RandomPart{/* max=*/7, /* size=*/800, /* null_proba= */ 0.5},
+ },
+ /* bit_width= */ 3,
+ },
+ {
+ {
+ NullPart{/* size= */ 80},
+ RandomPart{/* max=*/static_cast<T>(1023), /* size=*/800,
+ /* null_proba= */ 0.01},
+ NullPart{/* size= */ 1023},
+ },
+ /* bit_width= */ 11,
+ },
+ {
+ {RepeatPart{/* value=*/13, /* size=*/1024, /* null_proba= */ 0.01}},
+ /* bit_width= */ 10,
+ },
+ {
+ {
+ NullPart{/* size= */ 1024},
+ RepeatPart{/* value=*/static_cast<T>(10000), /* size=*/1025,
+ /* null_proba= */ 0.1},
+ NullPart{/* size= */ 77},
+ },
+ /* bit_width= */ 23,
+ },
+ {
+ {
+ RepeatPart{/* value=*/13, /* size=*/1023, /* null_proba= */ 0.0},
+ NullPart{/* size= */ 1153},
+ RepeatPart{/* value=*/72, /* size=*/1799, /* null_proba= */ 0.5},
+ },
+ /* bit_width= */ 10,
+ },
+ {
+ {
+ RandomPart{/* max=*/1, /* size=*/1013, /* null_proba= */ 0.01},
+ NullPart{/* size=*/8},
+ RepeatPart{1, /* size= */ 256, /* null_proba= */ 0.1},
+ NullPart{/* size=*/128},
+ RepeatPart{0, /* size= */ 256, /* null_proba= */ 0.0},
+ NullPart{/* size=*/15},
+ RandomPart{/* max=*/1, /* size=*/1024, /* null_proba= */ 0.01},
+ },
+ /* bit_width= */ 1,
+ },
};
- for (auto case_ : int32_cases) {
- auto arr = rand.Int32(case_.size, /*min=*/0, case_.max_value,
case_.null_probability);
- CheckRoundTripSpaced<Int32Type>(*arr, case_.bit_width);
- CheckRoundTripSpaced<Int32Type>(*arr->Slice(1), case_.bit_width);
+
+ ::arrow::random::RandomArrayGenerator rand(/* seed= */ 12);
+ // FRAGILE: we create a dictionary large enough so that any encoded value
from the
+ // previous test cases can be used as an index in the dictionary.
+ // Its size must be increased accordingly if larger values are encoded in
the test
+ // cases.
+ auto dict = std::static_pointer_cast<arrow::FloatArray>(rand.Float32(20000,
-1.0, 1.0));
+
+ // Number of bits available in T to write a positive integer.
+ constexpr int kBitsAvailable = 8 * sizeof(T) - (std::is_signed_v<T> ? 1 : 0);
+
+ for (auto case_ : test_cases) {
+ if (case_.bit_width > kBitsAvailable) {
+ continue;
+ }
+
+ auto array = case_.MakeArray(rand);
+
+ // Tests for GetBatch
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ false,
+ /* parts= */ 1);
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ false,
+ /* parts= */ 3);
+
+ // Tests for GetBatchSpaced
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true,
+ /* parts= */ 1);
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true,
+ /* parts= */ 7);
+ CheckRoundTrip<ArrowType>(*array->Slice(1), case_.bit_width, /* spaced= */
true,
+ /* parts= */ 1);
+
+ // Cannot test GetBatchWithDict with this method since unknown null values
+
+ // Tests for GetBatchWithDictSpaced
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true, /*
parts= */ 1,
+ dict);
+ CheckRoundTrip<ArrowType>(*array, case_.bit_width, /* spaced= */ true, /*
parts= */ 5,
+ dict);
}
}
-} // namespace util
-} // namespace arrow
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint8) {
+ DoTestGetBatchSpacedRoundtrip<uint8_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint16) {
+ DoTestGetBatchSpacedRoundtrip<uint16_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripInt32) {
+ DoTestGetBatchSpacedRoundtrip<int32_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUInt32) {
+ DoTestGetBatchSpacedRoundtrip<uint32_t>();
+}
+TEST(RleBitPacked, GetBatchSpacedRoundtripUint64) {
+ DoTestGetBatchSpacedRoundtrip<uint64_t>();
+}
+
+} // namespace arrow::util
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index f40e3cae54..9c314cf818 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -113,8 +113,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t
max_level,
}
const uint8_t* decoder_data = data + 4;
if (!rle_decoder_) {
- rle_decoder_ =
std::make_unique<::arrow::util::RleDecoder>(decoder_data,
- num_bytes,
bit_width_);
+ rle_decoder_ =
std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
+ decoder_data, num_bytes, bit_width_);
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
@@ -157,8 +157,8 @@ void LevelDecoder::SetDataV2(int32_t num_bytes, int16_t
max_level,
bit_width_ = bit_util::Log2(max_level + 1);
if (!rle_decoder_) {
- rle_decoder_ =
- std::make_unique<::arrow::util::RleDecoder>(data, num_bytes,
bit_width_);
+ rle_decoder_ =
std::make_unique<::arrow::util::RleBitPackedDecoder<int16_t>>(
+ data, num_bytes, bit_width_);
} else {
rle_decoder_->Reset(data, num_bytes, bit_width_);
}
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 0bff52f792..ac4469b190 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -39,7 +39,8 @@ class BitReader;
} // namespace bit_util
namespace util {
-class RleDecoder;
+template <typename T>
+class RleBitPackedDecoder;
} // namespace util
} // namespace arrow
@@ -95,7 +96,7 @@ class PARQUET_EXPORT LevelDecoder {
int bit_width_;
int num_values_remaining_;
Encoding::type encoding_;
- std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
+ std::unique_ptr<::arrow::util::RleBitPackedDecoder<int16_t>> rle_decoder_;
std::unique_ptr<::arrow::bit_util::BitReader> bit_packed_decoder_;
int16_t max_level_;
};
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index f35f84f002..1f3d64f622 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -70,7 +70,7 @@ using arrow::bit_util::BitWriter;
using arrow::internal::checked_cast;
using arrow::internal::checked_pointer_cast;
using arrow::util::Float16;
-using arrow::util::RleEncoder;
+using arrow::util::RleBitPackedEncoder;
namespace bit_util = arrow::bit_util;
@@ -168,7 +168,7 @@ void LevelEncoder::Init(Encoding::type encoding, int16_t
max_level,
encoding_ = encoding;
switch (encoding) {
case Encoding::RLE: {
- rle_encoder_ = std::make_unique<RleEncoder>(data, data_size, bit_width_);
+ rle_encoder_ = std::make_unique<RleBitPackedEncoder>(data, data_size,
bit_width_);
break;
}
case Encoding::BIT_PACKED: {
@@ -190,8 +190,8 @@ int LevelEncoder::MaxBufferSize(Encoding::type encoding,
int16_t max_level,
case Encoding::RLE: {
// TODO: Due to the way we currently check if the buffer is full enough,
// we need to have MinBufferSize as head room.
- num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
- RleEncoder::MinBufferSize(bit_width);
+ num_bytes = RleBitPackedEncoder::MaxBufferSize(bit_width,
num_buffered_values) +
+ RleBitPackedEncoder::MinBufferSize(bit_width);
break;
}
case Encoding::BIT_PACKED: {
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index bd329d6105..2a046a0ca5 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -36,7 +36,7 @@ class BitWriter;
} // namespace bit_util
namespace util {
-class RleEncoder;
+class RleBitPackedEncoder;
class CodecOptions;
} // namespace util
@@ -80,7 +80,7 @@ class PARQUET_EXPORT LevelEncoder {
int bit_width_;
int rle_length_;
Encoding::type encoding_;
- std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_;
+ std::unique_ptr<::arrow::util::RleBitPackedEncoder> rle_encoder_;
std::unique_ptr<::arrow::bit_util::BitWriter> bit_packed_encoder_;
};
diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index fc191e8ded..46d1c201e9 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -861,7 +861,8 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>,
public DictDecoder<Type>
this->num_values_ = num_values;
if (len == 0) {
// Initialize dummy decoder to avoid crashes later on
- idx_decoder_ = ::arrow::util::RleDecoder(data, len, /*bit_width=*/1);
+ idx_decoder_ =
+ ::arrow::util::RleBitPackedDecoder<int32_t>(data, len,
/*bit_width=*/1);
return;
}
uint8_t bit_width = *data;
@@ -869,7 +870,7 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>,
public DictDecoder<Type>
throw ParquetException("Invalid or corrupted bit_width " +
std::to_string(bit_width) + ". Maximum allowed is
32.");
}
- idx_decoder_ = ::arrow::util::RleDecoder(++data, --len, bit_width);
+ idx_decoder_ = ::arrow::util::RleBitPackedDecoder<int32_t>(++data, --len,
bit_width);
}
int Decode(T* buffer, int num_values) override {
@@ -1003,7 +1004,7 @@ class DictDecoderImpl : public TypedDecoderImpl<Type>,
public DictDecoder<Type>
// BinaryDictionary32Builder
std::shared_ptr<ResizableBuffer> indices_scratch_space_;
- ::arrow::util::RleDecoder idx_decoder_;
+ ::arrow::util::RleBitPackedDecoder<int32_t> idx_decoder_;
};
template <typename Type>
@@ -1810,8 +1811,9 @@ class RleBooleanDecoder : public
TypedDecoderImpl<BooleanType>, public BooleanDe
auto decoder_data = data + 4;
if (decoder_ == nullptr) {
- decoder_ = std::make_shared<::arrow::util::RleDecoder>(decoder_data,
num_bytes,
- /*bit_width=*/1);
+ decoder_ = std::make_shared<::arrow::util::RleBitPackedDecoder<bool>>(
+ decoder_data, num_bytes,
+ /*bit_width=*/1);
} else {
decoder_->Reset(decoder_data, num_bytes, /*bit_width=*/1);
}
@@ -1898,7 +1900,7 @@ class RleBooleanDecoder : public
TypedDecoderImpl<BooleanType>, public BooleanDe
}
private:
- std::shared_ptr<::arrow::util::RleDecoder> decoder_;
+ std::shared_ptr<::arrow::util::RleBitPackedDecoder<bool>> decoder_;
};
// ----------------------------------------------------------------------
@@ -2123,7 +2125,7 @@ class DeltaByteArrayDecoderImpl : public
TypedDecoderImpl<DType> {
int num_valid_values_{0};
uint32_t prefix_len_offset_{0};
std::shared_ptr<ResizableBuffer> buffered_prefix_length_;
- // buffer for decoded strings, which gurantees the lifetime of the decoded
strings
+ // buffer for decoded strings, which guarantees the lifetime of the decoded
strings
// until the next call of Decode.
std::shared_ptr<ResizableBuffer> buffered_data_;
};
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 112b810a8f..831ddbddab 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -438,8 +438,8 @@ int RlePreserveBufferSize(int num_values, int bit_width) {
// is called, we have to reserve an extra "RleEncoder::MinBufferSize"
// bytes. These extra bytes won't be used but not reserving them
// would cause the encoder to fail.
- return ::arrow::util::RleEncoder::MaxBufferSize(bit_width, num_values) +
- ::arrow::util::RleEncoder::MinBufferSize(bit_width);
+ return ::arrow::util::RleBitPackedEncoder::MaxBufferSize(bit_width,
num_values) +
+ ::arrow::util::RleBitPackedEncoder::MinBufferSize(bit_width);
}
/// See the dictionary encoding section of
@@ -476,7 +476,7 @@ class DictEncoderImpl : public EncoderImpl, virtual public
DictEncoder<DType> {
++buffer;
--buffer_len;
- ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
+ ::arrow::util::RleBitPackedEncoder encoder(buffer, buffer_len,
bit_width());
for (int32_t index : buffered_indices_) {
if (ARROW_PREDICT_FALSE(!encoder.Put(index))) return -1;
@@ -1717,8 +1717,9 @@ std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
int rle_buffer_size_max = MaxRleBufferSize();
std::shared_ptr<ResizableBuffer> buffer =
AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
- ::arrow::util::RleEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
- rle_buffer_size_max, /*bit_width*/
kBitWidth);
+ ::arrow::util::RleBitPackedEncoder encoder(buffer->mutable_data() +
kRleLengthInBytes,
+ rle_buffer_size_max,
+ /*bit_width*/ kBitWidth);
for (bool value : buffered_append_values_) {
encoder.Put(value ? 1 : 0);