This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c2087452d GH-47981: [C++][Parquet] Add compatibility with non-compliant RLE stream (#47992)
0c2087452d is described below

commit 0c2087452d138ed51f23d0dc7a5baff7bab87849
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Nov 13 11:23:55 2025 +0100

    GH-47981: [C++][Parquet] Add compatibility with non-compliant RLE stream (#47992)
    
    ### Rationale for this change
    
    RLE-bit-packed streams are required by the Parquet spec to pad every bit-packed run to a multiple of 8 values, but some non-compliant encoders (such as Polars versions before https://github.com/pola-rs/polars/pull/13883) might generate a truncated last bit-packed run, which nevertheless contains enough *logical* values.
    
    ### What changes are included in this PR?
    
    1. Compatibility code for non-compliant RLE streams as described above
    2. Guard against zero-size dictionaries to avoid hitting an assertion in `DictionaryConverter`
    
    ### Are these changes tested?
    
    Yes, by additional unit tests.
    
    ### Are there any user-facing changes?
    
    No, except a bugfix.
    
    * GitHub Issue: #47981
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/bit_util.h              |   8 +-
 cpp/src/arrow/util/rle_encoding_internal.h |  40 +++++++--
 cpp/src/arrow/util/rle_encoding_test.cc    | 132 +++++++++++++++++++++++++++++
 3 files changed, 169 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/util/bit_util.h b/cpp/src/arrow/util/bit_util.h
index 52a42f538b..c7849db871 100644
--- a/cpp/src/arrow/util/bit_util.h
+++ b/cpp/src/arrow/util/bit_util.h
@@ -372,10 +372,12 @@ void PackBits(const uint32_t* values, uint8_t* out) {
   }
 }
 
-constexpr int64_t MaxLEB128ByteLen(int64_t n_bits) { return CeilDiv(n_bits, 
7); }
+constexpr int32_t MaxLEB128ByteLen(int32_t n_bits) {
+  return static_cast<int32_t>(CeilDiv(n_bits, 7));
+}
 
 template <typename Int>
-constexpr int64_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
+constexpr int32_t kMaxLEB128ByteLenFor = MaxLEB128ByteLen(sizeof(Int) * 8);
 
 /// Write a integer as LEB128
 ///
@@ -441,7 +443,7 @@ constexpr int32_t WriteLEB128(Int value, uint8_t* out, 
int32_t max_out_size) {
 template <typename Int>
 constexpr int32_t ParseLeadingLEB128(const uint8_t* data, int32_t 
max_data_size,
                                      Int* out) {
-  constexpr auto kMaxBytes = static_cast<int32_t>(kMaxLEB128ByteLenFor<Int>);
+  constexpr auto kMaxBytes = kMaxLEB128ByteLenFor<Int>;
   static_assert(kMaxBytes >= 1);
   constexpr uint8_t kLow7Mask = 0x7F;
   constexpr uint8_t kContinuationBit = 0x80;
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h 
b/cpp/src/arrow/util/rle_encoding_internal.h
index 50193d8903..46991fc717 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -659,8 +659,8 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
 
   constexpr auto kMaxSize = bit_util::kMaxLEB128ByteLenFor<uint32_t>;
   uint32_t run_len_type = 0;
-  const auto header_bytes = bit_util::ParseLeadingLEB128(data_, kMaxSize, 
&run_len_type);
-
+  const auto header_bytes =
+      bit_util::ParseLeadingLEB128(data_, std::min(kMaxSize, data_size_), 
&run_len_type);
   if (ARROW_PREDICT_FALSE(header_bytes == 0)) {
     // Malformed LEB128 data
     return {0, ControlFlow::Break};
@@ -670,7 +670,7 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
   const uint32_t count = run_len_type >> 1;
   if (is_bit_packed) {
     // Bit-packed run
-    constexpr auto kMaxCount = 
bit_util::CeilDiv(internal::max_size_for_v<rle_size_t>, 8);
+    constexpr auto kMaxCount = internal::max_size_for_v<rle_size_t> / 8;
     if (ARROW_PREDICT_FALSE(count == 0 || count > kMaxCount)) {
       // Illegal number of encoded values
       return {0, ControlFlow::Break};
@@ -679,12 +679,24 @@ auto RleBitPackedParser::PeekImpl(Handler&& handler) const
     ARROW_DCHECK_LT(static_cast<uint64_t>(count) * 8,
                     internal::max_size_for_v<rle_size_t>);
     // Count Already divided by 8 for byte size calculations
-    const auto bytes_read = header_bytes + static_cast<int64_t>(count) * 
value_bit_width_;
+    auto bytes_read = header_bytes + static_cast<int64_t>(count) * 
value_bit_width_;
+    auto values_count = static_cast<rle_size_t>(count * 8);
     if (ARROW_PREDICT_FALSE(bytes_read > data_size_)) {
-      // Bit-packed run would overflow data buffer
-      return {0, ControlFlow::Break};
+      // Bit-packed run would overflow data buffer, but we might still be able
+      // to return a truncated bit-packed such as generated by some 
non-compliant
+      // encoders.
+      // Example in GH-47981: column contains 25 5-bit values, has a single
+      // bit-packed run with count=4 (theoretically 32 values), but only 17
+      // bytes of RLE-bit-packed data (including the one-byte header).
+      bytes_read = data_size_;
+      values_count =
+          static_cast<rle_size_t>((bytes_read - header_bytes) * 8 / 
value_bit_width_);
+      // Only allow errors where the bit-packed run is not padded to a multiple
+      // of 8 values. Larger truncation should not occur.
+      if (values_count <= static_cast<rle_size_t>((count - 1) * 8)) {
+        return {0, ControlFlow::Break};
+      }
     }
-    const auto values_count = static_cast<rle_size_t>(count * 8);
 
     auto control = handler.OnBitPackedRun(
         BitPackedRun(data_ + header_bytes, values_count, value_bit_width_));
@@ -1215,7 +1227,8 @@ auto RleBitPackedDecoder<T>::GetBatchWithDict(const V* 
dictionary,
                                               rle_size_t batch_size) -> 
rle_size_t {
   using ControlFlow = RleBitPackedParser::ControlFlow;
 
-  if (ARROW_PREDICT_FALSE(batch_size <= 0)) {
+  if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
+    // Either empty batch or invalid dictionary
     return 0;
   }
 
@@ -1284,6 +1297,17 @@ auto RleBitPackedDecoder<T>::GetBatchWithDictSpaced(
   if (null_count == 0) {
     return GetBatchWithDict<V>(dictionary, dictionary_length, out, batch_size);
   }
+  if (null_count == batch_size) {
+    // All nulls, avoid instantiating DictionaryConverter as dictionary_length
+    // could be 0.
+    std::fill(out, out + batch_size, V{});
+    return batch_size;
+  }
+  if (ARROW_PREDICT_FALSE(batch_size <= 0 || dictionary_length == 0)) {
+    // Either empty batch or invalid dictionary
+    return 0;
+  }
+
   internal::DictionaryConverter<V, value_type> converter{dictionary, 
dictionary_length};
 
   return GetSpaced(converter, out, batch_size, valid_bits, valid_bits_offset, 
null_count);
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc 
b/cpp/src/arrow/util/rle_encoding_test.cc
index 453fa78ea4..c709095a6c 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -35,6 +35,7 @@
 #include "arrow/util/bit_util.h"
 #include "arrow/util/io_util.h"
 #include "arrow/util/rle_encoding_internal.h"
+#include "arrow/util/span.h"
 
 namespace arrow::util {
 
@@ -458,6 +459,29 @@ void TestRleBitPackedParser(std::vector<uint8_t> bytes, 
rle_size_t bit_width,
   EXPECT_EQ(decoded, expected);
 }
 
+void TestRleBitPackedParserError(span<const uint8_t> bytes, rle_size_t 
bit_width) {
+  auto parser =
+      RleBitPackedParser(bytes.data(), static_cast<rle_size_t>(bytes.size()), 
bit_width);
+  EXPECT_FALSE(parser.exhausted());
+
+  struct {
+    auto OnRleRun(RleRun run) { return 
RleBitPackedParser::ControlFlow::Continue; }
+    auto OnBitPackedRun(BitPackedRun run) {
+      return RleBitPackedParser::ControlFlow::Continue;
+    }
+  } handler;
+
+  // Iterate over all runs
+  parser.Parse(handler);
+  // Non-exhaustion despite ControlFlow::Continue signals an error occurred.
+  EXPECT_FALSE(parser.exhausted());
+}
+
+void TestRleBitPackedParserError(const std::vector<uint8_t>& bytes,
+                                 rle_size_t bit_width) {
+  TestRleBitPackedParserError(span(bytes), bit_width);
+}
+
 TEST(RleBitPacked, RleBitPackedParser) {
   TestRleBitPackedParser<uint16_t>(
       /* bytes= */
@@ -500,6 +524,114 @@ TEST(RleBitPacked, RleBitPackedParser) {
   }
 }
 
+TEST(RleBitPacked, RleBitPackedParserInvalidNonPadded) {
+  // GH-47981: a non-padded trailing bit-packed, produced by some non-compliant
+  // encoders, should still be decoded successfully.
+
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x88, 0xc6},
+      /* bit_width= */ 3,
+      /* expected= */ {0, 1, 2, 3, 4});
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x88},
+      /* bit_width= */ 3,
+      /* expected= */ {0, 1});
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x1, 0x2, 0x3},
+      /* bit_width= */ 8,
+      /* expected= */ {1, 2, 3});
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
+      /* bit_width= */ 8,
+      /* expected= */ {1, 2, 3, 4, 5, 6, 7});
+  TestRleBitPackedParser<uint16_t>(
+      /* bytes= */
+      {/* LEB128 for 16 values bit packed marker */ 0x5,
+       /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9},
+      /* bit_width= */ 8,
+      /* expected= */ {1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+  // If the trailing bit-packed declares more values than padding allows,
+  // it's an error.
+
+  // 2 values encoded, 16 values declared (8 would be ok)
+  TestRleBitPackedParserError(
+      /* bytes= */
+      {/* LEB128 for 16 values bit packed marker */ 0x5,
+       /* Bitpacked run */ 0x88},
+      /* bit_width= */ 3);
+  // 8 values encoded, 16 values declared (8 would be ok)
+  TestRleBitPackedParserError(
+      /* bytes= */
+      {/* LEB128 for 16 values bit packed marker */ 0x5,
+       /* Bitpacked run */ 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
+      /* bit_width= */ 8);
+
+  // If the trailing bit-packed run does not have room for at least 1 value,
+  // it's an error.
+
+  TestRleBitPackedParserError(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3},
+      /* bit_width= */ 3);
+  TestRleBitPackedParserError(
+      /* bytes= */
+      {/* LEB128 for 8 values bit packed marker */ 0x3,
+       /* Bitpacked run */ 0x1},
+      /* bit_width= */ 9);
+}
+
+TEST(RleBitPacked, RleBitPackedParserErrors) {
+  using V = std::vector<uint8_t>;
+
+  // Truncated LEB128 header
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0x81},
+      /* bit_width= */ 3);
+
+  // Invalid LEB128 header for a 32-bit value
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0xFF, 0xFF, 0xFF, 0xFF, 0x7f},
+      /* bit_width= */ 3);
+
+  // Zero-length repeated run
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0x00, 0x00},
+      /* bit_width= */ 3);
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0x80, 0x00, 0x00},
+      /* bit_width= */ 3);
+
+  // Zero-length bit-packed run
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0x01},
+      /* bit_width= */ 3);
+  TestRleBitPackedParserError(
+      /* bytes= */
+      V{0x80, 0x01},
+      /* bit_width= */ 3);
+
+  // Bit-packed run too large
+  // (we pass a span<> on invalid memory, but only the reachable part should 
be read)
+  std::vector<uint8_t> bytes = {0x81, 0x80, 0x80, 0x80, 0x02};
+  TestRleBitPackedParserError(
+      /* bytes= */ span(bytes.data(), 1ULL << 30),
+      /* bit_width= */ 1);
+}
+
 // Validates encoding of values by encoding and decoding them.  If
 // expected_encoding != NULL, also validates that the encoded buffer is
 // exactly 'expected_encoding'.

Reply via email to