pitrou commented on code in PR #47294: URL: https://github.com/apache/arrow/pull/47294#discussion_r2352588002
########## cpp/src/arrow/util/bit_util_test.cc: ########## @@ -1997,11 +1997,191 @@ TEST(BitUtil, RoundUpToPowerOf2) { #undef U64 #undef S64 +/// Test the maximum number of bytes needed to write a LEB128 of a give size. +TEST(LEB128, MaxLEB128ByteLenFor) { + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10); +} + +/// Utility function to test LEB128 encoding with known input value and expected byte +/// array +template <typename Int> +void TestLEB128Encode(Int input_value, const std::vector<uint8_t>& expected_data, + std::size_t buffer_size) { + std::vector<uint8_t> buffer(buffer_size); + auto bytes_written = bit_util::WriteLEB128(input_value, buffer.data(), + static_cast<int32_t>(buffer.size())); + + EXPECT_EQ(bytes_written, expected_data.size()); + // Encoded data + for (std::size_t i = 0; i < expected_data.size(); ++i) { + EXPECT_EQ(buffer.at(i), expected_data.at(i)); + } + + // When the value is successfully encoded, the remaining of the buffer is untouched + if (bytes_written > 0) { + for (std::size_t i = bytes_written; i < buffer.size(); ++i) { + EXPECT_EQ(buffer.at(i), 0); + } + } +} + +/// Test encoding to known LEB128 byte sequences with edge cases parameters. +/// \see LEB128.KnownSuccessfulValues for other known values tested. +TEST(LEB128, WriteEdgeCases) { + // Single byte value 0 + TestLEB128Encode(0U, {0x00}, 1); + // Single byte value 127 + TestLEB128Encode(127U, {0x7F}, 1); + // Three byte value 16384, encoded in larger buffer + TestLEB128Encode(16384U, {0x80, 0x80, 0x01}, 10); + // Two byte boundary values Review Comment: 128U would be the actual boundary value, right? Perhaps test it too? ########## cpp/src/arrow/util/bit_util_test.cc: ########## @@ -1997,11 +1997,191 @@ TEST(BitUtil, RoundUpToPowerOf2) { #undef U64 #undef S64 +/// Test the maximum number of bytes needed to write a LEB128 of a give size. 
+TEST(LEB128, MaxLEB128ByteLenFor) { + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10); Review Comment: Also test the unsigned types? ########## cpp/src/arrow/util/bit_util_test.cc: ########## @@ -1997,11 +1997,191 @@ TEST(BitUtil, RoundUpToPowerOf2) { #undef U64 #undef S64 +/// Test the maximum number of bytes needed to write a LEB128 of a give size. +TEST(LEB128, MaxLEB128ByteLenFor) { + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10); +} + +/// Utility function to test LEB128 encoding with known input value and expected byte +/// array +template <typename Int> +void TestLEB128Encode(Int input_value, const std::vector<uint8_t>& expected_data, + std::size_t buffer_size) { + std::vector<uint8_t> buffer(buffer_size); + auto bytes_written = bit_util::WriteLEB128(input_value, buffer.data(), + static_cast<int32_t>(buffer.size())); + + EXPECT_EQ(bytes_written, expected_data.size()); + // Encoded data + for (std::size_t i = 0; i < expected_data.size(); ++i) { + EXPECT_EQ(buffer.at(i), expected_data.at(i)); + } + + // When the value is successfully encoded, the remaining of the buffer is untouched + if (bytes_written > 0) { + for (std::size_t i = bytes_written; i < buffer.size(); ++i) { + EXPECT_EQ(buffer.at(i), 0); + } + } +} + +/// Test encoding to known LEB128 byte sequences with edge cases parameters. +/// \see LEB128.KnownSuccessfulValues for other known values tested. 
+TEST(LEB128, WriteEdgeCases) { + // Single byte value 0 + TestLEB128Encode(0U, {0x00}, 1); + // Single byte value 127 + TestLEB128Encode(127U, {0x7F}, 1); + // Three byte value 16384, encoded in larger buffer + TestLEB128Encode(16384U, {0x80, 0x80, 0x01}, 10); + // Two byte boundary values + TestLEB128Encode(129U, {0x81, 0x01}, 2); + TestLEB128Encode(16383U, {0xFF, 0x7F}, 2); + // Error case: Buffer too small for value 128 (needs 2 bytes but only 1 provided) + TestLEB128Encode(128U, {}, 1); + // Error case: Buffer too small for uint32_t max (needs 5 bytes but only 4 provided) + TestLEB128Encode(4294967295U, {}, 4); + // Error case: Zero buffer size + TestLEB128Encode(52U, {}, 0); + // Error case: Negative value + TestLEB128Encode(-3, {}, 1); +} + +/// Utility function to test LEB128 decoding with known byte array and expected result +template <typename Int> +void TestLEB128Decode(const std::vector<uint8_t>& data, Int expected_value, + int32_t expected_bytes_read) { + Int result = 0; + auto bytes_read = bit_util::ParseLeadingLEB128( + data.data(), static_cast<int32_t>(data.size()), &result); + EXPECT_EQ(bytes_read, expected_bytes_read); + if (expected_bytes_read > 0) { + EXPECT_EQ(result, expected_value); + } +} + +template <typename Int> +void TestLEB128Decode(const std::vector<uint8_t>& data, Int expected_value, + std::size_t expected_bytes_read) { Review Comment: Do we really need this override? It doesn't seem very useful to me. ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -553,47 +853,183 @@ TEST(BitRle, Overflow) { } } +/// Check RleBitPacked encoding/decoding round trip. +/// +/// \param spaced If set to false, treat Nulls in the input array as regular data. +/// \param parts The number of parts in which the data will be decoded. +/// For number greater than one, this ensure that the decoder intermediary state +/// is valid. 
template <typename Type> -void CheckRoundTripSpaced(const Array& data, int bit_width) { +void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t parts, + std::shared_ptr<FloatArray> dict = {}) { using ArrayType = typename TypeTraits<Type>::ArrayType; - using T = typename Type::c_type; + using value_type = typename Type::c_type; - int num_values = static_cast<int>(data.length()); - int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values); + const int data_size = static_cast<int>(data.length()); + const int data_values_count = + static_cast<int>(data.length() - spaced * data.null_count()); + const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, data_size); + ASSERT_GE(parts, 1); + ASSERT_LE(parts, data_size); - const T* values = static_cast<const ArrayType&>(data).raw_values(); + const value_type* data_values = static_cast<const ArrayType&>(data).raw_values(); + // Encode the data into `buffer` using the encoder. std::vector<uint8_t> buffer(buffer_size); - RleEncoder encoder(buffer.data(), buffer_size, bit_width); - for (int i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (!encoder.Put(static_cast<uint64_t>(values[i]))) { - FAIL() << "Encoding failed"; - } + RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width); + int32_t encoded_values_size = 0; + for (int i = 0; i < data_size; ++i) { + // Depending on `spaced` we treat nulls as regular values. 
+ if (data.IsValid(i) || !spaced) { + bool success = encoder.Put(static_cast<uint64_t>(data_values[i])); + ASSERT_TRUE(success) << "Encoding failed in pos " << i; + ++encoded_values_size; } } - int encoded_size = encoder.Flush(); + int encoded_byte_size = encoder.Flush(); + ASSERT_EQ(encoded_values_size, data_values_count) + << "All values input were not encoded successfully by the encoder"; + + // Now we verify batch read + RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size, bit_width); + // We will only use one of them depending on whether this is a dictonnary tests + std::vector<float> dict_read; + std::vector<value_type> values_read; + if (dict) { + dict_read.resize(data_size); + } else { + values_read.resize(data_size); + } - // Verify batch read - RleDecoder decoder(buffer.data(), encoded_size, bit_width); - std::vector<T> values_read(num_values); + // We will read the data in `parts` calls to make sure intermediate states are valid + int32_t actual_read_count = 0; + int32_t requested_read_count = 0; + while (requested_read_count < data_size) { + const auto remaining = data_size - requested_read_count; + auto to_read = data_size / parts; + if (remaining / to_read == 1) { + to_read = remaining; + } - if (num_values != decoder.GetBatchSpaced( - num_values, static_cast<int>(data.null_count()), - data.null_bitmap_data(), data.offset(), values_read.data())) { - FAIL(); - } + auto read = 0; + if (spaced) { + // We need to slice the input array get the proper null count and bitmap + auto data_remaining = data.Slice(requested_read_count, to_read); + + if (dict) { + auto* out = dict_read.data() + requested_read_count; + read = decoder.GetBatchWithDictSpaced( + dict->raw_values(), static_cast<int32_t>(dict->length()), out, to_read, + static_cast<int32_t>(data_remaining->null_count()), + data_remaining->null_bitmap_data(), data_remaining->offset()); + } else { + auto* out = values_read.data() + requested_read_count; + read = decoder.GetBatchSpaced( 
+ to_read, static_cast<int32_t>(data_remaining->null_count()), + data_remaining->null_bitmap_data(), data_remaining->offset(), out); + } + } else { + if (dict) { + auto* out = dict_read.data() + requested_read_count; + read = decoder.GetBatchWithDict( + dict->raw_values(), static_cast<int32_t>(dict->length()), out, to_read); + } else { + auto* out = values_read.data() + requested_read_count; + read = decoder.GetBatch(out, to_read); + } + } + ASSERT_EQ(read, to_read) << "Decoder did not read as many values as requested"; - for (int64_t i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (values_read[i] != values[i]) { - FAIL() << "Index " << i << " read " << values_read[i] << " but should be " - << values[i]; + actual_read_count += read; + requested_read_count += to_read; + } + EXPECT_EQ(requested_read_count, data_size) << "This test logic is wrong"; + EXPECT_EQ(actual_read_count, data_size) << "Total number of values read is off"; + + // Verify the round trip: encoded-decoded values must equal the original one + for (int64_t i = 0; i < data_size; ++i) { + if (data.IsValid(i) || !spaced) { + if (dict) { + EXPECT_EQ(dict_read.at(i), dict->Value(data_values[i])) + << "Encoded then decoded and mapped value at position " << i << " (" + << values_read[i] << ") differs from original value (" << data_values[i] + << " mapped to " << dict->Value(data_values[i]) << ")"; + } else { + EXPECT_EQ(values_read.at(i), data_values[i]) + << "Encoded then decoded value at position " << i << " (" << values_read.at(i) + << ") differs from original value (" << data_values[i] << ")"; } } } } +template <typename T> +struct DataTestRleBitPackedRandomPart { + using value_type = T; + + value_type max; + int32_t size; + double null_probability; +}; + +template <typename T> +struct DataTestRleBitPackedRepeatPart { + using value_type = T; + + value_type value; + int32_t size; + double null_probability; +}; + +template <typename T> +struct DataTestRleBitPackedNullPart { + using value_type 
= T; + + int32_t size; +}; + +template <typename T> +struct DataTestRleBitPacked { + using value_type = T; + using ArrowType = typename arrow::CTypeTraits<value_type>::ArrowType; + using RandomPart = DataTestRleBitPackedRandomPart<value_type>; + using RepeatPart = DataTestRleBitPackedRepeatPart<value_type>; + using NullPart = DataTestRleBitPackedNullPart<value_type>; + using AnyPart = std::variant<RandomPart, RepeatPart, NullPart>; + + std::vector<AnyPart> parts; + int32_t bit_width; + + std::shared_ptr<::arrow::Array> MakeArray( + ::arrow::random::RandomArrayGenerator& rand) const { + using Traits = arrow::TypeTraits<ArrowType>; + + std::vector<std::shared_ptr<::arrow::Array>> arrays = {}; + + for (const auto& dyn_part : parts) { + if (auto* part = std::get_if<RandomPart>(&dyn_part)) { + auto arr = rand.Numeric<ArrowType>(part->size, /* min= */ value_type(0), + part->max, part->null_probability); + arrays.push_back(std::move(arr)); + + } else if (auto* part = std::get_if<RepeatPart>(&dyn_part)) { + auto arr = + rand.Numeric<ArrowType>(part->size, /* min= */ part->value, + /* max= */ part->value, part->null_probability); + arrays.push_back(std::move(arr)); + + } else if (auto* part = std::get_if<NullPart>(&dyn_part)) { + EXPECT_OK_AND_ASSIGN( + auto arr, ::arrow::MakeArrayOfNull(Traits::type_singleton(), part->size)); + arrays.push_back(std::move(arr)); + } + } + ARROW_DCHECK_EQ(parts.size(), arrays.size()); + + return ::arrow::Concatenate(arrays).ValueOrDie(); + } +}; + template <typename T> struct GetBatchSpacedTestCase { Review Comment: Is this one still used? 
########## cpp/src/arrow/util/bit_stream_utils_internal.h: ########## @@ -439,91 +424,84 @@ inline bool BitReader::Advance(int64_t num_bits) { return true; } -inline bool BitWriter::PutVlqInt(uint32_t v) { - bool result = true; - while ((v & 0xFFFFFF80UL) != 0UL) { - result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1); - v >>= 7; - } - result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1); - return result; -} +template <typename Int> +inline bool BitWriter::PutVlqInt(Int v) { + static_assert(std::is_integral_v<Int>); Review Comment: Same here: assert it's unsigned? ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +83,279 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // -/// Decoder class for RLE encoded data. -class RleDecoder { +template <typename T> +class RleRunDecoder; + +class RleRun { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - ARROW_DCHECK_GE(bit_width_, 0); - ARROW_DCHECK_LE(bit_width_, 64); - } + /// Enough space to store a 64bit value + using raw_data_storage = std::array<uint8_t, 8>; + using raw_data_const_pointer = const uint8_t*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + /// The decoder class used to decode a single run in the given type. 
+ template <typename T> + using DecoderType = RleRunDecoder<T>; - RleDecoder() : bit_width_(-1) {} + constexpr RleRun() noexcept = default; + constexpr RleRun(const RleRun&) noexcept = default; + constexpr RleRun(RleRun&&) noexcept = default; - void Reset(const uint8_t* buffer, int buffer_len, int bit_width) { - ARROW_DCHECK_GE(bit_width, 0); - ARROW_DCHECK_LE(bit_width, 64); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; - } + explicit RleRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr RleRun& operator=(const RleRun&) noexcept = default; + constexpr RleRun& operator=(RleRun&&) noexcept = default; + + /// The number of repeated values in this run. + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; Review Comment: Again, `[[nodiscard]]` does not seem useful on such simple accessor methods. Nothing dangerous happens if you ignore the result... ########## cpp/src/parquet/decoder.cc: ########## @@ -1898,7 +1900,7 @@ class RleBooleanDecoder : public TypedDecoderImpl<BooleanType>, public BooleanDe } private: - std::shared_ptr<::arrow::util::RleDecoder> decoder_; + std::shared_ptr<::arrow::util::RleBitPackedDecoder<bool>> decoder_; Review Comment: Fair enough. ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -207,12 +209,310 @@ TEST(BitUtil, RoundTripIntValues) { } } +/// A Rle run is a simple class owning some data and a repetition count. +/// It does not know how to read such data. 
+TEST(Rle, RleRun) { + const std::array<RleRun::byte, 4> value = {21, 2, 0, 0}; + + RleRun::values_count_type value_count = 12; + + // 12 times the value 21 fitting over 5 bits + auto const run_5 = RleRun(value.data(), value_count, /* value_bit_width= */ 5); + EXPECT_EQ(run_5.ValuesCount(), value_count); + EXPECT_EQ(run_5.ValuesBitWidth(), 5); + EXPECT_EQ(run_5.RawDataSize(), 1); // 5 bits fit in one byte + EXPECT_EQ(*run_5.RawDataPtr(), 21); + + // 12 times the value 21 fitting over 16 bits + auto const run_8 = RleRun(value.data(), value_count, /* value_bit_width= */ 8); + EXPECT_EQ(run_8.ValuesCount(), value_count); + EXPECT_EQ(run_8.ValuesBitWidth(), 8); + EXPECT_EQ(run_8.RawDataSize(), 1); // 8 bits fit in 1 byte + EXPECT_EQ(*run_8.RawDataPtr(), 21); + + // 12 times the value {21, 2} fitting over 10 bits + auto const run_10 = RleRun(value.data(), value_count, /* value_bit_width= */ 10); + + EXPECT_EQ(run_10.ValuesCount(), value_count); + EXPECT_EQ(run_10.ValuesBitWidth(), 10); + EXPECT_EQ(run_10.RawDataSize(), 2); // 10 bits fit in 2 bytes + EXPECT_EQ(*(run_10.RawDataPtr() + 0), 21); + EXPECT_EQ(*(run_10.RawDataPtr() + 1), 2); + + // 12 times the value {21, 2} fitting over 32 bits + auto const run_32 = RleRun(value.data(), value_count, /* value_bit_width= */ 32); + EXPECT_EQ(run_32.ValuesCount(), value_count); + EXPECT_EQ(run_32.ValuesBitWidth(), 32); + EXPECT_EQ(run_32.RawDataSize(), 4); // 32 bits fit in 4 bytes + EXPECT_EQ(*(run_32.RawDataPtr() + 0), 21); + EXPECT_EQ(*(run_32.RawDataPtr() + 1), 2); + EXPECT_EQ(*(run_32.RawDataPtr() + 2), 0); + EXPECT_EQ(*(run_32.RawDataPtr() + 3), 0); +} + +/// A BitPacked run is a simple class owning some data and its size. +/// It does not know how to read such data. 
+TEST(BitPacked, BitPackedRun) { + const std::array<BitPackedRun::byte, 4> value = {0b10101010, 0, 0, 0b1111111}; + + /// 16 values of 1 bit for a total of 16 bits + BitPackedRun::values_count_type value_count_1 = 16; + auto const run_1 = BitPackedRun(value.data(), value_count_1, /* value_bit_width= */ 1); + EXPECT_EQ(run_1.ValuesCount(), value_count_1); + EXPECT_EQ(run_1.ValuesBitWidth(), 1); + EXPECT_EQ(run_1.RawDataSize(), 2); // 16 bits fit in 2 bytes + for (BitPackedRun::raw_data_size_type i = 0; i < run_1.RawDataSize(); ++i) { + EXPECT_EQ(*(run_1.RawDataPtr() + i), value[i]); Review Comment: Hmm, I'm still wondering about this (see question above). ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -299,385 +552,988 @@ class RleEncoder { uint8_t* literal_indicator_byte_; }; +/************************* + * RleBitPackedDecoder * + *************************/ + +template <typename T> +RleBitPackedDecoder<T>::RleBitPackedDecoder(raw_data_const_pointer data, + raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept { + Reset(data, data_size, value_bit_width); +} + +template <typename T> +void RleBitPackedDecoder<T>::Reset(raw_data_const_pointer data, + raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept { + ARROW_DCHECK_GE(value_bit_width, 0); + ARROW_DCHECK_LE(value_bit_width, 64); + parser_.Reset(data, data_size, value_bit_width); + decoder_ = {}; +} + +template <typename T> +auto RleBitPackedDecoder<T>::RunRemaining() const -> values_count_type { + return std::visit([](auto const& dec) { return dec.Remaining(); }, decoder_); +} + +template <typename T> +bool RleBitPackedDecoder<T>::Exhausted() const { + return (RunRemaining() == 0) && parser_.Exhausted(); +} + template <typename T> -inline bool RleDecoder::Get(T* val) { +bool RleBitPackedDecoder<T>::ParseAndResetDecoder() { + auto dyn_run = parser_.Next(); + if (!dyn_run.has_value()) { + return false; + } + + if (auto* rle_run = 
std::get_if<BitPackedRun>(dyn_run.operator->())) { + decoder_ = {BitPackedDecoder<value_type>(*rle_run)}; + return true; + } + + auto* bit_packed_run = std::get_if<RleRun>(dyn_run.operator->()); + ARROW_DCHECK(bit_packed_run); // Only two possibilities in the variant + decoder_ = {RleDecoder<value_type>(*bit_packed_run)}; + return true; +} + +template <typename T> +auto RleBitPackedDecoder<T>::RunGetBatch(value_type* out, values_count_type batch_size) + -> values_count_type { + return std::visit([&](auto& dec) { return dec.GetBatch(out, batch_size); }, decoder_); +} + +template <typename T> +bool RleBitPackedDecoder<T>::Get(value_type* val) { return GetBatch(val, 1) == 1; } +namespace internal { + +/// A ``Parse`` handler that calls a single lambda. +/// +/// This lambda would typically take the input run as ``auto run`` (i.e. the lambda is +/// templated) and deduce other types from it. +template <typename Lambda> +struct LambdaHandler { + Lambda handlder_; + + auto OnBitPackedRun(BitPackedRun run) { return handlder_(std::move(run)); } + + auto OnRleRun(RleRun run) { return handlder_(std::move(run)); } +}; + +template <typename Lambda> +LambdaHandler(Lambda) -> LambdaHandler<Lambda>; + +template <typename value_type, typename Run> +struct decoder_for; + +template <typename value_type> +struct decoder_for<value_type, BitPackedRun> { + using type = BitPackedDecoder<value_type>; +}; + +template <typename value_type> +struct decoder_for<value_type, RleRun> { + using type = RleDecoder<value_type>; +}; + +template <typename value_type, typename Run> +using decoder_for_t = typename decoder_for<value_type, Run>::type; + +} // namespace internal + template <typename T> -inline int RleDecoder::GetBatch(T* values, int batch_size) { - ARROW_DCHECK_GE(bit_width_, 0); - int values_read = 0; - - auto* out = values; - - while (values_read < batch_size) { - int remaining = batch_size - values_read; - - if (repeat_count_ > 0) { // Repeated value case. 
- int repeat_batch = std::min(remaining, repeat_count_); - std::fill(out, out + repeat_batch, static_cast<T>(current_value_)); - - repeat_count_ -= repeat_batch; - values_read += repeat_batch; - out += repeat_batch; - } else if (literal_count_ > 0) { - int literal_batch = std::min(remaining, literal_count_); - int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch); - if (actual_read != literal_batch) { - return values_read; - } +auto RleBitPackedDecoder<T>::GetBatch(value_type* out, values_count_type batch_size) + -> values_count_type { + using ControlFlow = RleBitPackedParser::ControlFlow; - literal_count_ -= literal_batch; - values_read += literal_batch; - out += literal_batch; - } else { - if (!NextCounts<T>()) return values_read; + values_count_type values_read = 0; + + // Remaining from a previous call that would have left some unread data from a run. + if (ARROW_PREDICT_FALSE(RunRemaining() > 0)) { + auto const read = RunGetBatch(out, batch_size); + values_read += read; + out += read; + + // Either we fulfilled all the batch to be read or we finished remaining run. 
+ if (ARROW_PREDICT_FALSE(values_read == batch_size)) { + return values_read; } + ARROW_DCHECK(RunRemaining() == 0); } + auto handler = internal::LambdaHandler{ + [&](auto run) { + ARROW_DCHECK_LT(values_read, batch_size); + internal::decoder_for_t<value_type, decltype(run)> decoder(run); + auto const read = decoder.GetBatch(out, batch_size - values_read); + ARROW_DCHECK_LE(read, batch_size - values_read); + values_read += read; + out += read; + + // Stop reading and store remaining decoder + if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) { + decoder_ = std::move(decoder); + return ControlFlow::Break; + } + + return ControlFlow::Continue; + }, + }; + + parser_.Parse(handler); + return values_read; } -template <typename T, typename RunType, typename Converter> -inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int null_count, - const uint8_t* valid_bits, int64_t valid_bits_offset, - T* out) { - if (ARROW_PREDICT_FALSE(null_count == batch_size)) { - converter.FillZero(out, out + batch_size); - return batch_size; +namespace internal { + +/// Utility class to safely handle values and null count without too error-prone +/// verbosity. +class BatchCounter { + public: + using size_type = int32_t; + + [[nodiscard]] static constexpr BatchCounter FromBatchSizeAndNulls( + size_type batch_size, size_type null_count) { + ARROW_DCHECK_LE(null_count, batch_size); + return {batch_size - null_count, null_count}; } - ARROW_DCHECK_GE(bit_width_, 0); - int values_read = 0; - int values_remaining = batch_size - null_count; + constexpr BatchCounter(size_type values_count, size_type null_count) noexcept + : values_count_(values_count), null_count_(null_count) {} - // Assume no bits to start. 
- arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset, - /*length=*/batch_size); - arrow::internal::BitRun valid_run = bit_reader.NextRun(); - while (values_read < batch_size) { - if (ARROW_PREDICT_FALSE(valid_run.length == 0)) { - valid_run = bit_reader.NextRun(); + [[nodiscard]] constexpr size_type ValuesCount() const noexcept { return values_count_; } + + [[nodiscard]] constexpr size_type ValuesRead() const noexcept { return values_read_; } + + [[nodiscard]] constexpr size_type ValuesRemaining() const noexcept { + ARROW_DCHECK_LE(values_read_, values_count_); + return values_count_ - values_read_; + } + + constexpr void AccrueReadValues(size_type to_read) noexcept { + ARROW_DCHECK_LE(to_read, ValuesRemaining()); + values_read_ += to_read; + } + + [[nodiscard]] constexpr size_type NullCount() const noexcept { return null_count_; } + + [[nodiscard]] constexpr size_type NullRead() const noexcept { return null_read_; } + + [[nodiscard]] constexpr size_type NullRemaining() const noexcept { + ARROW_DCHECK_LE(null_read_, null_count_); + return null_count_ - null_read_; + } + + constexpr void AccrueReadNulls(size_type to_read) noexcept { + ARROW_DCHECK_LE(to_read, NullRemaining()); + null_read_ += to_read; + } + + [[nodiscard]] constexpr size_type TotalRemaining() const noexcept { + return ValuesRemaining() + NullRemaining(); + } + + [[nodiscard]] constexpr size_type TotalRead() const noexcept { + return values_read_ + null_read_; + } + + [[nodiscard]] constexpr bool IsFullyNull() const noexcept { + return ValuesRemaining() == 0; + } + + [[nodiscard]] constexpr bool IsDone() const noexcept { return TotalRemaining() == 0; } + + private: + size_type values_count_ = 0; + size_type values_read_ = 0; + size_type null_count_ = 0; + size_type null_read_ = 0; +}; + +// The maximal unsigned size that a variable can fit. 
+template <typename T> +constexpr auto max_size_for_v = + static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max()); + +/// Overload for GetSpaced for a single run in a RleDecoder +template <typename Converter, typename BitRunReader, typename BitRun, + typename values_count_type, typename value_type> +auto RunGetSpaced(Converter& converter, typename Converter::out_type* out, + values_count_type batch_size, values_count_type null_count, + BitRunReader&& validity_reader, BitRun&& validity_run, + RleDecoder<value_type>& decoder) + -> std::pair<values_count_type, values_count_type> { + ARROW_DCHECK_GT(batch_size, 0); + // The equality case is handled in the main loop in GetSpaced + ARROW_DCHECK_LT(null_count, batch_size); + + auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count); + + values_count_type const values_available = decoder.Remaining(); + ARROW_DCHECK_GT(values_available, 0); + auto values_remaining_run = [&]() { + auto out = values_available - batch.ValuesRead(); + ARROW_DCHECK_GE(out, 0); + return out; + }; + + // Consume as much as possible from the repeated run. + // We only need to count the number of nulls and non-nulls because we can fill in the + // same value for nulls and non-nulls. + // This proves to be a big efficiency win. 
+ while (values_remaining_run() > 0 && !batch.IsDone()) { + ARROW_DCHECK_GE(validity_run.length, 0); + ARROW_DCHECK_LT(validity_run.length, max_size_for_v<values_count_type>); + ARROW_DCHECK_LE(validity_run.length, batch.TotalRemaining()); + auto const& validity_run_size = static_cast<values_count_type>(validity_run.length); + + if (validity_run.set) { + // We may end the current RLE run in the middle of the validity run + auto update_size = std::min(validity_run_size, values_remaining_run()); + batch.AccrueReadValues(update_size); + validity_run.length -= update_size; + } else { + // We can consume all nulls here because it does not matter if we consume on this + // RLE run, or an a next encoded run. The value filled does not matter. + auto update_size = std::min(validity_run_size, batch.NullRemaining()); + batch.AccrueReadNulls(update_size); + validity_run.length -= update_size; + } + + if (ARROW_PREDICT_TRUE(validity_run.length == 0)) { + validity_run = validity_reader.NextRun(); } + } + + value_type const value = decoder.Value(); + if (ARROW_PREDICT_FALSE(!converter.InputIsValid(value))) { + return {0, 0}; + } + converter.WriteRepeated(out, out + batch.TotalRead(), value); + auto const actual_values_read = decoder.Advance(batch.ValuesRead()); + // We always cropped the number of values_read by the remaining values in the run. + // What's more the RLE decoder should not encounter any errors. + ARROW_DCHECK_EQ(actual_values_read, batch.ValuesRead()); - ARROW_DCHECK_GT(batch_size, 0); - ARROW_DCHECK_GT(valid_run.length, 0); + return {batch.ValuesRead(), batch.NullRead()}; +} + +template <typename T, typename... Ts> +[[nodiscard]] constexpr T min(T x, Ts... 
ys) { + ((x = std::min(x, ys)), ...); + return x; +} + +static_assert(min(5) == 5); +static_assert(min(5, 4, -1) == -1); +static_assert(min(5, 41) == 5); + +template <typename Converter, typename BitRunReader, typename BitRun, + typename values_count_type, typename value_type> +auto RunGetSpaced(Converter& converter, typename Converter::out_type* out, + values_count_type batch_size, values_count_type null_count, + BitRunReader&& validity_reader, BitRun&& validity_run, + BitPackedDecoder<value_type>& decoder) + -> std::pair<values_count_type, values_count_type> { + ARROW_DCHECK_GT(batch_size, 0); + // The equality case is handled in the main loop in GetSpaced + ARROW_DCHECK_LT(null_count, batch_size); + + auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count); + + values_count_type const values_available = decoder.Remaining(); + ARROW_DCHECK_GT(values_available, 0); + auto run_values_remaining = [&]() { + auto out = values_available - batch.ValuesRead(); + ARROW_DCHECK_GE(out, 0); + return out; + }; + + while (run_values_remaining() > 0 && batch.ValuesRemaining() > 0) { + // TODO should this size be tune depending on sizeof(value_size)? cpu cache size? + // Pull a batch of values from the bit packed encoded data and store it in a local + // buffer to benefit from unpacking intrinsics and data locality. 
+ static constexpr values_count_type kBufferCapacity = 1024; + std::array<value_type, kBufferCapacity> buffer = {}; + + values_count_type buffer_start = 0; + values_count_type buffer_end = 0; + auto buffer_size = [&]() { + auto out = buffer_end - buffer_start; + ARROW_DCHECK_GE(out, 0); + return out; + }; + + // buffer_start is 0 at this point so size is end + buffer_end = min(run_values_remaining(), batch.ValuesRemaining(), kBufferCapacity); + buffer_end = decoder.GetBatch(buffer.data(), buffer_size()); + ARROW_DCHECK_LE(buffer_size(), kBufferCapacity); + + if (ARROW_PREDICT_FALSE(!converter.InputIsValid(buffer.data(), buffer_size()))) { + return {batch.ValuesRead(), batch.NullRead()}; Review Comment: If easily doable then yes. ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -553,47 +853,183 @@ TEST(BitRle, Overflow) { } } +/// Check RleBitPacked encoding/decoding round trip. +/// +/// \param spaced If set to false, treat Nulls in the input array as regular data. +/// \param parts The number of parts in which the data will be decoded. +/// For number greater than one, this ensure that the decoder intermediary state Review Comment: ```suggestion /// For numbers greater than one, this ensures that the decoder intermediate state ``` ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -207,12 +209,310 @@ TEST(BitUtil, RoundTripIntValues) { } } +/// A Rle run is a simple class owning some data and a repetition count. +/// It does not know how to read such data.
+TEST(Rle, RleRun) { + const std::array<uint8_t, 4> value = {21, 2, 0, 0}; + + RleRun::values_count_type value_count = 12; + + // 12 times the value 21 fitting over 5 bits + const auto run_5 = RleRun(value.data(), value_count, /* value_bit_width= */ 5); + EXPECT_EQ(run_5.ValuesCount(), value_count); + EXPECT_EQ(run_5.ValuesBitWidth(), 5); + EXPECT_EQ(run_5.RawDataSize(), 1); // 5 bits fit in one byte + EXPECT_EQ(*run_5.RawDataPtr(), 21); + + // 12 times the value 21 fitting over 16 bits + const auto run_8 = RleRun(value.data(), value_count, /* value_bit_width= */ 8); + EXPECT_EQ(run_8.ValuesCount(), value_count); + EXPECT_EQ(run_8.ValuesBitWidth(), 8); + EXPECT_EQ(run_8.RawDataSize(), 1); // 8 bits fit in 1 byte + EXPECT_EQ(*run_8.RawDataPtr(), 21); + + // 12 times the value {21, 2} fitting over 10 bits Review Comment: You mean the value 533? ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -553,47 +853,183 @@ TEST(BitRle, Overflow) { } } +/// Check RleBitPacked encoding/decoding round trip. +/// +/// \param spaced If set to false, treat Nulls in the input array as regular data. +/// \param parts The number of parts in which the data will be decoded. +/// For number greater than one, this ensure that the decoder intermediary state +/// is valid. 
template <typename Type> -void CheckRoundTripSpaced(const Array& data, int bit_width) { +void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t parts, + std::shared_ptr<FloatArray> dict = {}) { using ArrayType = typename TypeTraits<Type>::ArrayType; - using T = typename Type::c_type; + using value_type = typename Type::c_type; - int num_values = static_cast<int>(data.length()); - int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values); + const int data_size = static_cast<int>(data.length()); + const int data_values_count = + static_cast<int>(data.length() - spaced * data.null_count()); + const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, data_size); + ASSERT_GE(parts, 1); + ASSERT_LE(parts, data_size); - const T* values = static_cast<const ArrayType&>(data).raw_values(); + const value_type* data_values = static_cast<const ArrayType&>(data).raw_values(); + // Encode the data into `buffer` using the encoder. std::vector<uint8_t> buffer(buffer_size); - RleEncoder encoder(buffer.data(), buffer_size, bit_width); - for (int i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (!encoder.Put(static_cast<uint64_t>(values[i]))) { - FAIL() << "Encoding failed"; - } + RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width); + int32_t encoded_values_size = 0; + for (int i = 0; i < data_size; ++i) { + // Depending on `spaced` we treat nulls as regular values. 
+ if (data.IsValid(i) || !spaced) { + bool success = encoder.Put(static_cast<uint64_t>(data_values[i])); + ASSERT_TRUE(success) << "Encoding failed in pos " << i; + ++encoded_values_size; } } - int encoded_size = encoder.Flush(); + int encoded_byte_size = encoder.Flush(); + ASSERT_EQ(encoded_values_size, data_values_count) + << "All values input were not encoded successfully by the encoder"; + + // Now we verify batch read + RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size, bit_width); + // We will only use one of them depending on whether this is a dictonnary tests + std::vector<float> dict_read; + std::vector<value_type> values_read; + if (dict) { + dict_read.resize(data_size); + } else { + values_read.resize(data_size); + } - // Verify batch read - RleDecoder decoder(buffer.data(), encoded_size, bit_width); - std::vector<T> values_read(num_values); + // We will read the data in `parts` calls to make sure intermediate states are valid + int32_t actual_read_count = 0; + int32_t requested_read_count = 0; + while (requested_read_count < data_size) { + const auto remaining = data_size - requested_read_count; + auto to_read = data_size / parts; + if (remaining / to_read == 1) { + to_read = remaining; + } - if (num_values != decoder.GetBatchSpaced( - num_values, static_cast<int>(data.null_count()), - data.null_bitmap_data(), data.offset(), values_read.data())) { - FAIL(); - } + auto read = 0; Review Comment: Let's put the type (aka `int` or `int32_t`) explicitly here? ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -553,47 +853,183 @@ TEST(BitRle, Overflow) { } } +/// Check RleBitPacked encoding/decoding round trip. +/// +/// \param spaced If set to false, treat Nulls in the input array as regular data. +/// \param parts The number of parts in which the data will be decoded. +/// For number greater than one, this ensure that the decoder intermediary state +/// is valid. 
template <typename Type> -void CheckRoundTripSpaced(const Array& data, int bit_width) { +void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t parts, + std::shared_ptr<FloatArray> dict = {}) { using ArrayType = typename TypeTraits<Type>::ArrayType; - using T = typename Type::c_type; + using value_type = typename Type::c_type; - int num_values = static_cast<int>(data.length()); - int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values); + const int data_size = static_cast<int>(data.length()); + const int data_values_count = + static_cast<int>(data.length() - spaced * data.null_count()); + const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, data_size); + ASSERT_GE(parts, 1); + ASSERT_LE(parts, data_size); - const T* values = static_cast<const ArrayType&>(data).raw_values(); + const value_type* data_values = static_cast<const ArrayType&>(data).raw_values(); + // Encode the data into `buffer` using the encoder. std::vector<uint8_t> buffer(buffer_size); - RleEncoder encoder(buffer.data(), buffer_size, bit_width); - for (int i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (!encoder.Put(static_cast<uint64_t>(values[i]))) { - FAIL() << "Encoding failed"; - } + RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width); + int32_t encoded_values_size = 0; + for (int i = 0; i < data_size; ++i) { + // Depending on `spaced` we treat nulls as regular values. 
+ if (data.IsValid(i) || !spaced) { + bool success = encoder.Put(static_cast<uint64_t>(data_values[i])); + ASSERT_TRUE(success) << "Encoding failed in pos " << i; + ++encoded_values_size; } } - int encoded_size = encoder.Flush(); + int encoded_byte_size = encoder.Flush(); + ASSERT_EQ(encoded_values_size, data_values_count) + << "All values input were not encoded successfully by the encoder"; + + // Now we verify batch read + RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size, bit_width); + // We will only use one of them depending on whether this is a dictonnary tests Review Comment: ```suggestion // We will only use one of them depending on whether this is a dictionary test ``` ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -207,12 +209,310 @@ TEST(BitUtil, RoundTripIntValues) { } } +/// A Rle run is a simple class owning some data and a repetition count. +/// It does not know how to read such data. +TEST(Rle, RleRun) { + const std::array<uint8_t, 4> value = {21, 2, 0, 0}; + + RleRun::values_count_type value_count = 12; + + // 12 times the value 21 fitting over 5 bits + const auto run_5 = RleRun(value.data(), value_count, /* value_bit_width= */ 5); + EXPECT_EQ(run_5.ValuesCount(), value_count); + EXPECT_EQ(run_5.ValuesBitWidth(), 5); + EXPECT_EQ(run_5.RawDataSize(), 1); // 5 bits fit in one byte + EXPECT_EQ(*run_5.RawDataPtr(), 21); + + // 12 times the value 21 fitting over 16 bits + const auto run_8 = RleRun(value.data(), value_count, /* value_bit_width= */ 8); + EXPECT_EQ(run_8.ValuesCount(), value_count); + EXPECT_EQ(run_8.ValuesBitWidth(), 8); + EXPECT_EQ(run_8.RawDataSize(), 1); // 8 bits fit in 1 byte + EXPECT_EQ(*run_8.RawDataPtr(), 21); + + // 12 times the value {21, 2} fitting over 10 bits + const auto run_10 = RleRun(value.data(), value_count, /* value_bit_width= */ 10); + + EXPECT_EQ(run_10.ValuesCount(), value_count); + EXPECT_EQ(run_10.ValuesBitWidth(), 10); + EXPECT_EQ(run_10.RawDataSize(), 2); // 10 bits fit 
in 2 bytes + EXPECT_EQ(*(run_10.RawDataPtr() + 0), 21); + EXPECT_EQ(*(run_10.RawDataPtr() + 1), 2); + + // 12 times the value {21, 2} fitting over 32 bits + const auto run_32 = RleRun(value.data(), value_count, /* value_bit_width= */ 32); + EXPECT_EQ(run_32.ValuesCount(), value_count); + EXPECT_EQ(run_32.ValuesBitWidth(), 32); + EXPECT_EQ(run_32.RawDataSize(), 4); // 32 bits fit in 4 bytes + EXPECT_EQ(*(run_32.RawDataPtr() + 0), 21); + EXPECT_EQ(*(run_32.RawDataPtr() + 1), 2); + EXPECT_EQ(*(run_32.RawDataPtr() + 2), 0); + EXPECT_EQ(*(run_32.RawDataPtr() + 3), 0); +} + +/// A BitPacked run is a simple class owning some data and its size. +/// It does not know how to read such data. +TEST(BitPacked, BitPackedRun) { + const std::array<uint8_t, 4> value = {0b10101010, 0, 0, 0b1111111}; + + // 16 values of 1 bit for a total of 16 bits + BitPackedRun::values_count_type value_count_1 = 16; + const auto run_1 = BitPackedRun(value.data(), value_count_1, /* value_bit_width= */ 1); + EXPECT_EQ(run_1.ValuesCount(), value_count_1); + EXPECT_EQ(run_1.ValuesBitWidth(), 1); + EXPECT_EQ(run_1.RawDataSize(), 2); // 16 bits fit in 2 bytes + for (BitPackedRun::raw_data_size_type i = 0; i < run_1.RawDataSize(); ++i) { + EXPECT_EQ(*(run_1.RawDataPtr() + i), value[i]); + } + + // 8 values of 3 bits for a total of 24 bits + BitPackedRun::values_count_type value_count_3 = 8; + const auto run_3 = BitPackedRun(value.data(), value_count_3, /* value_bit_width= */ 3); + EXPECT_EQ(run_3.ValuesCount(), value_count_3); + EXPECT_EQ(run_3.ValuesBitWidth(), 3); + EXPECT_EQ(run_3.RawDataSize(), 3); // 24 bits fit in 3 bytes + for (BitPackedRun::raw_data_size_type i = 0; i < run_3.RawDataSize(); ++i) { + EXPECT_EQ(*(run_3.RawDataPtr() + i), value[i]); + } +} + +template <typename T> +void TestRleDecoder(std::vector<uint8_t> bytes, RleRun::values_count_type value_count, + RleRun::bit_size_type bit_width, T expected_value) { + // Pre-requisite for this test + EXPECT_GT(value_count, 6); + + const auto run 
= RleRun(bytes.data(), value_count, bit_width); + + auto decoder = RleRunDecoder<T>(run); + std::vector<T> vals = {0, 0}; + + EXPECT_EQ(decoder.Remaining(), value_count); + + typename decltype(decoder)::values_count_type read = 0; + EXPECT_EQ(decoder.Get(vals.data()), 1); + read += 1; + EXPECT_EQ(vals.at(0), expected_value); + EXPECT_EQ(decoder.Remaining(), value_count - read); + + EXPECT_EQ(decoder.Advance(3), 3); + read += 3; + EXPECT_EQ(decoder.Remaining(), value_count - read); + + vals = {0, 0}; + EXPECT_EQ(decoder.GetBatch(vals.data(), 2), vals.size()); + EXPECT_EQ(vals.at(0), expected_value); + EXPECT_EQ(vals.at(1), expected_value); + read += static_cast<decltype(read)>(vals.size()); + EXPECT_EQ(decoder.Remaining(), value_count - read); + + // Exhaust iteration + EXPECT_EQ(decoder.Advance(value_count - read), value_count - read); + EXPECT_EQ(decoder.Remaining(), 0); + EXPECT_EQ(decoder.Advance(1), 0); + vals = {0, 0}; + EXPECT_EQ(decoder.Get(vals.data()), 0); + EXPECT_EQ(vals.at(0), 0); + + // Reset the decoder + decoder.Reset(run); + EXPECT_EQ(decoder.Remaining(), value_count); + vals = {0, 0}; + EXPECT_EQ(decoder.GetBatch(vals.data(), 2), vals.size()); + EXPECT_EQ(vals.at(0), expected_value); + EXPECT_EQ(vals.at(1), expected_value); +} + +TEST(Rle, RleDecoder) { + TestRleDecoder<uint8_t>({21, 0, 0}, /* value_count= */ 21, /* bit_width= */ 5, + /* expected_value= */ 21); + TestRleDecoder<uint16_t>({1, 0}, /* value_count= */ 13, /* bit_width= */ 1, + /* expected_value= */ 1); + TestRleDecoder<uint32_t>({21, 0, 0}, /* value_count= */ 21, /* bit_width= */ 5, + /* expected_value= */ 21); Review Comment: It's a bit weird to test with `value_count == expected_value`. 
########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -602,20 +1038,126 @@ struct GetBatchSpacedTestCase { int bit_width; }; -TEST(RleDecoder, GetBatchSpaced) { - uint32_t kSeed = 1337; - ::arrow::random::RandomArrayGenerator rand(kSeed); - - std::vector<GetBatchSpacedTestCase<int32_t>> int32_cases{ - {1, 100000, 0.01, 1}, {1, 100000, 0.1, 1}, {1, 100000, 0.5, 1}, - {4, 100000, 0.05, 3}, {100, 100000, 0.05, 7}, +template <typename T> +void DoTestGetBatchSpacedRoundtrip() { + using Data = DataTestRleBitPacked<T>; + using ArrowType = typename Data::ArrowType; + using RandomPart = typename Data::RandomPart; + using NullPart = typename Data::NullPart; + using RepeatPart = typename Data::RepeatPart; + + std::vector<Data> test_cases = { + { + {RandomPart{/* max=*/1, /* size=*/400, /* null_proba= */ 0.1}}, + /* bit_width= */ 1, + }, + { + { + RandomPart{/* max=*/7, /* size=*/10037, /* null_proba= */ 0.0}, + NullPart{/* size= */ 1153}, + RandomPart{/* max=*/7, /* size=*/800, /* null_proba= */ 0.5}, + }, + /* bit_width= */ 3, + }, + { + { + NullPart{/* size= */ 80}, + RandomPart{/* max=*/static_cast<T>(1023), /* size=*/800, + /* null_proba= */ 0.01}, + NullPart{/* size= */ 1023}, + }, + /* bit_width= */ 11, + }, + { + {RepeatPart{/* value=*/13, /* size=*/100000, /* null_proba= */ 0.01}}, Review Comment: We probably want to reduce the sizes a bit to keep the test durations in check (especially in ASAN and Valgrind builds). I don't think it's useful to test more than 1k repeated values, for example. 
########## cpp/src/arrow/util/bit_stream_utils_internal.h: ########## @@ -439,91 +424,84 @@ inline bool BitReader::Advance(int64_t num_bits) { return true; } -inline bool BitWriter::PutVlqInt(uint32_t v) { - bool result = true; - while ((v & 0xFFFFFF80UL) != 0UL) { - result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1); - v >>= 7; - } - result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1); - return result; -} +template <typename Int> +inline bool BitWriter::PutVlqInt(Int v) { + static_assert(std::is_integral_v<Int>); -inline bool BitReader::GetVlqInt(uint32_t* v) { - uint32_t tmp = 0; + constexpr auto kBufferSize = kMaxLEB128ByteLenFor<Int>; - for (int i = 0; i < kMaxVlqByteLength; i++) { - uint8_t byte = 0; - if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) { - return false; - } - tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * i); + uint8_t buffer[kBufferSize] = {}; + const auto bytes_written = WriteLEB128(v, buffer, kBufferSize); + ARROW_DCHECK_LE(bytes_written, kBufferSize); + ARROW_DCHECK_GT(bytes_written, 0); // Cannot fail since we gave max space - if ((byte & 0x80) == 0) { - *v = tmp; - return true; + for (int i = 0; i < bytes_written; ++i) { + const bool success = PutAligned(buffer[i], 1); + if (ARROW_PREDICT_FALSE(!success)) { + return false; } } - return false; -} - -inline bool BitWriter::PutZigZagVlqInt(int32_t v) { - uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v); - u_v = (u_v << 1) ^ static_cast<uint32_t>(v >> 31); - return PutVlqInt(u_v); -} - -inline bool BitReader::GetZigZagVlqInt(int32_t* v) { - uint32_t u; - if (!GetVlqInt(&u)) return false; - u = (u >> 1) ^ (~(u & 1) + 1); - *v = ::arrow::util::SafeCopy<int32_t>(u); return true; } -inline bool BitWriter::PutVlqInt(uint64_t v) { - bool result = true; - while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) { - result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1); - v >>= 7; +template <typename Int> +inline bool BitReader::GetVlqInt(Int* 
v) { + static_assert(std::is_integral_v<Int>); + + // The data that we will pass to the LEB128 parser + // In all case, we read an byte-aligned value, skipping remaining bits Review Comment: ```suggestion // In all case, we read a byte-aligned value, skipping remaining bits ``` ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -553,47 +853,183 @@ TEST(BitRle, Overflow) { } } +/// Check RleBitPacked encoding/decoding round trip. +/// +/// \param spaced If set to false, treat Nulls in the input array as regular data. +/// \param parts The number of parts in which the data will be decoded. +/// For number greater than one, this ensure that the decoder intermediary state +/// is valid. template <typename Type> -void CheckRoundTripSpaced(const Array& data, int bit_width) { +void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t parts, + std::shared_ptr<FloatArray> dict = {}) { using ArrayType = typename TypeTraits<Type>::ArrayType; - using T = typename Type::c_type; + using value_type = typename Type::c_type; - int num_values = static_cast<int>(data.length()); - int buffer_size = RleEncoder::MaxBufferSize(bit_width, num_values); + const int data_size = static_cast<int>(data.length()); + const int data_values_count = + static_cast<int>(data.length() - spaced * data.null_count()); + const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, data_size); + ASSERT_GE(parts, 1); + ASSERT_LE(parts, data_size); - const T* values = static_cast<const ArrayType&>(data).raw_values(); + const value_type* data_values = static_cast<const ArrayType&>(data).raw_values(); + // Encode the data into `buffer` using the encoder. 
std::vector<uint8_t> buffer(buffer_size); - RleEncoder encoder(buffer.data(), buffer_size, bit_width); - for (int i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (!encoder.Put(static_cast<uint64_t>(values[i]))) { - FAIL() << "Encoding failed"; - } + RleBitPackedEncoder encoder(buffer.data(), buffer_size, bit_width); + int32_t encoded_values_size = 0; + for (int i = 0; i < data_size; ++i) { + // Depending on `spaced` we treat nulls as regular values. + if (data.IsValid(i) || !spaced) { + bool success = encoder.Put(static_cast<uint64_t>(data_values[i])); + ASSERT_TRUE(success) << "Encoding failed in pos " << i; + ++encoded_values_size; } } - int encoded_size = encoder.Flush(); + int encoded_byte_size = encoder.Flush(); + ASSERT_EQ(encoded_values_size, data_values_count) + << "All values input were not encoded successfully by the encoder"; + + // Now we verify batch read + RleBitPackedDecoder<value_type> decoder(buffer.data(), encoded_byte_size, bit_width); + // We will only use one of them depending on whether this is a dictonnary tests + std::vector<float> dict_read; + std::vector<value_type> values_read; + if (dict) { + dict_read.resize(data_size); + } else { + values_read.resize(data_size); + } - // Verify batch read - RleDecoder decoder(buffer.data(), encoded_size, bit_width); - std::vector<T> values_read(num_values); + // We will read the data in `parts` calls to make sure intermediate states are valid + int32_t actual_read_count = 0; + int32_t requested_read_count = 0; + while (requested_read_count < data_size) { + const auto remaining = data_size - requested_read_count; + auto to_read = data_size / parts; + if (remaining / to_read == 1) { + to_read = remaining; + } - if (num_values != decoder.GetBatchSpaced( - num_values, static_cast<int>(data.null_count()), - data.null_bitmap_data(), data.offset(), values_read.data())) { - FAIL(); - } + auto read = 0; + if (spaced) { + // We need to slice the input array get the proper null count and bitmap + auto 
data_remaining = data.Slice(requested_read_count, to_read); + + if (dict) { + auto* out = dict_read.data() + requested_read_count; + read = decoder.GetBatchWithDictSpaced( + dict->raw_values(), static_cast<int32_t>(dict->length()), out, to_read, + static_cast<int32_t>(data_remaining->null_count()), + data_remaining->null_bitmap_data(), data_remaining->offset()); + } else { + auto* out = values_read.data() + requested_read_count; + read = decoder.GetBatchSpaced( + to_read, static_cast<int32_t>(data_remaining->null_count()), + data_remaining->null_bitmap_data(), data_remaining->offset(), out); + } + } else { + if (dict) { + auto* out = dict_read.data() + requested_read_count; + read = decoder.GetBatchWithDict( + dict->raw_values(), static_cast<int32_t>(dict->length()), out, to_read); + } else { + auto* out = values_read.data() + requested_read_count; + read = decoder.GetBatch(out, to_read); + } + } + ASSERT_EQ(read, to_read) << "Decoder did not read as many values as requested"; - for (int64_t i = 0; i < num_values; ++i) { - if (data.IsValid(i)) { - if (values_read[i] != values[i]) { - FAIL() << "Index " << i << " read " << values_read[i] << " but should be " - << values[i]; + actual_read_count += read; + requested_read_count += to_read; Review Comment: Since we asserted just above that `read` and `to_read` were equal, is there a reason to keep track of both `actual_read_count` and `requested_read_count`? It's impossible for them to get out of sync. 
########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +85,278 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // +class RleRun { + public: + using byte = uint8_t; + /// Enough space to store a 64bit value + using raw_data_storage = std::array<byte, 8>; + using raw_data_const_pointer = const byte*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr RleRun() noexcept = default; Review Comment: Well, you can have a simple `struct` with attributes *plus* some helper methods. ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +85,278 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // +class RleRun { + public: + using byte = uint8_t; + /// Enough space to store a 64bit value + using raw_data_storage = std::array<byte, 8>; + using raw_data_const_pointer = const byte*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr RleRun() noexcept = default; + constexpr RleRun(RleRun const&) noexcept = default; + constexpr RleRun(RleRun&&) noexcept = default; + + explicit RleRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr RleRun& operator=(RleRun const&) noexcept = default; + constexpr RleRun& operator=(RleRun&&) noexcept = default; + + /// The number of repeated values in this run. + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + /// A pointer to the repeated value raw bytes. 
+ [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + /// The number of bytes used for the raw repeated value. + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The repeated value raw bytes stored inside the class + raw_data_storage data_ = {}; + /// The number of time the value is repeated + values_count_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +class BitPackedRun { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// According to the Parquet thrift definition the page size can be written into an + /// int32_t. + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr BitPackedRun() noexcept = default; + constexpr BitPackedRun(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun(BitPackedRun&&) noexcept = default; + + constexpr BitPackedRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr BitPackedRun& operator=(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun& operator=(BitPackedRun&&) noexcept = default; + + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Number of values in this run. 
+ raw_data_size_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``. +class RleBitPackedParser { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// By Parquet thrift definition the page size can be written into an int32_t. + using raw_data_size_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + /// The different types of runs emitted by the parser + using dynamic_run_type = std::variant<RleRun, BitPackedRun>; + + constexpr RleBitPackedParser() noexcept = default; + + constexpr RleBitPackedParser(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept; + + constexpr void Reset(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width_) noexcept; + + /// Get the current run with a small parsing cost without advancing the iteration. + [[nodiscard]] std::optional<dynamic_run_type> Peek() const; + + /// Move to the next run. + [[nodiscard]] bool Advance(); + + /// Advance and return the current run. + [[nodiscard]] std::optional<dynamic_run_type> Next(); + + /// Whether there is still runs to iterate over. + /// + /// WARN: Due to lack of proper error handling, iteration with Next and Peek could + /// return not data while the parser is not exhausted. + /// This is how one can check for errors. + [[nodiscard]] bool Exhausted() const; + + /// Enum to return from an ``Parse`` handler. + /// + /// Since a callback has no way to know when to stop, the handler must return + /// a value indicating to the ``Parse`` function whether to stop or continue. + enum class ControlFlow { + Continue, + Break, + }; + + /// A callback approach to parsing. + /// + /// This approach is used to reduce the number of dynamic lookups involved with using a + /// variant. 
+ /// + /// The handler must be of the form + /// ```cpp` + /// struct Handler { + /// ControlFlow OnBitPackedRun(BitPackedRun run); + /// + /// ControlFlow OnRleRun(RleRun run); + /// }; + /// ``` + template <typename Handler> + void Parse(Handler&& handler); + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Size in bytes of the run. + raw_data_size_type data_size_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; + + /// Run the handler on the run read and return the number of values read. + /// Does not advance the parser. + template <typename Handler> + std::pair<raw_data_size_type, ControlFlow> PeekImpl(Handler&&) const; +}; + /// Decoder class for RLE encoded data. +template <typename T> class RleDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - ARROW_DCHECK_GE(bit_width_, 0); - ARROW_DCHECK_LE(bit_width_, 64); - } + /// The type in which the data should be decoded. + using value_type = T; + /// The type of run that can be decoded. + using run_type = RleRun; + using values_count_type = run_type::values_count_type; Review Comment: > The thing I think that remain very useful in reading in the future is answering questions like "_why does this only has 32 bits?_", "_Why is it signed?_"... Agreed. Adding explanatory comments can be part of the answer here. > We could keep a few namespace level alias like `using RleBitPackedSize = int32_t;` along with their documentation. What do you think? Why not. 
########## cpp/src/arrow/util/bit_stream_utils_internal.h: ########## @@ -439,91 +424,84 @@ inline bool BitReader::Advance(int64_t num_bits) { return true; } -inline bool BitWriter::PutVlqInt(uint32_t v) { - bool result = true; - while ((v & 0xFFFFFF80UL) != 0UL) { - result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1); - v >>= 7; - } - result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1); - return result; -} +template <typename Int> +inline bool BitWriter::PutVlqInt(Int v) { + static_assert(std::is_integral_v<Int>); -inline bool BitReader::GetVlqInt(uint32_t* v) { - uint32_t tmp = 0; + constexpr auto kBufferSize = kMaxLEB128ByteLenFor<Int>; - for (int i = 0; i < kMaxVlqByteLength; i++) { - uint8_t byte = 0; - if (ARROW_PREDICT_FALSE(!GetAligned<uint8_t>(1, &byte))) { - return false; - } - tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * i); + uint8_t buffer[kBufferSize] = {}; + const auto bytes_written = WriteLEB128(v, buffer, kBufferSize); + ARROW_DCHECK_LE(bytes_written, kBufferSize); + ARROW_DCHECK_GT(bytes_written, 0); // Cannot fail since we gave max space - if ((byte & 0x80) == 0) { - *v = tmp; - return true; + for (int i = 0; i < bytes_written; ++i) { + const bool success = PutAligned(buffer[i], 1); + if (ARROW_PREDICT_FALSE(!success)) { + return false; } } - return false; -} - -inline bool BitWriter::PutZigZagVlqInt(int32_t v) { - uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v); - u_v = (u_v << 1) ^ static_cast<uint32_t>(v >> 31); - return PutVlqInt(u_v); -} - -inline bool BitReader::GetZigZagVlqInt(int32_t* v) { - uint32_t u; - if (!GetVlqInt(&u)) return false; - u = (u >> 1) ^ (~(u & 1) + 1); - *v = ::arrow::util::SafeCopy<int32_t>(u); return true; } -inline bool BitWriter::PutVlqInt(uint64_t v) { - bool result = true; - while ((v & 0xFFFFFFFFFFFFFF80ULL) != 0ULL) { - result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1); - v >>= 7; +template <typename Int> +inline bool BitReader::GetVlqInt(Int* 
v) { + static_assert(std::is_integral_v<Int>); Review Comment: Can we also assert that it's unsigned? ########## cpp/src/arrow/util/bit_util_test.cc: ########## @@ -1997,11 +1997,191 @@ TEST(BitUtil, RoundUpToPowerOf2) { #undef U64 #undef S64 +/// Test the maximum number of bytes needed to write a LEB128 of a give size. +TEST(LEB128, MaxLEB128ByteLenFor) { + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int16_t>, 3); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int32_t>, 5); + EXPECT_EQ(bit_util::kMaxLEB128ByteLenFor<int64_t>, 10); +} + +/// Utility function to test LEB128 encoding with known input value and expected byte +/// array +template <typename Int> +void TestLEB128Encode(Int input_value, const std::vector<uint8_t>& expected_data, + std::size_t buffer_size) { + std::vector<uint8_t> buffer(buffer_size); + auto bytes_written = bit_util::WriteLEB128(input_value, buffer.data(), + static_cast<int32_t>(buffer.size())); + + EXPECT_EQ(bytes_written, expected_data.size()); + // Encoded data + for (std::size_t i = 0; i < expected_data.size(); ++i) { + EXPECT_EQ(buffer.at(i), expected_data.at(i)); + } + + // When the value is successfully encoded, the remaining of the buffer is untouched + if (bytes_written > 0) { + for (std::size_t i = bytes_written; i < buffer.size(); ++i) { + EXPECT_EQ(buffer.at(i), 0); + } + } +} + +/// Test encoding to known LEB128 byte sequences with edge cases parameters. +/// \see LEB128.KnownSuccessfulValues for other known values tested. 
+TEST(LEB128, WriteEdgeCases) { + // Single byte value 0 + TestLEB128Encode(0U, {0x00}, 1); + // Single byte value 127 + TestLEB128Encode(127U, {0x7F}, 1); + // Three byte value 16384, encoded in larger buffer + TestLEB128Encode(16384U, {0x80, 0x80, 0x01}, 10); + // Two byte boundary values + TestLEB128Encode(129U, {0x81, 0x01}, 2); + TestLEB128Encode(16383U, {0xFF, 0x7F}, 2); + // Error case: Buffer too small for value 128 (needs 2 bytes but only 1 provided) + TestLEB128Encode(128U, {}, 1); + // Error case: Buffer too small for uint32_t max (needs 5 bytes but only 4 provided) + TestLEB128Encode(4294967295U, {}, 4); Review Comment: Can we test a couple large 64-bit ints? ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +85,278 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // +class RleRun { + public: + using byte = uint8_t; + /// Enough space to store a 64bit value + using raw_data_storage = std::array<byte, 8>; + using raw_data_const_pointer = const byte*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr RleRun() noexcept = default; + constexpr RleRun(RleRun const&) noexcept = default; + constexpr RleRun(RleRun&&) noexcept = default; + + explicit RleRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr RleRun& operator=(RleRun const&) noexcept = default; + constexpr RleRun& operator=(RleRun&&) noexcept = default; + + /// The number of repeated values in this run. + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + /// A pointer to the repeated value raw bytes. 
+ [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + /// The number of bytes used for the raw repeated value. + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The repeated value raw bytes stored inside the class + raw_data_storage data_ = {}; + /// The number of time the value is repeated + values_count_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +class BitPackedRun { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// According to the Parquet thrift definition the page size can be written into an + /// int32_t. + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr BitPackedRun() noexcept = default; + constexpr BitPackedRun(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun(BitPackedRun&&) noexcept = default; + + constexpr BitPackedRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr BitPackedRun& operator=(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun& operator=(BitPackedRun&&) noexcept = default; + + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Number of values in this run. 
+ raw_data_size_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``. +class RleBitPackedParser { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// By Parquet thrift definition the page size can be written into an int32_t. + using raw_data_size_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + /// The different types of runs emitted by the parser + using dynamic_run_type = std::variant<RleRun, BitPackedRun>; + + constexpr RleBitPackedParser() noexcept = default; + + constexpr RleBitPackedParser(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept; + + constexpr void Reset(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width_) noexcept; + + /// Get the current run with a small parsing cost without advancing the iteration. + [[nodiscard]] std::optional<dynamic_run_type> Peek() const; + + /// Move to the next run. + [[nodiscard]] bool Advance(); + + /// Advance and return the current run. + [[nodiscard]] std::optional<dynamic_run_type> Next(); + + /// Whether there is still runs to iterate over. + /// + /// WARN: Due to lack of proper error handling, iteration with Next and Peek could + /// return not data while the parser is not exhausted. + /// This is how one can check for errors. + [[nodiscard]] bool Exhausted() const; + + /// Enum to return from an ``Parse`` handler. + /// + /// Since a callback has no way to know when to stop, the handler must return + /// a value indicating to the ``Parse`` function whether to stop or continue. + enum class ControlFlow { + Continue, + Break, + }; + + /// A callback approach to parsing. + /// + /// This approach is used to reduce the number of dynamic lookups involved with using a + /// variant. 
+ /// + /// The handler must be of the form + /// ```cpp` + /// struct Handler { + /// ControlFlow OnBitPackedRun(BitPackedRun run); + /// + /// ControlFlow OnRleRun(RleRun run); + /// }; + /// ``` + template <typename Handler> + void Parse(Handler&& handler); + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Size in bytes of the run. + raw_data_size_type data_size_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; + + /// Run the handler on the run read and return the number of values read. + /// Does not advance the parser. + template <typename Handler> + std::pair<raw_data_size_type, ControlFlow> PeekImpl(Handler&&) const; +}; + /// Decoder class for RLE encoded data. +template <typename T> class RleDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - ARROW_DCHECK_GE(bit_width_, 0); - ARROW_DCHECK_LE(bit_width_, 64); - } + /// The type in which the data should be decoded. + using value_type = T; + /// The type of run that can be decoded. + using run_type = RleRun; + using values_count_type = run_type::values_count_type; - RleDecoder() : bit_width_(-1) {} + constexpr RleDecoder() noexcept = default; - void Reset(const uint8_t* buffer, int buffer_len, int bit_width) { - ARROW_DCHECK_GE(bit_width, 0); - ARROW_DCHECK_LE(bit_width, 64); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; - } + explicit RleDecoder(run_type const& run) noexcept; + + void Reset(run_type const& run) noexcept; + + /// Return the number of values that can be advanced. 
+ [[nodiscard]] values_count_type Remaining() const; + + /// Return the repeated value of this decoder. + [[nodiscard]] constexpr value_type Value() const; + + /// Try to advance by as many values as provided. + /// Return the number of values skipped. + [[nodiscard]] values_count_type Advance(values_count_type batch_size); + + /// Get the next value and return false if there are no more. + [[nodiscard]] constexpr bool Get(value_type* out_value); + + /// Get a batch of values return the number of decoded elements. + [[nodiscard]] values_count_type GetBatch(value_type* out, values_count_type batch_size); + + private: + value_type value_ = {}; + values_count_type remaining_count_ = 0; + + static_assert(std::is_integral_v<value_type>, + "This class makes assumptions about integer endianness and padding"); +}; + +/// Decoder class for Bit packing encoded data. +template <typename T> +class BitPackedDecoder { + public: + /// The type in which the data should be decoded. + using value_type = T; + /// The type of run that can be decoded. + using run_type = BitPackedRun; + using values_count_type = run_type::values_count_type; + using bit_size_type = run_type::bit_size_type; + + BitPackedDecoder() noexcept = default; + + explicit BitPackedDecoder(run_type const& run) noexcept; + + void Reset(run_type const& run) noexcept; + + /// Return the number of values that can be advanced. + [[nodiscard]] constexpr values_count_type Remaining() const; + + /// Return the size in bit in which each encoded value is written. + [[nodiscard]] constexpr bit_size_type ValueBitWidth() const; + + /// Try to advance by as many values as provided. + /// Return the number of values skipped. + [[nodiscard]] values_count_type Advance(values_count_type batch_size); + + /// Get the next value and return false if there are no more. + [[nodiscard]] bool Get(value_type* out_value); + + /// Get a batch of values return the number of decoded elements. 
+ [[nodiscard]] values_count_type GetBatch(value_type* out, values_count_type batch_size); + + private: + ::arrow::bit_util::BitReader bit_reader_ = {}; + bit_size_type value_bit_width_ = 0; + values_count_type remaining_count_ = 0; + + static_assert(std::is_integral_v<value_type>, + "This class makes assumptions about integer endianness and padding"); +}; + +/// Decoder class for RLE encoded data. +template <typename T> +class RleBitPackedDecoder { + public: + /// The type in which the data should be decoded. + using value_type = T; + using byte = RleBitPackedParser::byte; + using raw_data_const_pointer = RleBitPackedParser::raw_data_const_pointer; + using raw_data_size_type = RleBitPackedParser::raw_data_size_type; + using bit_size_type = RleBitPackedParser::bit_size_type; + using dynamic_run_type = RleBitPackedParser::dynamic_run_type; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + + RleBitPackedDecoder() noexcept = default; + + /// Create a decoder object. + /// + /// data and data_size are the raw bytes to decode. + /// value_bit_width is the size in bits of each encoded value. + RleBitPackedDecoder(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept; + + void Reset(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width_) noexcept; + + /// Whether there is still runs to iterate over. + /// + /// WARN: Due to lack of proper error handling, iteration with Get methods could return + /// no data while the parser is not exhausted. + /// This is how one can check for errors. 
+ [[nodiscard]] bool Exhausted() const; Review Comment: Just a ping on this :) ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +85,278 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // +class RleRun { + public: + using byte = uint8_t; + /// Enough space to store a 64bit value + using raw_data_storage = std::array<byte, 8>; + using raw_data_const_pointer = const byte*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr RleRun() noexcept = default; + constexpr RleRun(RleRun const&) noexcept = default; + constexpr RleRun(RleRun&&) noexcept = default; + + explicit RleRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr RleRun& operator=(RleRun const&) noexcept = default; + constexpr RleRun& operator=(RleRun&&) noexcept = default; + + /// The number of repeated values in this run. + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + /// A pointer to the repeated value raw bytes. + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + /// The number of bytes used for the raw repeated value. 
+ [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The repeated value raw bytes stored inside the class + raw_data_storage data_ = {}; + /// The number of time the value is repeated + values_count_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +class BitPackedRun { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// According to the Parquet thrift definition the page size can be written into an + /// int32_t. + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr BitPackedRun() noexcept = default; + constexpr BitPackedRun(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun(BitPackedRun&&) noexcept = default; + + constexpr BitPackedRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr BitPackedRun& operator=(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun& operator=(BitPackedRun&&) noexcept = default; + + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Number of values in this run. + raw_data_size_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``. 
+class RleBitPackedParser { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// By Parquet thrift definition the page size can be written into an int32_t. + using raw_data_size_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + /// The different types of runs emitted by the parser + using dynamic_run_type = std::variant<RleRun, BitPackedRun>; + + constexpr RleBitPackedParser() noexcept = default; + + constexpr RleBitPackedParser(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept; + + constexpr void Reset(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width_) noexcept; + + /// Get the current run with a small parsing cost without advancing the iteration. + [[nodiscard]] std::optional<dynamic_run_type> Peek() const; + + /// Move to the next run. + [[nodiscard]] bool Advance(); + + /// Advance and return the current run. + [[nodiscard]] std::optional<dynamic_run_type> Next(); + + /// Whether there is still runs to iterate over. + /// + /// WARN: Due to lack of proper error handling, iteration with Next and Peek could + /// return not data while the parser is not exhausted. + /// This is how one can check for errors. + [[nodiscard]] bool Exhausted() const; + + /// Enum to return from an ``Parse`` handler. + /// + /// Since a callback has no way to know when to stop, the handler must return + /// a value indicating to the ``Parse`` function whether to stop or continue. + enum class ControlFlow { + Continue, + Break, + }; + + /// A callback approach to parsing. + /// + /// This approach is used to reduce the number of dynamic lookups involved with using a + /// variant. 
+ /// + /// The handler must be of the form + /// ```cpp` + /// struct Handler { + /// ControlFlow OnBitPackedRun(BitPackedRun run); + /// + /// ControlFlow OnRleRun(RleRun run); + /// }; + /// ``` + template <typename Handler> + void Parse(Handler&& handler); + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Size in bytes of the run. + raw_data_size_type data_size_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; + + /// Run the handler on the run read and return the number of values read. + /// Does not advance the parser. + template <typename Handler> + std::pair<raw_data_size_type, ControlFlow> PeekImpl(Handler&&) const; +}; + /// Decoder class for RLE encoded data. +template <typename T> class RleDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - ARROW_DCHECK_GE(bit_width_, 0); - ARROW_DCHECK_LE(bit_width_, 64); - } + /// The type in which the data should be decoded. + using value_type = T; + /// The type of run that can be decoded. + using run_type = RleRun; + using values_count_type = run_type::values_count_type; - RleDecoder() : bit_width_(-1) {} + constexpr RleDecoder() noexcept = default; - void Reset(const uint8_t* buffer, int buffer_len, int bit_width) { - ARROW_DCHECK_GE(bit_width, 0); - ARROW_DCHECK_LE(bit_width, 64); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; - } + explicit RleDecoder(run_type const& run) noexcept; + + void Reset(run_type const& run) noexcept; + + /// Return the number of values that can be advanced. 
+ [[nodiscard]] values_count_type Remaining() const; Review Comment: Note that if errors are signaled by a special value (e.g. zero), the docstring should make it clear. ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -84,32 +85,278 @@ namespace util { /// (total 26 bytes, 1 byte overhead) // +class RleRun { + public: + using byte = uint8_t; + /// Enough space to store a 64bit value + using raw_data_storage = std::array<byte, 8>; + using raw_data_const_pointer = const byte*; + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr RleRun() noexcept = default; + constexpr RleRun(RleRun const&) noexcept = default; + constexpr RleRun(RleRun&&) noexcept = default; + + explicit RleRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr RleRun& operator=(RleRun const&) noexcept = default; + constexpr RleRun& operator=(RleRun&&) noexcept = default; + + /// The number of repeated values in this run. + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + /// A pointer to the repeated value raw bytes. + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + /// The number of bytes used for the raw repeated value. 
+ [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The repeated value raw bytes stored inside the class + raw_data_storage data_ = {}; + /// The number of time the value is repeated + values_count_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +class BitPackedRun { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// According to the Parquet thrift definition the page size can be written into an + /// int32_t. + using raw_data_size_type = int32_t; + /// The type of the size of either run, between 1 and 2^31-1 as per Parquet spec + using values_count_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + + constexpr BitPackedRun() noexcept = default; + constexpr BitPackedRun(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun(BitPackedRun&&) noexcept = default; + + constexpr BitPackedRun(raw_data_const_pointer data, values_count_type values_count, + bit_size_type value_bit_width) noexcept; + + constexpr BitPackedRun& operator=(BitPackedRun const&) noexcept = default; + constexpr BitPackedRun& operator=(BitPackedRun&&) noexcept = default; + + [[nodiscard]] constexpr values_count_type ValuesCount() const noexcept; + + /// The size in bits of each encoded value. + [[nodiscard]] constexpr bit_size_type ValuesBitWidth() const noexcept; + + [[nodiscard]] constexpr raw_data_const_pointer RawDataPtr() const noexcept; + + [[nodiscard]] constexpr raw_data_size_type RawDataSize() const noexcept; + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Number of values in this run. + raw_data_size_type values_count_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; +}; + +/// A parser that emits either a ``BitPackedRun`` or a ``RleRun``. 
+class RleBitPackedParser { + public: + using byte = uint8_t; + using raw_data_const_pointer = const byte*; + /// By Parquet thrift definition the page size can be written into an int32_t. + using raw_data_size_type = int32_t; + /// The type to represent a size in bits + using bit_size_type = int32_t; + /// The different types of runs emitted by the parser + using dynamic_run_type = std::variant<RleRun, BitPackedRun>; + + constexpr RleBitPackedParser() noexcept = default; + + constexpr RleBitPackedParser(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept; + + constexpr void Reset(raw_data_const_pointer data, raw_data_size_type data_size, + bit_size_type value_bit_width_) noexcept; + + /// Get the current run with a small parsing cost without advancing the iteration. + [[nodiscard]] std::optional<dynamic_run_type> Peek() const; + + /// Move to the next run. + [[nodiscard]] bool Advance(); + + /// Advance and return the current run. + [[nodiscard]] std::optional<dynamic_run_type> Next(); + + /// Whether there is still runs to iterate over. + /// + /// WARN: Due to lack of proper error handling, iteration with Next and Peek could + /// return not data while the parser is not exhausted. + /// This is how one can check for errors. + [[nodiscard]] bool Exhausted() const; + + /// Enum to return from an ``Parse`` handler. + /// + /// Since a callback has no way to know when to stop, the handler must return + /// a value indicating to the ``Parse`` function whether to stop or continue. + enum class ControlFlow { + Continue, + Break, + }; + + /// A callback approach to parsing. + /// + /// This approach is used to reduce the number of dynamic lookups involved with using a + /// variant. 
+ /// + /// The handler must be of the form + /// ```cpp` + /// struct Handler { + /// ControlFlow OnBitPackedRun(BitPackedRun run); + /// + /// ControlFlow OnRleRun(RleRun run); + /// }; + /// ``` + template <typename Handler> + void Parse(Handler&& handler); + + private: + /// The pointer to the beginning of the run + raw_data_const_pointer data_ = nullptr; + /// Size in bytes of the run. + raw_data_size_type data_size_ = 0; + /// The size in bit of a packed value in the run + bit_size_type value_bit_width_ = 0; + + /// Run the handler on the run read and return the number of values read. + /// Does not advance the parser. + template <typename Handler> + std::pair<raw_data_size_type, ControlFlow> PeekImpl(Handler&&) const; +}; + /// Decoder class for RLE encoded data. +template <typename T> class RleDecoder { public: - /// Create a decoder object. buffer/buffer_len is the decoded data. - /// bit_width is the width of each value (before encoding). - RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) - : bit_reader_(buffer, buffer_len), - bit_width_(bit_width), - current_value_(0), - repeat_count_(0), - literal_count_(0) { - ARROW_DCHECK_GE(bit_width_, 0); - ARROW_DCHECK_LE(bit_width_, 64); - } + /// The type in which the data should be decoded. + using value_type = T; + /// The type of run that can be decoded. + using run_type = RleRun; + using values_count_type = run_type::values_count_type; - RleDecoder() : bit_width_(-1) {} + constexpr RleDecoder() noexcept = default; - void Reset(const uint8_t* buffer, int buffer_len, int bit_width) { - ARROW_DCHECK_GE(bit_width, 0); - ARROW_DCHECK_LE(bit_width, 64); - bit_reader_.Reset(buffer, buffer_len); - bit_width_ = bit_width; - current_value_ = 0; - repeat_count_ = 0; - literal_count_ = 0; - } + explicit RleDecoder(run_type const& run) noexcept; + + void Reset(run_type const& run) noexcept; + + /// Return the number of values that can be advanced. 
+ [[nodiscard]] values_count_type Remaining() const; Review Comment: > For all functions reading data, I'd consider it dangerous not to read the output (e.g. number of bytes read) since it carries error information (e.g. when it is zero). What do you think? Ah! Well, in that case we can keep it indeed. But for simple accessors it seems superfluous. ########## cpp/src/arrow/util/rle_encoding_internal.h: ########## @@ -299,385 +552,988 @@ class RleEncoder { uint8_t* literal_indicator_byte_; }; +/************************* + * RleBitPackedDecoder * + *************************/ + +template <typename T> +RleBitPackedDecoder<T>::RleBitPackedDecoder(raw_data_const_pointer data, + raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept { + Reset(data, data_size, value_bit_width); +} + +template <typename T> +void RleBitPackedDecoder<T>::Reset(raw_data_const_pointer data, + raw_data_size_type data_size, + bit_size_type value_bit_width) noexcept { + ARROW_DCHECK_GE(value_bit_width, 0); + ARROW_DCHECK_LE(value_bit_width, 64); + parser_.Reset(data, data_size, value_bit_width); + decoder_ = {}; +} + +template <typename T> +auto RleBitPackedDecoder<T>::RunRemaining() const -> values_count_type { + return std::visit([](auto const& dec) { return dec.Remaining(); }, decoder_); +} + +template <typename T> +bool RleBitPackedDecoder<T>::Exhausted() const { + return (RunRemaining() == 0) && parser_.Exhausted(); +} + template <typename T> -inline bool RleDecoder::Get(T* val) { +bool RleBitPackedDecoder<T>::ParseAndResetDecoder() { + auto dyn_run = parser_.Next(); + if (!dyn_run.has_value()) { + return false; + } + + if (auto* rle_run = std::get_if<BitPackedRun>(dyn_run.operator->())) { + decoder_ = {BitPackedDecoder<value_type>(*rle_run)}; + return true; + } + + auto* bit_packed_run = std::get_if<RleRun>(dyn_run.operator->()); + ARROW_DCHECK(bit_packed_run); // Only two possibilities in the variant + decoder_ = {RleDecoder<value_type>(*bit_packed_run)}; + return true; +} 
+ +template <typename T> +auto RleBitPackedDecoder<T>::RunGetBatch(value_type* out, values_count_type batch_size) + -> values_count_type { + return std::visit([&](auto& dec) { return dec.GetBatch(out, batch_size); }, decoder_); +} + +template <typename T> +bool RleBitPackedDecoder<T>::Get(value_type* val) { return GetBatch(val, 1) == 1; } +namespace internal { + +/// A ``Parse`` handler that calls a single lambda. +/// +/// This lambda would typically take the input run as ``auto run`` (i.e. the lambda is +/// templated) and deduce other types from it. +template <typename Lambda> +struct LambdaHandler { + Lambda handlder_; + + auto OnBitPackedRun(BitPackedRun run) { return handlder_(std::move(run)); } + + auto OnRleRun(RleRun run) { return handlder_(std::move(run)); } +}; + +template <typename Lambda> +LambdaHandler(Lambda) -> LambdaHandler<Lambda>; + +template <typename value_type, typename Run> +struct decoder_for; + +template <typename value_type> +struct decoder_for<value_type, BitPackedRun> { + using type = BitPackedDecoder<value_type>; +}; + +template <typename value_type> +struct decoder_for<value_type, RleRun> { + using type = RleDecoder<value_type>; +}; + +template <typename value_type, typename Run> +using decoder_for_t = typename decoder_for<value_type, Run>::type; + +} // namespace internal + template <typename T> -inline int RleDecoder::GetBatch(T* values, int batch_size) { - ARROW_DCHECK_GE(bit_width_, 0); - int values_read = 0; - - auto* out = values; - - while (values_read < batch_size) { - int remaining = batch_size - values_read; - - if (repeat_count_ > 0) { // Repeated value case. 
- int repeat_batch = std::min(remaining, repeat_count_); - std::fill(out, out + repeat_batch, static_cast<T>(current_value_)); - - repeat_count_ -= repeat_batch; - values_read += repeat_batch; - out += repeat_batch; - } else if (literal_count_ > 0) { - int literal_batch = std::min(remaining, literal_count_); - int actual_read = bit_reader_.GetBatch(bit_width_, out, literal_batch); - if (actual_read != literal_batch) { - return values_read; - } +auto RleBitPackedDecoder<T>::GetBatch(value_type* out, values_count_type batch_size) + -> values_count_type { + using ControlFlow = RleBitPackedParser::ControlFlow; - literal_count_ -= literal_batch; - values_read += literal_batch; - out += literal_batch; - } else { - if (!NextCounts<T>()) return values_read; + values_count_type values_read = 0; + + // Remaining from a previous call that would have left some unread data from a run. + if (ARROW_PREDICT_FALSE(RunRemaining() > 0)) { + auto const read = RunGetBatch(out, batch_size); + values_read += read; + out += read; + + // Either we fulfilled all the batch to be read or we finished remaining run. 
+ if (ARROW_PREDICT_FALSE(values_read == batch_size)) { + return values_read; } + ARROW_DCHECK(RunRemaining() == 0); } + auto handler = internal::LambdaHandler{ + [&](auto run) { + ARROW_DCHECK_LT(values_read, batch_size); + internal::decoder_for_t<value_type, decltype(run)> decoder(run); + auto const read = decoder.GetBatch(out, batch_size - values_read); + ARROW_DCHECK_LE(read, batch_size - values_read); + values_read += read; + out += read; + + // Stop reading and store remaining decoder + if (ARROW_PREDICT_FALSE(values_read == batch_size || read == 0)) { + decoder_ = std::move(decoder); + return ControlFlow::Break; + } + + return ControlFlow::Continue; + }, + }; + + parser_.Parse(handler); + return values_read; } -template <typename T, typename RunType, typename Converter> -inline int RleDecoder::GetSpaced(Converter converter, int batch_size, int null_count, - const uint8_t* valid_bits, int64_t valid_bits_offset, - T* out) { - if (ARROW_PREDICT_FALSE(null_count == batch_size)) { - converter.FillZero(out, out + batch_size); - return batch_size; +namespace internal { + +/// Utility class to safely handle values and null count without too error-prone +/// verbosity. +class BatchCounter { + public: + using size_type = int32_t; + + [[nodiscard]] static constexpr BatchCounter FromBatchSizeAndNulls( + size_type batch_size, size_type null_count) { + ARROW_DCHECK_LE(null_count, batch_size); + return {batch_size - null_count, null_count}; } - ARROW_DCHECK_GE(bit_width_, 0); - int values_read = 0; - int values_remaining = batch_size - null_count; + constexpr BatchCounter(size_type values_count, size_type null_count) noexcept + : values_count_(values_count), null_count_(null_count) {} - // Assume no bits to start. 
- arrow::internal::BitRunReader bit_reader(valid_bits, valid_bits_offset, - /*length=*/batch_size); - arrow::internal::BitRun valid_run = bit_reader.NextRun(); - while (values_read < batch_size) { - if (ARROW_PREDICT_FALSE(valid_run.length == 0)) { - valid_run = bit_reader.NextRun(); + [[nodiscard]] constexpr size_type ValuesCount() const noexcept { return values_count_; } + + [[nodiscard]] constexpr size_type ValuesRead() const noexcept { return values_read_; } + + [[nodiscard]] constexpr size_type ValuesRemaining() const noexcept { + ARROW_DCHECK_LE(values_read_, values_count_); + return values_count_ - values_read_; + } + + constexpr void AccrueReadValues(size_type to_read) noexcept { + ARROW_DCHECK_LE(to_read, ValuesRemaining()); + values_read_ += to_read; + } + + [[nodiscard]] constexpr size_type NullCount() const noexcept { return null_count_; } + + [[nodiscard]] constexpr size_type NullRead() const noexcept { return null_read_; } + + [[nodiscard]] constexpr size_type NullRemaining() const noexcept { + ARROW_DCHECK_LE(null_read_, null_count_); + return null_count_ - null_read_; + } + + constexpr void AccrueReadNulls(size_type to_read) noexcept { + ARROW_DCHECK_LE(to_read, NullRemaining()); + null_read_ += to_read; + } + + [[nodiscard]] constexpr size_type TotalRemaining() const noexcept { + return ValuesRemaining() + NullRemaining(); + } + + [[nodiscard]] constexpr size_type TotalRead() const noexcept { + return values_read_ + null_read_; + } + + [[nodiscard]] constexpr bool IsFullyNull() const noexcept { + return ValuesRemaining() == 0; + } + + [[nodiscard]] constexpr bool IsDone() const noexcept { return TotalRemaining() == 0; } + + private: + size_type values_count_ = 0; + size_type values_read_ = 0; + size_type null_count_ = 0; + size_type null_read_ = 0; +}; + +// The maximal unsigned size that a variable can fit. 
+template <typename T> +constexpr auto max_size_for_v = + static_cast<std::make_unsigned_t<T>>(std::numeric_limits<T>::max()); + +/// Overload for GetSpaced for a single run in a RleDecoder +template <typename Converter, typename BitRunReader, typename BitRun, + typename values_count_type, typename value_type> +auto RunGetSpaced(Converter& converter, typename Converter::out_type* out, + values_count_type batch_size, values_count_type null_count, + BitRunReader&& validity_reader, BitRun&& validity_run, + RleDecoder<value_type>& decoder) + -> std::pair<values_count_type, values_count_type> { + ARROW_DCHECK_GT(batch_size, 0); + // The equality case is handled in the main loop in GetSpaced + ARROW_DCHECK_LT(null_count, batch_size); + + auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count); + + values_count_type const values_available = decoder.Remaining(); + ARROW_DCHECK_GT(values_available, 0); + auto values_remaining_run = [&]() { + auto out = values_available - batch.ValuesRead(); + ARROW_DCHECK_GE(out, 0); + return out; + }; + + // Consume as much as possible from the repeated run. + // We only need to count the number of nulls and non-nulls because we can fill in the + // same value for nulls and non-nulls. + // This proves to be a big efficiency win. 
+ while (values_remaining_run() > 0 && !batch.IsDone()) { + ARROW_DCHECK_GE(validity_run.length, 0); + ARROW_DCHECK_LT(validity_run.length, max_size_for_v<values_count_type>); + ARROW_DCHECK_LE(validity_run.length, batch.TotalRemaining()); + auto const& validity_run_size = static_cast<values_count_type>(validity_run.length); + + if (validity_run.set) { + // We may end the current RLE run in the middle of the validity run + auto update_size = std::min(validity_run_size, values_remaining_run()); + batch.AccrueReadValues(update_size); + validity_run.length -= update_size; + } else { + // We can consume all nulls here because it does not matter if we consume on this + // RLE run, or on a next encoded run. The value filled does not matter. + auto update_size = std::min(validity_run_size, batch.NullRemaining()); + batch.AccrueReadNulls(update_size); + validity_run.length -= update_size; + } + + if (ARROW_PREDICT_TRUE(validity_run.length == 0)) { + validity_run = validity_reader.NextRun(); } + } + + value_type const value = decoder.Value(); + if (ARROW_PREDICT_FALSE(!converter.InputIsValid(value))) { + return {0, 0}; + } + converter.WriteRepeated(out, out + batch.TotalRead(), value); + auto const actual_values_read = decoder.Advance(batch.ValuesRead()); + // We always cropped the number of values_read by the remaining values in the run. + // What's more, the RLE decoder should not encounter any errors. + ARROW_DCHECK_EQ(actual_values_read, batch.ValuesRead()); - ARROW_DCHECK_GT(batch_size, 0); - ARROW_DCHECK_GT(valid_run.length, 0); + return {batch.ValuesRead(), batch.NullRead()}; +} + +template <typename T, typename... Ts> +[[nodiscard]] constexpr T min(T x, Ts...
ys) { + ((x = std::min(x, ys)), ...); + return x; +} + +static_assert(min(5) == 5); +static_assert(min(5, 4, -1) == -1); +static_assert(min(5, 41) == 5); + +template <typename Converter, typename BitRunReader, typename BitRun, + typename values_count_type, typename value_type> +auto RunGetSpaced(Converter& converter, typename Converter::out_type* out, + values_count_type batch_size, values_count_type null_count, + BitRunReader&& validity_reader, BitRun&& validity_run, + BitPackedDecoder<value_type>& decoder) + -> std::pair<values_count_type, values_count_type> { + ARROW_DCHECK_GT(batch_size, 0); + // The equality case is handled in the main loop in GetSpaced + ARROW_DCHECK_LT(null_count, batch_size); + + auto batch = BatchCounter::FromBatchSizeAndNulls(batch_size, null_count); + + values_count_type const values_available = decoder.Remaining(); + ARROW_DCHECK_GT(values_available, 0); + auto run_values_remaining = [&]() { + auto out = values_available - batch.ValuesRead(); + ARROW_DCHECK_GE(out, 0); + return out; + }; + + while (run_values_remaining() > 0 && batch.ValuesRemaining() > 0) { + // TODO should this size be tuned depending on sizeof(value_size)? cpu cache size? Review Comment: Do you plan to investigate this later? Perhaps open an issue?
########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -602,20 +1038,126 @@ struct GetBatchSpacedTestCase { int bit_width; }; -TEST(RleDecoder, GetBatchSpaced) { - uint32_t kSeed = 1337; - ::arrow::random::RandomArrayGenerator rand(kSeed); - - std::vector<GetBatchSpacedTestCase<int32_t>> int32_cases{ - {1, 100000, 0.01, 1}, {1, 100000, 0.1, 1}, {1, 100000, 0.5, 1}, - {4, 100000, 0.05, 3}, {100, 100000, 0.05, 7}, +template <typename T> +void DoTestGetBatchSpacedRoundtrip() { + using Data = DataTestRleBitPacked<T>; + using ArrowType = typename Data::ArrowType; + using RandomPart = typename Data::RandomPart; + using NullPart = typename Data::NullPart; + using RepeatPart = typename Data::RepeatPart; + + std::vector<Data> test_cases = { + { + {RandomPart{/* max=*/1, /* size=*/400, /* null_proba= */ 0.1}}, + /* bit_width= */ 1, + }, + { + { + RandomPart{/* max=*/7, /* size=*/10037, /* null_proba= */ 0.0}, + NullPart{/* size= */ 1153}, + RandomPart{/* max=*/7, /* size=*/800, /* null_proba= */ 0.5}, + }, + /* bit_width= */ 3, + }, + { + { + NullPart{/* size= */ 80}, + RandomPart{/* max=*/static_cast<T>(1023), /* size=*/800, + /* null_proba= */ 0.01}, + NullPart{/* size= */ 1023}, + }, + /* bit_width= */ 11, + }, + { + {RepeatPart{/* value=*/13, /* size=*/100000, /* null_proba= */ 0.01}}, + /* bit_width= */ 10, + }, + { + { + NullPart{/* size= */ 1024}, + RepeatPart{/* value=*/static_cast<T>(10000), /* size=*/100000, + /* null_proba= */ 0.1}, + NullPart{/* size= */ 77}, + }, + /* bit_width= */ 23, + }, + { + { + RepeatPart{/* value=*/13, /* size=*/100000, /* null_proba= */ 0.0}, + NullPart{/* size= */ 1153}, + RepeatPart{/* value=*/72, /* size=*/100799, /* null_proba= */ 0.5}, + }, + /* bit_width= */ 10, + }, + { + { + RandomPart{/* max=*/1, /* size=*/1013, /* null_proba= */ 0.01}, + NullPart{/* size=*/8}, + RepeatPart{1, /* size= */ 256, /* null_proba= */ 0.1}, + NullPart{/* size=*/128}, + RepeatPart{0, /* size= */ 256, /* null_proba= */ 0.0}, + NullPart{/* 
size=*/15}, + RandomPart{/* max=*/1, /* size=*/8 * 1024, /* null_proba= */ 0.01}, + }, + /* bit_width= */ 1, + }, }; - for (auto case_ : int32_cases) { - auto arr = rand.Int32(case_.size, /*min=*/0, case_.max_value, case_.null_probability); - CheckRoundTripSpaced<Int32Type>(*arr, case_.bit_width); - CheckRoundTripSpaced<Int32Type>(*arr->Slice(1), case_.bit_width); + + ::arrow::random::RandomArrayGenerator rand(/* seed= */ 12); + // FRAGILE: we create a dictionary large enough so that any encoded value from the + // previous test cases can be used as an index in the dictionary. + // Its size must be increased accordingly if larger values are encoded in the test + // cases. + auto dict = std::static_pointer_cast<arrow::FloatArray>(rand.Float32(20000, -1.0, 1.0)); + + // Number of bits available in T to write a positive integer. + constexpr int kBitsAvailable = 8 * sizeof(T) - (std::is_signed_v<T> ? 1 : 0); Review Comment: Most actual data (whether dictionary indices or definition levels) will opportunistically use a smaller bit-width, for the record. ########## cpp/src/arrow/util/rle_encoding_test.cc: ########## @@ -207,12 +209,310 @@ TEST(BitUtil, RoundTripIntValues) { } } +/// A Rle run is a simple class owning some data and a repetition count. +/// It does not know how to read such data. +TEST(Rle, RleRun) { + const std::array<uint8_t, 4> value = {21, 2, 0, 0}; + + RleRun::values_count_type value_count = 12; + + // 12 times the value 21 fitting over 5 bits + const auto run_5 = RleRun(value.data(), value_count, /* value_bit_width= */ 5); + EXPECT_EQ(run_5.ValuesCount(), value_count); + EXPECT_EQ(run_5.ValuesBitWidth(), 5); + EXPECT_EQ(run_5.RawDataSize(), 1); // 5 bits fit in one byte + EXPECT_EQ(*run_5.RawDataPtr(), 21); + + // 12 times the value 21 fitting over 16 bits Review Comment: ```suggestion // 12 times the value 21 fitting over 8 bits ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org