This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a3f3d8bb0b GH-49959: [C++][Parquet] Avoid unbounded temp alloc in 
BYTE_STREAM_SPLIT decoder (#49960)
a3f3d8bb0b is described below

commit a3f3d8bb0b5386d3c5390db0575398942d156a06
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue May 12 12:09:14 2026 +0200

    GH-49959: [C++][Parquet] Avoid unbounded temp alloc in BYTE_STREAM_SPLIT 
decoder (#49960)
    
    ### Rationale for this change
    
    The BYTE_STREAM_SPLIT encoder and decoder allocate a `SmallVector` to hold 
the addresses of the different streams. While this is valid for a small number of 
streams (which is most or all legitimate use cases), very large numbers of 
streams (e.g. FLBA(100000000)) can trigger a huge temporary memory allocation.
    
    This issue was found by OSS-Fuzz: 
https://issues.oss-fuzz.com/issues/511575321
    
    ### What changes are included in this PR?
    
    1. When the width of the BYTE_STREAM_SPLIT-encoded type is larger than a 
predefined constant, switch to a slower implementation that doesn't need any 
temporary allocation
    2. When encoding or decoding 0 values, avoid some pointless setup work
    3. Add tests for the two conditions above
    
    ### Are these changes tested?
    
    Yes, by additional unit tests and by an additional fuzz regression file.
    
    ### Are there any user-facing changes?
    
    * GitHub Issue: #49959
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/byte_stream_split_internal.h | 44 +++++++++++++++++++------
 cpp/src/arrow/util/byte_stream_split_test.cc    | 11 ++++++-
 cpp/src/parquet/decoder.cc                      | 12 ++++---
 cpp/src/parquet/encoder.cc                      | 10 +++---
 cpp/src/parquet/encoding_test.cc                |  2 +-
 testing                                         |  2 +-
 6 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h 
b/cpp/src/arrow/util/byte_stream_split_internal.h
index e237beb791..2e713dd42f 100644
--- a/cpp/src/arrow/util/byte_stream_split_internal.h
+++ b/cpp/src/arrow/util/byte_stream_split_internal.h
@@ -421,14 +421,28 @@ void ByteStreamSplitEncodeScalar(const uint8_t* 
raw_values, int width,
   DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
 }
 
+// If changing this value, please check that TestByteStreamSplitLargeWidth 
still
+// exercises the slow path.
+constexpr inline int kByteStreamSplitMaxTemporaryAlloc = 8192;
+
 inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int 
width,
                                                const int64_t num_values, 
uint8_t* out) {
-  ::arrow::internal::SmallVector<uint8_t*, 16> dest_streams;
-  dest_streams.resize(width);
-  for (int stream = 0; stream < width; ++stream) {
-    dest_streams[stream] = &out[stream * num_values];
+  if (ARROW_PREDICT_TRUE(width < kByteStreamSplitMaxTemporaryAlloc / 8)) {
+    ::arrow::internal::SmallVector<uint8_t*, 32> dest_streams;
+    dest_streams.resize(width);
+    for (int stream = 0; stream < width; ++stream) {
+      dest_streams[stream] = &out[stream * num_values];
+    }
+    DoSplitStreams(raw_values, width, num_values, dest_streams.data());
+  } else {
+    // Slow path to avoid an oversized `dest_streams` container above.
+    for (int stream = 0; stream < width; ++stream) {
+      uint8_t* dest_stream = &out[stream * num_values];
+      for (int64_t i = 0; i < num_values; ++i) {
+        dest_stream[i] = raw_values[stream + i * width];
+      }
+    }
   }
-  DoSplitStreams(raw_values, width, num_values, dest_streams.data());
 }
 
 template <int kNumStreams>
@@ -445,12 +459,22 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data, int 
width, int64_t num_val
 inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width,
                                                int64_t num_values, int64_t 
stride,
                                                uint8_t* out) {
-  ::arrow::internal::SmallVector<const uint8_t*, 16> src_streams;
-  src_streams.resize(width);
-  for (int stream = 0; stream < width; ++stream) {
-    src_streams[stream] = &data[stream * stride];
+  if (ARROW_PREDICT_TRUE(width < kByteStreamSplitMaxTemporaryAlloc / 8)) {
+    ::arrow::internal::SmallVector<const uint8_t*, 32> src_streams;
+    src_streams.resize(width);
+    for (int stream = 0; stream < width; ++stream) {
+      src_streams[stream] = &data[stream * stride];
+    }
+    DoMergeStreams(src_streams.data(), width, num_values, out);
+  } else {
+    // Slow path to avoid an oversized `src_streams` container above.
+    for (int stream = 0; stream < width; ++stream) {
+      const uint8_t* src_stream = &data[stream * stride];
+      for (int64_t i = 0; i < num_values; ++i) {
+        out[stream + i * width] = src_stream[i];
+      }
+    }
   }
-  DoMergeStreams(src_streams.data(), width, num_values, out);
 }
 
 template <int kNumStreams>
diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc 
b/cpp/src/arrow/util/byte_stream_split_test.cc
index 13a99d937c..be678c151f 100644
--- a/cpp/src/arrow/util/byte_stream_split_test.cc
+++ b/cpp/src/arrow/util/byte_stream_split_test.cc
@@ -195,7 +195,7 @@ class TestByteStreamSplitSpecialized : public 
::testing::Test {
 TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);
 
 TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripSmall) {
-  for (int64_t num_values : {1, 5, 7, 12, 19, 31, 32}) {
+  for (int64_t num_values : {0, 1, 5, 7, 12, 19, 31, 32}) {
     this->TestRoundtrip(num_values);
   }
 }
@@ -210,4 +210,13 @@ TYPED_TEST(TestByteStreamSplitSpecialized, 
PiecewiseDecode) {
   this->TestPiecewiseDecode(/*num_values=*/500);
 }
 
+class TestByteStreamSplitLargeWidth
+    : public TestByteStreamSplitSpecialized<std::array<uint8_t, 3000>> {};
+
+TEST_F(TestByteStreamSplitLargeWidth, Roundtrip) {
+  for (int64_t num_values : {0, 1, 5, 100}) {
+    this->TestRoundtrip(num_values);
+  }
+}
+
 }  // namespace arrow::util::internal
diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index 50ce510bb1..c4d3fe5a8a 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -2307,11 +2307,13 @@ class ByteStreamSplitDecoderBase : public 
TypedDecoderImpl<DType> {
  protected:
   int DecodeRaw(uint8_t* out_buffer, int max_values) {
     const int values_to_decode = std::min(this->num_values_, max_values);
-    ::arrow::util::internal::ByteStreamSplitDecode(this->data_, 
this->type_length_,
-                                                   values_to_decode, stride_, 
out_buffer);
-    this->data_ += values_to_decode;
-    this->num_values_ -= values_to_decode;
-    this->len_ -= this->type_length_ * values_to_decode;
+    if (ARROW_PREDICT_TRUE(values_to_decode > 0)) {
+      ::arrow::util::internal::ByteStreamSplitDecode(
+          this->data_, this->type_length_, values_to_decode, stride_, 
out_buffer);
+      this->data_ += values_to_decode;
+      this->num_values_ -= values_to_decode;
+      this->len_ -= this->type_length_ * values_to_decode;
+    }
     return values_to_decode;
   }
 
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 2c70f63b6b..3e469df277 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -875,10 +875,12 @@ class ByteStreamSplitEncoderBase : public EncoderImpl,
       return buf;
     }
     auto output_buffer = AllocateBuffer(this->memory_pool(), 
EstimatedDataEncodedSize());
-    uint8_t* output_buffer_raw = output_buffer->mutable_data();
-    const uint8_t* raw_values = sink_.data();
-    ::arrow::util::internal::ByteStreamSplitEncode(
-        raw_values, /*width=*/byte_width_, num_values_in_buffer_, 
output_buffer_raw);
+    if (num_values_in_buffer_ > 0) {
+      uint8_t* output_buffer_raw = output_buffer->mutable_data();
+      const uint8_t* raw_values = sink_.data();
+      ::arrow::util::internal::ByteStreamSplitEncode(
+          raw_values, /*width=*/byte_width_, num_values_in_buffer_, 
output_buffer_raw);
+    }
     sink_.Reset();
     num_values_in_buffer_ = 0;
     return output_buffer;
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 9c88eb468a..831829e4a2 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1705,7 +1705,7 @@ TYPED_TEST(TestByteStreamSplitEncoding, RoundTripSpace) {
 
   for (auto null_prob : {0.001, 0.1, 0.5, 0.9, 0.999}) {
     // Test with both size and offset up to 3 Simd block
-    for (auto i = 1; i < kSimdSize * 3; i++) {
+    for (auto i = 0; i < kSimdSize * 3; i++) {
       ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(i, 1, 0, null_prob));
       ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(i, 1, i + 1, null_prob));
     }
diff --git a/testing b/testing
index 4aeaf00ad3..9bf001b7d6 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 4aeaf00ad3e726d37852e5be0d3e1bfb7ddc18f9
+Subproject commit 9bf001b7d6ad318e222c06e760bccf480f2f550c

Reply via email to