This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0c2087452d GH-47981: [C++][Parquet] Add compatibility with
non-compliant RLE stream (#47992)
0c2087452d is described below
commit 0c2087452d138ed51f23d0dc7a5baff7bab87849
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Nov 13 11:23:55 2025 +0100
GH-47981: [C++][Parquet] Add compatibility with non-compliant RLE stream
(#47992)
### Rationale for this change
RLE-bit-packed streams are required by the Parquet spec to have 8-padded
bit-packed runs, but some non-compliant encoders (such as Polars versions
before https://github.com/pola-rs/polars/pull/13883) might generate a truncated
last bit-packed run, which nevertheless contains enough *logical* values.
### What changes are included in this PR?
1. Compatibility code for non-compliant RLE streams as described above
2. Guard against zero-size dictionaries to avoid hitting an assertion in
`DictionaryConverter`
### Are these changes tested?
Yes, by additional unit tests.
### Are there any user-facing changes?
No, except a bugfix.
* GitHub Issue: #47981
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/util/bit_util.h | 8 +-
cpp/src/arrow/util/rle_encoding_internal.h | 40 +++++++--
cpp/src/arrow/util/rle_encoding_test.cc | 132 +++++++++++++++++++++++++++++
3 files changed, 169 insertions(+), 11 deletions(-)
diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h
index 52a42f538b..c7849db871 100644
--- a/cpp/src/arrow/util/bit_util.h
+++ b/cpp/src/arrow/util/bit_util.h
@@ -372,10 +372,12 @@ void PackBits(const uint32_t* values, uint8_t* out) {
}
}
-constexpr int64_t MaxLEB128ByteLen(int64_t n_bits) { return CeilDiv(n_bits,
7); }
+constexpr int32_t MaxLEB128ByteLen(int32_t n_bits) {
+ return static_cast<int32_t>(CeilDiv(n_bits, 7));
+}
template <typename Int>
-constexpr int64_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
+constexpr int32_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
/// Write a integer as LEB128
///
@@ -441,7 +443,7 @@ constexpr int32_t WriteLEB128(Int value, uint8_t* out,
int32_t max_out_size) {
template <typename Int>
constexpr int32_t ParseLeadingLEB128(const uint8_t* data, int32_t
max_data_size,
Int* out) {
- constexpr auto kMaxBytes = static_cast<int32_t>(kMaxLEB128ByteLenFor<Int>);
+ constexpr auto kMaxBytes = kMaxLEB128ByteLenFor<Int>;
static_assert(kMaxBytes >= 1);
constexpr uint8_t kLow7Mask = 0x7F;
constexpr uint8_t kContinuationBit = 0x80;
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h
b/cpp/src/arrow/util/rle_encoding_internal.h
index 50193d8903..46991fc717 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -659,8 +659,8 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
uint32_t run_len_type = 0;
- const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize,
&run_len_type);
-
+ const auto header_bytes =
+ bit_util::ParseLeadingLEB128(data_, std::min(kMaxSize, data_size_),
&run_len_type);
if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
// Malformed LEB128 data
return {0, ControlFlow::Break};
@@ -670,7 +670,7 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
const uint32_t count = run_len_type >> 1;
if (is_bit_packed) {
// Bit-packed run
- constexpr auto kMaxCount =
bit_util::CeilDiv(internal::max_size_for_v<rle_size_t>, 8);
+ constexpr auto kMaxCount = internal::max_size_for_v<rle_size_t> / 8;
if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
// Illegal number of encoded values
return {0, ControlFlow::Break};
@@ -679,12 +679,24 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
ARROW_DCHECK_LT(static_cast<uint64_t>(count) * 8,
internal::max_size_for_v<rle_size_t>);
// Count Already divided by 8 for byte size calculations
- const auto bytes_read = header_bytes + static_cast<int64_t>(count) *
value_bit_width_;
+ auto bytes_read = header_bytes + static_cast<int64_t>(count) *
value_bit_width_;
+ auto values_count = static_cast<rle_size_t>(count * 8);
if (ARROW_PREDICT_FALSE(bytes_read > data_size_)) {
- // Bit-packed run would overflow data buffer
- return {0, ControlFlow::Break};
+ // Bit-packed run would overflow data buffer, but we might still be able
+ // to return a truncated bit-packed such as generated by some
non-compliant
+ // encoders.
+ // Example in GH-47981: column contains 25 5-bit values, has a single
+ // bit-packed run with count=4 (theoretically 32 values), but only 17
+ // bytes of RLE-bit-packed data (including the one-byte header).
+ bytes_read = data_size_;
+ values_count =
+ static_cast<rle_size_t>((bytes_read - header_bytes) * 8 /
value_bit_width_);
+ // Only allow errors where the bit-packed run is not padded to a multiple
+ // of 8 values. Larger truncation should not occur.
+ if (values_count <= static_cast<rle_size_t>((count - 1) * 8)) {
+ return {0, ControlFlow::Break};
+ }
}
- const auto values_count = static_cast<rle_size_t>(count * 8);
auto control = handler.OnBitPackedRun(
BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
@@ -1215,7 +1227,8 @@ auto RleBitPackedDecoder<T>::GetBatchWithDict(const V*
dictionary,
rle_size_t batch_size) ->
rle_size_t {
using ControlFlow = RleBitPackedParser::ControlFlow;
- if (ARROW_PREDICT_FALSE(batch_size <= 0)) {
+ if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
+ // Either empty batch or invalid dictionary
return 0;
}
@@ -1284,6 +1297,17 @@ auto RleBitPackedDecoder<T>::GetBatchWithDictSpaced(
if (null_count == 0) {
return GetBatchWithDict<V>(dictionary, dictionary_length, out, batch_size);
}
+ if (null_count == batch_size) {
+ // All nulls, avoid instantiating DictionaryConverter as dictionary_length
+ // could be 0.
+ std::fill(out, out + batch_size, V{});
+ return batch_size;
+ }
+ if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
+ // Either empty batch or invalid dictionary
+ return 0;
+ }
+
internal::DictionaryConverter<V, value_type> converter{dictionary,
dictionary_length};
return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset,
null_count);
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc
b/cpp/src/arrow/util/rle_encoding_test.cc
index 453fa78ea4..c709095a6c 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -35,6 +35,7 @@
#include "arrow/util/bit_util.h"
#include "arrow/util/io_util.h"
#include "arrow/util/rle_encoding_internal.h"
+#include "arrow/util/span.h"
namespace arrow::util {
@@ -458,6 +459,29 @@ void TestRleBitPackedParser(std::vector<uint8_t> bytes,
rle_size_t bit_width,
EXPECT_EQ(decoded, expected);
}
+void TestRleBitPackedParserError(span<const uint8_t> bytes, rle_size_t
bit_width) {
+ auto parser =
+ RleBitPackedParser(bytes.data(), static_cast<rle_size_t>(bytes.size()),
bit_width);
+ EXPECT_FALSE(parser.exhausted());
+
+ struct {
+ auto OnRleRun(RleRun run) { return
RleBitPackedParser::ControlFlow::Continue; }
+ auto OnBitPackedRun(BitPackedRun run) {
+ return RleBitPackedParser::ControlFlow::Continue;
+ }
+ } handler;
+
+ // Iterate over all runs
+ parser.Parse(handler);
+ // Non-exhaustion despite ControlFlow::Continue signals an error occurred.
+ EXPECT_FALSE(parser.exhausted());
+}
+
+void TestRleBitPackedParserError(const std::vector<uint8_t>& bytes,
+ rle_size_t bit_width) {
+ TestRleBitPackedParserError(span(bytes), bit_width);
+}
+
TEST(RleBitPacked, RleBitPackedParser) {
TestRleBitPackedParser<uint16_t>(
/* bytes= */
@@ -500,6 +524,114 @@ TEST(RleBitPacked, RleBitPackedParser) {
}
}
+TEST(RleBitPacked, RleBitPackedParserInvalidNonPadded) {
+ // GH-47981: a non-padded trailing bit-packed, produced by some non-compliant
+ // encoders, should still be decoded successfully.
+
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x88, 0xc6},
+ /* bit_width= */ 3,
+ /* expected= */ {0, 1, 2, 3, 4});
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x88},
+ /* bit_width= */ 3,
+ /* expected= */ {0, 1});
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x1, 0x2, 0x3},
+ /* bit_width= */ 8,
+ /* expected= */ {1, 2, 3});
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
+ /* bit_width= */ 8,
+ /* expected= */ {1, 2, 3, 4, 5, 6, 7});
+ TestRleBitPackedParser<uint16_t>(
+ /* bytes= */
+ {/* LEB128 for 16 values bit packed marker */ 0x5,
+ /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9},
+ /* bit_width= */ 8,
+ /* expected= */ {1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+ // If the trailing bit-packed declares more values than padding allows,
+ // it's an error.
+
+ // 2 values encoded, 16 values declared (8 would be ok)
+ TestRleBitPackedParserError(
+ /* bytes= */
+ {/* LEB128 for 16 values bit packed marker */ 0x5,
+ /* Bitpacked run */ 0x88},
+ /* bit_width= */ 3);
+ // 8 values encoded, 16 values declared (8 would be ok)
+ TestRleBitPackedParserError(
+ /* bytes= */
+ {/* LEB128 for 16 values bit packed marker */ 0x5,
+ /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
+ /* bit_width= */ 8);
+
+ // If the trailing bit-packed run does not have room for at least 1 value,
+ // it's an error.
+
+ TestRleBitPackedParserError(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3},
+ /* bit_width= */ 3);
+ TestRleBitPackedParserError(
+ /* bytes= */
+ {/* LEB128 for 8 values bit packed marker */ 0x3,
+ /* Bitpacked run */ 0x1},
+ /* bit_width= */ 9);
+}
+
+TEST(RleBitPacked, RleBitPackedParserErrors) {
+ using V = std::vector<uint8_t>;
+
+ // Truncated LEB128 header
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0x81},
+ /* bit_width= */ 3);
+
+ // Invalid LEB128 header for a 32-bit value
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0xFF, 0xFF, 0xFF, 0xFF, 0x7f},
+ /* bit_width= */ 3);
+
+ // Zero-length repeated run
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0x00, 0x00},
+ /* bit_width= */ 3);
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0x80, 0x00, 0x00},
+ /* bit_width= */ 3);
+
+ // Zero-length bit-packed run
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0x01},
+ /* bit_width= */ 3);
+ TestRleBitPackedParserError(
+ /* bytes= */
+ V{0x80, 0x01},
+ /* bit_width= */ 3);
+
+ // Bit-packed run too large
+ // (we pass a span<> on invalid memory, but only the reachable part should
be read)
+ std::vector<uint8_t> bytes = {0x81, 0x80, 0x80, 0x80, 0x02};
+ TestRleBitPackedParserError(
+ /* bytes= */ span(bytes.data(), 1ULL << 30),
+ /* bit_width= */ 1);
+}
+
// Validates encoding of values by encoding and decoding them. If
// expected_encoding != NULL, also validates that the encoded buffer is
// exactly 'expected_encoding'.