This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 055c2f4e91 GH-47973: [C++][Parquet] Fix invalid Parquet files written when dictionary encoded pages are large (#47998)
055c2f4e91 is described below

commit 055c2f4e91c63593aacab38250ac9da899cabb31
Author: Adam Reeve <[email protected]>
AuthorDate: Fri Oct 31 21:08:36 2025 +1300

    GH-47973: [C++][Parquet] Fix invalid Parquet files written when dictionary encoded pages are large (#47998)
    
    ### Rationale for this change
    
    Prevents silently writing invalid data when using dictionary encoding and the estimated max buffer size in bits exceeds the max int32 value.
    
    Also fixes an overflow resulting in a "Negative buffer resize" error when the buffer size in bytes is greater than the max int32 value, and instead throws a more helpful exception (a standalone sketch of the overflow is shown below).
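    
    As a minimal standalone sketch (illustrative numbers only, not Arrow code): once the write buffer is larger than `INT32_MAX / 8` bytes, a 32-bit product like `max_bytes * 8` overflows (undefined behaviour that in practice wraps negative), so the "not enough space" check fires even for an empty buffer and the writer bails out without writing anything:
    
    ```cpp
    #include <cstdint>
    #include <iostream>
    
    int main() {
      const int max_bytes = 300'000'000;  // ~286 MiB buffer; max_bytes * 8 > INT32_MAX
      const int byte_offset = 0;          // buffer is completely empty
      const int bit_offset = 0;
      const int num_bits = 8;
    
      // Old-style 32-bit check: the right-hand side wraps negative, so this
      // wrongly reports "out of space" for an empty buffer.
      const bool out_of_space_32 =
          byte_offset * 8 + bit_offset + num_bits > max_bytes * 8;
    
      // Fixed check: widen to 64 bits before multiplying, as this patch does.
      const bool out_of_space_64 =
          static_cast<int64_t>(byte_offset) * 8 + bit_offset + num_bits >
          static_cast<int64_t>(max_bytes) * 8;
    
      std::cout << out_of_space_32 << "\n";  // 1 (wrong)
      std::cout << out_of_space_64 << "\n";  // 0 (correct)
    }
    ```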
    
    ### What changes are included in this PR?
    
    * Fix overflow when computing the bit position in `BitWriter::PutValue`. This overflow would cause the method to return without writing data, and the return value is only checked in debug builds.
    * Change buffer size calculations to use int64 and check for overflow before casting to int (see the condensed sketch after this list)
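    
    A condensed sketch of that guard pattern (the function name and worst-case math here are illustrative, not the actual Arrow API; the real fix applies this check in the level encoder and the dictionary/RLE-boolean encoders):
    
    ```cpp
    #include <cstdint>
    #include <iostream>
    #include <limits>
    #include <sstream>
    #include <stdexcept>
    
    // Compute a worst-case buffer size in 64-bit arithmetic, then refuse to
    // narrow it silently: throw a descriptive error instead of handing a
    // truncated (possibly negative) size to downstream code.
    int CheckedBufferSize(int64_t num_values, int bit_width) {
      const int64_t num_bytes = (num_values * bit_width + 7) / 8;
      if (num_bytes > std::numeric_limits<int>::max()) {
        std::stringstream ss;
        ss << "Buffer size (" << num_bytes << ") exceeds maximum int value";
        throw std::runtime_error(ss.str());
      }
      return static_cast<int>(num_bytes);
    }
    
    int main() {
      std::cout << CheckedBufferSize(1'000'000, 20) << "\n";  // 2500000, fits in int
      try {
        CheckedBufferSize(4'000'000'000LL, 20);  // ~10 GB of indices: throws
      } catch (const std::runtime_error& e) {
        std::cout << e.what() << "\n";
      }
    }
    ```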
    
    ### Are these changes tested?
    
    Yes, I've added unit tests for both issues. These require enabling `ARROW_LARGE_MEMORY_TESTS` as they allocate a lot of memory.
    
    ### Are there any user-facing changes?
    
    **This PR contains a "Critical Fix".**
    
    This fixes a bug where invalid Parquet files can be silently written when the buffer size for dictionary indices is large.
    
    * GitHub Issue: #47973
    
    Authored-by: Adam Reeve <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/util/bit_stream_utils_internal.h |   6 +-
 cpp/src/arrow/util/rle_encoding_internal.h     |  28 +++----
 cpp/src/arrow/util/rle_encoding_test.cc        |   5 +-
 cpp/src/parquet/column_writer.cc               |  13 ++-
 cpp/src/parquet/column_writer_test.cc          | 107 +++++++++++++++++++++++++
 cpp/src/parquet/encoder.cc                     |  32 +++++---
 6 files changed, 158 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/util/bit_stream_utils_internal.h b/cpp/src/arrow/util/bit_stream_utils_internal.h
index 1057a0bf38..d8c7317fe8 100644
--- a/cpp/src/arrow/util/bit_stream_utils_internal.h
+++ b/cpp/src/arrow/util/bit_stream_utils_internal.h
@@ -58,7 +58,7 @@ class BitWriter {
   int buffer_len() const { return max_bytes_; }
 
   /// Writes a value to buffered_values_, flushing to buffer_ if necessary.  This is bit
-  /// packed.  Returns false if there was not enough space. num_bits must be <= 32.
+  /// packed.  Returns false if there was not enough space. num_bits must be <= 64.
   bool PutValue(uint64_t v, int num_bits);
 
   /// Writes v to the next aligned byte using num_bytes. If T is larger than
@@ -197,7 +197,9 @@ inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
     ARROW_DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
   }
 
-  if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
+  if (ARROW_PREDICT_FALSE(static_cast<int64_t>(byte_offset_) * 8 + bit_offset_ +
+                              num_bits >
+                          static_cast<int64_t>(max_bytes_) * 8))
     return false;
 
   buffered_values_ |= v << bit_offset_;
diff --git a/cpp/src/arrow/util/rle_encoding_internal.h b/cpp/src/arrow/util/rle_encoding_internal.h
index 6b2782da31..50193d8903 100644
--- a/cpp/src/arrow/util/rle_encoding_internal.h
+++ b/cpp/src/arrow/util/rle_encoding_internal.h
@@ -513,7 +513,7 @@ class RleBitPackedEncoder {
       : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
     ARROW_DCHECK_GE(bit_width_, 0);
     ARROW_DCHECK_LE(bit_width_, 64);
-    max_run_byte_size_ = MinBufferSize(bit_width);
+    max_run_byte_size_ = static_cast<int>(MinBufferSize(bit_width));
     ARROW_DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
     Clear();
   }
@@ -521,32 +521,30 @@ class RleBitPackedEncoder {
   /// Returns the minimum buffer size needed to use the encoder for 'bit_width'
   /// This is the maximum length of a single run for 'bit_width'.
   /// It is not valid to pass a buffer less than this length.
-  static int MinBufferSize(int bit_width) {
+  static int64_t MinBufferSize(int bit_width) {
     // 1 indicator byte and MAX_VALUES_PER_LITERAL_RUN 'bit_width' values.
-    int max_literal_run_size = 1 + static_cast<int>(::arrow::bit_util::BytesForBits(
-                                       MAX_VALUES_PER_LITERAL_RUN * bit_width));
+    int64_t max_literal_run_size =
+        1 + ::arrow::bit_util::BytesForBits(MAX_VALUES_PER_LITERAL_RUN * bit_width);
     // Up to kMaxVlqByteLength indicator and a single 'bit_width' value.
-    int max_repeated_run_size =
-        bit_util::kMaxLEB128ByteLenFor<int32_t> +
-        static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
+    int64_t max_repeated_run_size = bit_util::kMaxLEB128ByteLenFor<int32_t> +
+                                    ::arrow::bit_util::BytesForBits(bit_width);
     return std::max(max_literal_run_size, max_repeated_run_size);
   }
 
   /// Returns the maximum byte size it could take to encode 'num_values'.
-  static int MaxBufferSize(int bit_width, int num_values) {
+  static int64_t MaxBufferSize(int bit_width, int64_t num_values) {
     // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
     // and then a repeated run of length 8".
     // 8 values per smallest run, 8 bits per byte
-    int bytes_per_run = bit_width;
-    int num_runs = static_cast<int>(::arrow::bit_util::CeilDiv(num_values, 8));
-    int literal_max_size = num_runs + num_runs * bytes_per_run;
+    int64_t bytes_per_run = bit_width;
+    int64_t num_runs = ::arrow::bit_util::CeilDiv(num_values, 8);
+    int64_t literal_max_size = num_runs + num_runs * bytes_per_run;
 
     // In the very worst case scenario, the data is a concatenation of repeated
     // runs of 8 values. Repeated run has a 1 byte varint followed by the
     // bit-packed repeated value
-    int min_repeated_run_size =
-        1 + static_cast<int>(::arrow::bit_util::BytesForBits(bit_width));
-    int repeated_max_size = num_runs * min_repeated_run_size;
+    int64_t min_repeated_run_size = 1 + ::arrow::bit_util::BytesForBits(bit_width);
+    int64_t repeated_max_size = num_runs * min_repeated_run_size;
 
     return std::max(literal_max_size, repeated_max_size);
   }
@@ -1432,7 +1430,7 @@ inline int RleBitPackedEncoder::Flush() {
 }
 
 inline void RleBitPackedEncoder::CheckBufferFull() {
-  int bytes_written = bit_writer_.bytes_written();
+  int64_t bytes_written = bit_writer_.bytes_written();
   if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
     buffer_full_ = true;
   }
diff --git a/cpp/src/arrow/util/rle_encoding_test.cc b/cpp/src/arrow/util/rle_encoding_test.cc
index f3a14af441..453fa78ea4 100644
--- a/cpp/src/arrow/util/rle_encoding_test.cc
+++ b/cpp/src/arrow/util/rle_encoding_test.cc
@@ -813,7 +813,7 @@ TEST(BitRle, RepeatedPattern) {
 
 TEST(BitRle, Overflow) {
   for (int bit_width = 1; bit_width < 32; bit_width += 3) {
-    int len = RleBitPackedEncoder::MinBufferSize(bit_width);
+    int len = static_cast<int>(RleBitPackedEncoder::MinBufferSize(bit_width));
     std::vector<uint8_t> buffer(len);
     int num_added = 0;
     bool parity = true;
@@ -861,7 +861,8 @@ void CheckRoundTrip(const Array& data, int bit_width, bool spaced, int32_t parts
   const int data_size = static_cast<int>(data.length());
   const int data_values_count =
       static_cast<int>(data.length() - spaced * data.null_count());
-  const int buffer_size = RleBitPackedEncoder::MaxBufferSize(bit_width, data_size);
+  const int buffer_size =
+      static_cast<int>(RleBitPackedEncoder::MaxBufferSize(bit_width, data_size));
   ASSERT_GE(parts, 1);
   ASSERT_LE(parts, data_size);
 
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 22c36531cd..94b67dfa80 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -185,7 +185,7 @@ void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
 int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
                                 int num_buffered_values) {
   int bit_width = bit_util::Log2(max_level + 1);
-  int num_bytes = 0;
+  int64_t num_bytes = 0;
   switch (encoding) {
     case Encoding::RLE: {
       // TODO: Due to the way we currently check if the buffer is full enough,
@@ -195,14 +195,19 @@ int LevelEncoder::MaxBufferSize(Encoding::type encoding, int16_t max_level,
       break;
     }
     case Encoding::BIT_PACKED: {
-      num_bytes =
-          static_cast<int>(bit_util::BytesForBits(num_buffered_values * bit_width));
+      num_bytes = bit_util::BytesForBits(num_buffered_values * bit_width);
       break;
     }
     default:
       throw ParquetException("Unknown encoding type for levels.");
   }
-  return num_bytes;
+  if (num_bytes > std::numeric_limits<int>::max()) {
+    std::stringstream ss;
+    ss << "Maximum buffer size for LevelEncoder (" << num_bytes
+       << ") is greater than the maximum int32 value";
+    throw ParquetException(ss.str());
+  }
+  return static_cast<int>(num_bytes);
 }
 
 int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 48cac04f07..dedf25abca 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -1034,6 +1034,113 @@ TEST_F(TestValuesWriterInt32Type, PagesSplitWithListAlignedWrites) {
   ASSERT_EQ(values_out_, values_);
 }
 
+// Test writing a dictionary encoded page where the number of
+// bits is greater than max int32.
+// For https://github.com/apache/arrow/issues/47973
+TEST(TestColumnWriter, LARGE_MEMORY_TEST(WriteLargeDictEncodedPage)) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          PrimitiveNode::Make("item", Repetition::REQUIRED, Type::INT32),
+                      }));
+  auto properties =
+      WriterProperties::Builder().data_pagesize(1024 * 1024 * 1024)->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int64_t num_batches = 150;
+  constexpr int64_t batch_size = 1'000'000;
+  constexpr int64_t unique_count = 200'000;
+  static_assert(batch_size % unique_count == 0);
+
+  std::vector<int32_t> values(batch_size, 0);
+  for (int64_t i = 0; i < batch_size; i++) {
+    values[i] = static_cast<int32_t>(i % unique_count);
+  }
+
+  auto col_writer = dynamic_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int64_t i = 0; i < num_batches; i++) {
+    col_writer->WriteBatch(batch_size, nullptr, nullptr, values.data());
+  }
+  file_writer->Close();
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  auto file_reader = ParquetFileReader::Open(
+      std::make_shared<::arrow::io::BufferReader>(buffer), default_reader_properties());
+  auto metadata = file_reader->metadata();
+  ASSERT_EQ(1, metadata->num_row_groups());
+  auto row_group_reader = file_reader->RowGroup(0);
+
+  // Verify page size property was applied and only 1 data page was written
+  auto page_reader = row_group_reader->GetColumnPageReader(0);
+  int64_t page_count = 0;
+  while (true) {
+    auto page = page_reader->NextPage();
+    if (page == nullptr) {
+      break;
+    }
+    if (page_count == 0) {
+      ASSERT_EQ(page->type(), PageType::DICTIONARY_PAGE);
+    } else {
+      ASSERT_EQ(page->type(), PageType::DATA_PAGE);
+    }
+    page_count++;
+  }
+  ASSERT_EQ(page_count, 2);
+
+  auto col_reader = std::static_pointer_cast<Int32Reader>(row_group_reader->Column(0));
+
+  constexpr int64_t buffer_size = 1024 * 1024;
+  values.resize(buffer_size);
+
+  // Verify values were round-tripped correctly
+  int64_t levels_read = 0;
+  while (levels_read < num_batches * batch_size) {
+    int64_t batch_values;
+    int64_t batch_levels = col_reader->ReadBatch(buffer_size, nullptr, nullptr,
+                                                 values.data(), &batch_values);
+    for (int64_t i = 0; i < batch_levels; i++) {
+      ASSERT_EQ(values[i], (levels_read + i) % unique_count);
+    }
+    levels_read += batch_levels;
+  }
+}
+
+TEST(TestColumnWriter, LARGE_MEMORY_TEST(ThrowsOnDictIndicesTooLarge)) {
+  auto sink = CreateOutputStream();
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED,
+                      {
+                          PrimitiveNode::Make("item", Repetition::REQUIRED, Type::INT32),
+                      }));
+  auto properties =
+      WriterProperties::Builder().data_pagesize(4 * 1024LL * 1024 * 1024)->build();
+  auto file_writer = ParquetFileWriter::Open(sink, schema, properties);
+  auto rg_writer = file_writer->AppendRowGroup();
+
+  constexpr int64_t num_batches = 1'000;
+  constexpr int64_t batch_size = 1'000'000;
+  constexpr int64_t unique_count = 200'000;
+  static_assert(batch_size % unique_count == 0);
+
+  std::vector<int32_t> values(batch_size, 0);
+  for (int64_t i = 0; i < batch_size; i++) {
+    values[i] = static_cast<int32_t>(i % unique_count);
+  }
+
+  auto col_writer = dynamic_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+  for (int64_t i = 0; i < num_batches; i++) {
+    col_writer->WriteBatch(batch_size, nullptr, nullptr, values.data());
+  }
+
+  EXPECT_THROW_THAT(
+      [&]() { file_writer->Close(); }, ParquetException,
+      ::testing::Property(&ParquetException::what,
+                          ::testing::HasSubstr("exceeds maximum int value")));
+}
+
 TEST(TestPageWriter, ThrowsOnPagesTooLarge) {
   NodePtr item = schema::Int32("item");  // optional item
   NodePtr list(GroupNode::Make("b", Repetition::REPEATED, {item}, ConvertedType::LIST));
diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc
index f9367555d9..04f079ce70 100644
--- a/cpp/src/parquet/encoder.cc
+++ b/cpp/src/parquet/encoder.cc
@@ -438,7 +438,7 @@ struct DictEncoderTraits<FLBAType> {
 // Initially 1024 elements
 static constexpr int32_t kInitialHashTableSize = 1 << 10;
 
-int RlePreserveBufferSize(int num_values, int bit_width) {
+int64_t RlePreserveBufferSize(int64_t num_values, int bit_width) {
   // Note: because of the way RleEncoder::CheckBufferFull()
   // is called, we have to reserve an extra "RleEncoder::MinBufferSize"
   // bytes. These extra bytes won't be used but not reserving them
@@ -496,7 +496,8 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
   /// indices. Used to size the buffer passed to WriteIndices().
   int64_t EstimatedDataEncodedSize() override {
     return kDataPageBitWidthBytes +
-           RlePreserveBufferSize(static_cast<int>(buffered_indices_.size()), bit_width());
+           RlePreserveBufferSize(static_cast<int64_t>(buffered_indices_.size()),
+                                 bit_width());
   }
 
   /// The minimum bit width required to encode the currently buffered indices.
@@ -582,10 +583,15 @@ class DictEncoderImpl : public EncoderImpl, virtual public DictEncoder<DType> {
   }
 
   std::shared_ptr<Buffer> FlushValues() override {
-    std::shared_ptr<ResizableBuffer> buffer =
-        AllocateBuffer(this->pool_, EstimatedDataEncodedSize());
-    int result_size = WriteIndices(buffer->mutable_data(),
-                                   static_cast<int>(EstimatedDataEncodedSize()));
+    const int64_t buffer_size = EstimatedDataEncodedSize();
+    if (buffer_size > std::numeric_limits<int>::max()) {
+      std::stringstream ss;
+      ss << "Buffer size for DictEncoder (" << buffer_size
+         << ") exceeds maximum int value";
+      throw ParquetException(ss.str());
+    }
+    std::shared_ptr<ResizableBuffer> buffer = AllocateBuffer(this->pool_, buffer_size);
+    int result_size = WriteIndices(buffer->mutable_data(), static_cast<int>(buffer_size));
     PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
     return buffer;
   }
@@ -1690,8 +1696,8 @@ class RleBooleanEncoder final : public EncoderImpl, virtual public BooleanEncode
   template <typename SequenceType>
   void PutImpl(const SequenceType& src, int num_values);
 
-  int MaxRleBufferSize() const noexcept {
-    return RlePreserveBufferSize(static_cast<int>(buffered_append_values_.size()),
+  int64_t MaxRleBufferSize() const noexcept {
+    return RlePreserveBufferSize(static_cast<int64_t>(buffered_append_values_.size()),
                                  kBitWidth);
   }
 
@@ -1719,11 +1725,17 @@ void RleBooleanEncoder::PutImpl(const SequenceType& src, int num_values) {
 }
 
 std::shared_ptr<Buffer> RleBooleanEncoder::FlushValues() {
-  int rle_buffer_size_max = MaxRleBufferSize();
+  int64_t rle_buffer_size_max = MaxRleBufferSize();
+  if (rle_buffer_size_max > std::numeric_limits<int>::max()) {
+    std::stringstream ss;
+    ss << "Buffer size for RleBooleanEncoder (" << rle_buffer_size_max
+       << ") exceeds maximum int value";
+    throw ParquetException(ss.str());
+  }
   std::shared_ptr<ResizableBuffer> buffer =
       AllocateBuffer(this->pool_, rle_buffer_size_max + kRleLengthInBytes);
+  ::arrow::util::RleBitPackedEncoder encoder(buffer->mutable_data() + kRleLengthInBytes,
-                                             rle_buffer_size_max,
+                                             static_cast<int>(rle_buffer_size_max),
                                              /*bit_width*/ kBitWidth);
 
   for (bool value : buffered_append_values_) {
