This is an automated email from the ASF dual-hosted git repository.
pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a3f3d8bb0b GH-49959: [C++][Parquet] Avoid unbounded temp alloc in
BYTE_STREAM_SPLIT decoder (#49960)
a3f3d8bb0b is described below
commit a3f3d8bb0b5386d3c5390db0575398942d156a06
Author: Antoine Pitrou <[email protected]>
AuthorDate: Tue May 12 12:09:14 2026 +0200
GH-49959: [C++][Parquet] Avoid unbounded temp alloc in BYTE_STREAM_SPLIT
decoder (#49960)
### Rationale for this change
The BYTE_STREAM_SPLIT encoder and decoder allocate a `SmallVector` to hold
the addresses of the different streams. While this is fine for a small number of
streams (which covers most or all legitimate use cases), a very large number of
streams (e.g. FLBA(100000000)) can trigger a huge temporary memory allocation.
This issue was found by OSS-Fuzz:
https://issues.oss-fuzz.com/issues/511575321
### What changes are included in this PR?
1. When the width of the BYTE_STREAM_SPLIT-encoded type reaches a
predefined threshold (`kByteStreamSplitMaxTemporaryAlloc / 8`, i.e. 1024 bytes),
switch to a slower implementation that doesn't need any temporary allocation.
2. When encoding or decoding 0 values, skip the now-pointless setup work.
3. Add tests for the two conditions above.
### Are these changes tested?
Yes, by additional unit tests and by an additional fuzz regression file.
### Are there any user-facing changes?
* GitHub Issue: #49959
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/util/byte_stream_split_internal.h | 44 +++++++++++++++++++------
cpp/src/arrow/util/byte_stream_split_test.cc | 11 ++++++-
cpp/src/parquet/decoder.cc | 12 ++++---
cpp/src/parquet/encoder.cc | 10 +++---
cpp/src/parquet/encoding_test.cc | 2 +-
testing | 2 +-
6 files changed, 59 insertions(+), 22 deletions(-)
diff --git a/cpp/src/arrow/util/byte_stream_split_internal.h
b/cpp/src/arrow/util/byte_stream_split_internal.h
index e237beb791..2e713dd42f 100644
--- a/cpp/src/arrow/util/byte_stream_split_internal.h
+++ b/cpp/src/arrow/util/byte_stream_split_internal.h
@@ -421,14 +421,28 @@ void ByteStreamSplitEncodeScalar(const uint8_t*
raw_values, int width,
DoSplitStreams(raw_values, kNumStreams, num_values, dest_streams.data());
}
+// If changing this value, please check that TestByteStreamSplitLargeWidth
still
+// exercises the slow path.
+constexpr inline int kByteStreamSplitMaxTemporaryAlloc = 8192;
+
inline void ByteStreamSplitEncodeScalarDynamic(const uint8_t* raw_values, int
width,
const int64_t num_values,
uint8_t* out) {
- ::arrow::internal::SmallVector<uint8_t*, 16> dest_streams;
- dest_streams.resize(width);
- for (int stream = 0; stream < width; ++stream) {
- dest_streams[stream] = &out[stream * num_values];
+ if (ARROW_PREDICT_TRUE(width < kByteStreamSplitMaxTemporaryAlloc / 8)) {
+ ::arrow::internal::SmallVector<uint8_t*, 32> dest_streams;
+ dest_streams.resize(width);
+ for (int stream = 0; stream < width; ++stream) {
+ dest_streams[stream] = &out[stream * num_values];
+ }
+ DoSplitStreams(raw_values, width, num_values, dest_streams.data());
+ } else {
+ // Slow path to avoid an oversized `dest_streams` container above.
+ for (int stream = 0; stream < width; ++stream) {
+ uint8_t* dest_stream = &out[stream * num_values];
+ for (int64_t i = 0; i < num_values; ++i) {
+ dest_stream[i] = raw_values[stream + i * width];
+ }
+ }
}
- DoSplitStreams(raw_values, width, num_values, dest_streams.data());
}
template <int kNumStreams>
@@ -445,12 +459,22 @@ void ByteStreamSplitDecodeScalar(const uint8_t* data, int
width, int64_t num_val
inline void ByteStreamSplitDecodeScalarDynamic(const uint8_t* data, int width,
int64_t num_values, int64_t
stride,
uint8_t* out) {
- ::arrow::internal::SmallVector<const uint8_t*, 16> src_streams;
- src_streams.resize(width);
- for (int stream = 0; stream < width; ++stream) {
- src_streams[stream] = &data[stream * stride];
+ if (ARROW_PREDICT_TRUE(width < kByteStreamSplitMaxTemporaryAlloc / 8)) {
+ ::arrow::internal::SmallVector<const uint8_t*, 32> src_streams;
+ src_streams.resize(width);
+ for (int stream = 0; stream < width; ++stream) {
+ src_streams[stream] = &data[stream * stride];
+ }
+ DoMergeStreams(src_streams.data(), width, num_values, out);
+ } else {
+ // Slow path to avoid an oversized `src_streams` container above.
+ for (int stream = 0; stream < width; ++stream) {
+ const uint8_t* src_stream = &data[stream * stride];
+ for (int64_t i = 0; i < num_values; ++i) {
+ out[stream + i * width] = src_stream[i];
+ }
+ }
}
- DoMergeStreams(src_streams.data(), width, num_values, out);
}
template <int kNumStreams>
diff --git a/cpp/src/arrow/util/byte_stream_split_test.cc
b/cpp/src/arrow/util/byte_stream_split_test.cc
index 13a99d937c..be678c151f 100644
--- a/cpp/src/arrow/util/byte_stream_split_test.cc
+++ b/cpp/src/arrow/util/byte_stream_split_test.cc
@@ -195,7 +195,7 @@ class TestByteStreamSplitSpecialized : public
::testing::Test {
TYPED_TEST_SUITE(TestByteStreamSplitSpecialized, ByteStreamSplitTypes);
TYPED_TEST(TestByteStreamSplitSpecialized, RoundtripSmall) {
- for (int64_t num_values : {1, 5, 7, 12, 19, 31, 32}) {
+ for (int64_t num_values : {0, 1, 5, 7, 12, 19, 31, 32}) {
this->TestRoundtrip(num_values);
}
}
@@ -210,4 +210,13 @@ TYPED_TEST(TestByteStreamSplitSpecialized,
PiecewiseDecode) {
this->TestPiecewiseDecode(/*num_values=*/500);
}
+class TestByteStreamSplitLargeWidth
+ : public TestByteStreamSplitSpecialized<std::array<uint8_t, 3000>> {};
+
+TEST_F(TestByteStreamSplitLargeWidth, Roundtrip) {
+ for (int64_t num_values : {0, 1, 5, 100}) {
+ this->TestRoundtrip(num_values);
+ }
+}
+
} // namespace arrow::util::internal
diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc
index 50ce510bb1..c4d3fe5a8a 100644
--- a/cpp/src/parquet/decoder.cc
+++ b/cpp/src/parquet/decoder.cc
@@ -2307,11 +2307,13 @@ class ByteStreamSplitDecoderBase : public
TypedDecoderImpl<DType> {
protected:
int DecodeRaw(uint8_t* out_buffer, int max_values) {
const int values_to_decode = std::min(this->num_values_, max_values);
- ::arrow::util::internal::ByteStreamSplitDecode(this->data_,
this->type_length_,
- values_to_decode, stride_,
out_buffer);
- this->data_ += values_to_decode;
- this->num_values_ -= values_to_decode;
- this->len_ -= this->type_length_ * values_to_decode;
+ if (ARROW_PREDICT_TRUE(values_to_decode > 0)) {
+ ::arrow::util::internal::ByteStreamSplitDecode(
+ this->data_, this->type_length_, values_to_decode, stride_,
out_buffer);
+ this->data_ += values_to_decode;
+ this->num_values_ -= values_to_decode;
+ this->len_ -= this->type_length_ * values_to_decode;
+ }
return values_to_decode;
}
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index 2c70f63b6b..3e469df277 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -875,10 +875,12 @@ class ByteStreamSplitEncoderBase : public EncoderImpl,
return buf;
}
auto output_buffer = AllocateBuffer(this->memory_pool(),
EstimatedDataEncodedSize());
- uint8_t* output_buffer_raw = output_buffer->mutable_data();
- const uint8_t* raw_values = sink_.data();
- ::arrow::util::internal::ByteStreamSplitEncode(
- raw_values, /*width=*/byte_width_, num_values_in_buffer_,
output_buffer_raw);
+ if (num_values_in_buffer_ > 0) {
+ uint8_t* output_buffer_raw = output_buffer->mutable_data();
+ const uint8_t* raw_values = sink_.data();
+ ::arrow::util::internal::ByteStreamSplitEncode(
+ raw_values, /*width=*/byte_width_, num_values_in_buffer_,
output_buffer_raw);
+ }
sink_.Reset();
num_values_in_buffer_ = 0;
return output_buffer;
diff --git a/cpp/src/parquet/encoding_test.cc b/cpp/src/parquet/encoding_test.cc
index 9c88eb468a..831829e4a2 100644
--- a/cpp/src/parquet/encoding_test.cc
+++ b/cpp/src/parquet/encoding_test.cc
@@ -1705,7 +1705,7 @@ TYPED_TEST(TestByteStreamSplitEncoding, RoundTripSpace) {
for (auto null_prob : {0.001, 0.1, 0.5, 0.9, 0.999}) {
// Test with both size and offset up to 3 Simd block
- for (auto i = 1; i < kSimdSize * 3; i++) {
+ for (auto i = 0; i < kSimdSize * 3; i++) {
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(i, 1, 0, null_prob));
ASSERT_NO_FATAL_FAILURE(this->ExecuteSpaced(i, 1, i + 1, null_prob));
}
diff --git a/testing b/testing
index 4aeaf00ad3..9bf001b7d6 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 4aeaf00ad3e726d37852e5be0d3e1bfb7ddc18f9
+Subproject commit 9bf001b7d6ad318e222c06e760bccf480f2f550c