This is an automated email from the ASF dual-hosted git repository.

raulcd pushed a commit to branch maint-15.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 881eec514255e6cdc78e28c390d674d218b86059
Author: Rossi(Ruoxi) Sun <[email protected]>
AuthorDate: Thu Jan 18 19:44:26 2024 +0800

    GH-39332: [C++] Explicit error in ExecBatchBuilder when appending var 
length data exceeds offset limit (int32 max) (#39383)
    
    
    
    ### Rationale for this change
    
    When appending variable-length data in `ExecBatchBuilder`, the offset may 
overflow if the batch contains 4GB of data or more. This can in turn cause a 
segmentation fault during the subsequent copying of the data content. For 
details, please refer to this comment: 
https://github.com/apache/arrow/issues/39332#issuecomment-1870690063.
    
    The solution is to let users use the "large" counterpart data type to 
avoid the overflow, but we should still report an explicit error when such an 
overflow happens.
    
    ### What changes are included in this PR?
    
    1. Detect the offset overflow when appending data in `ExecBatchBuilder` 
and return an explicit error (`Status::Invalid`).
    2. Change the offset type from `uint32_t` to `int32_t` in 
`ExecBatchBuilder` and respect the `BinaryBuilder::memory_limit()`, which is 
`2GB - 2B`, like the rest of the codebase.
    
    ### Are these changes tested?
    
    Unit tests included.
    
    ### Are there any user-facing changes?
    
    No.
    
    * Closes: #39332
    
    Lead-authored-by: zanmato <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Co-authored-by: Rossi(Ruoxi) Sun <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/compute/light_array.cc      | 47 +++++++++++++----------
 cpp/src/arrow/compute/light_array.h       |  2 +-
 cpp/src/arrow/compute/light_array_test.cc | 64 +++++++++++++++++++++++++++++++
 cpp/src/arrow/testing/generator.cc        |  9 ++++-
 4 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/cpp/src/arrow/compute/light_array.cc 
b/cpp/src/arrow/compute/light_array.cc
index 66d8477b02..b225e04b05 100644
--- a/cpp/src/arrow/compute/light_array.cc
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -20,6 +20,8 @@
 #include <type_traits>
 
 #include "arrow/util/bitmap_ops.h"
+#include "arrow/util/int_util_overflow.h"
+#include "arrow/util/macros.h"
 
 namespace arrow {
 namespace compute {
@@ -325,11 +327,10 @@ Status ResizableArrayData::ResizeVaryingLengthBuffer() {
   column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
 
   if (!column_metadata.is_fixed_length) {
-    int min_new_size = static_cast<int>(reinterpret_cast<const uint32_t*>(
-        buffers_[kFixedLengthBuffer]->data())[num_rows_]);
+    int64_t min_new_size = 
buffers_[kFixedLengthBuffer]->data_as<int32_t>()[num_rows_];
     ARROW_DCHECK(var_len_buf_size_ > 0);
     if (var_len_buf_size_ < min_new_size) {
-      int new_size = var_len_buf_size_;
+      int64_t new_size = var_len_buf_size_;
       while (new_size < min_new_size) {
         new_size *= 2;
       }
@@ -465,12 +466,11 @@ void ExecBatchBuilder::Visit(const 
std::shared_ptr<ArrayData>& column, int num_r
 
   if (!metadata.is_fixed_length) {
     const uint8_t* ptr_base = column->buffers[2]->data();
-    const uint32_t* offsets =
-        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + 
column->offset;
+    const int32_t* offsets = column->GetValues<int32_t>(1);
     for (int i = 0; i < num_rows; ++i) {
       uint16_t row_id = row_ids[i];
       const uint8_t* field_ptr = ptr_base + offsets[row_id];
-      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      int32_t field_length = offsets[row_id + 1] - offsets[row_id];
       process_value_fn(i, field_ptr, field_length);
     }
   } else {
@@ -480,7 +480,7 @@ void ExecBatchBuilder::Visit(const 
std::shared_ptr<ArrayData>& column, int num_r
       const uint8_t* field_ptr =
           column->buffers[1]->data() +
           (column->offset + row_id) * 
static_cast<int64_t>(metadata.fixed_length);
-      process_value_fn(i, field_ptr, metadata.fixed_length);
+      process_value_fn(i, field_ptr, 
static_cast<int32_t>(metadata.fixed_length));
     }
   }
 }
@@ -511,14 +511,14 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
         break;
       case 1:
         Visit(source, num_rows_to_append, row_ids,
-              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                 target->mutable_data(1)[num_rows_before + i] = *ptr;
               });
         break;
       case 2:
         Visit(
             source, num_rows_to_append, row_ids,
-            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
               
reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
                   *reinterpret_cast<const uint16_t*>(ptr);
             });
@@ -526,7 +526,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
       case 4:
         Visit(
             source, num_rows_to_append, row_ids,
-            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
               
reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
                   *reinterpret_cast<const uint32_t*>(ptr);
             });
@@ -534,7 +534,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
       case 8:
         Visit(
             source, num_rows_to_append, row_ids,
-            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            [&](int i, const uint8_t* ptr, int32_t num_bytes) {
               
reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
                   *reinterpret_cast<const uint64_t*>(ptr);
             });
@@ -544,7 +544,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
             num_rows_to_append -
             NumRowsToSkip(source, num_rows_to_append, row_ids, 
sizeof(uint64_t));
         Visit(source, num_rows_to_process, row_ids,
-              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                 uint64_t* dst = reinterpret_cast<uint64_t*>(
                     target->mutable_data(1) +
                     static_cast<int64_t>(num_bytes) * (num_rows_before + i));
@@ -558,7 +558,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
         if (num_rows_to_append > num_rows_to_process) {
           Visit(source, num_rows_to_append - num_rows_to_process,
                 row_ids + num_rows_to_process,
-                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                [&](int i, const uint8_t* ptr, int32_t num_bytes) {
                   uint64_t* dst = reinterpret_cast<uint64_t*>(
                       target->mutable_data(1) +
                       static_cast<int64_t>(num_bytes) *
@@ -575,16 +575,23 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
 
     // Step 1: calculate target offsets
     //
-    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
-    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    int32_t* offsets = reinterpret_cast<int32_t*>(target->mutable_data(1));
+    int32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
     Visit(source, num_rows_to_append, row_ids,
-          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
             offsets[num_rows_before + i] = num_bytes;
           });
     for (int i = 0; i < num_rows_to_append; ++i) {
-      uint32_t length = offsets[num_rows_before + i];
+      int32_t length = offsets[num_rows_before + i];
       offsets[num_rows_before + i] = sum;
-      sum += length;
+      int32_t new_sum_maybe_overflow = 0;
+      if (ARROW_PREDICT_FALSE(
+              arrow::internal::AddWithOverflow(sum, length, 
&new_sum_maybe_overflow))) {
+        return Status::Invalid("Overflow detected in ExecBatchBuilder when 
appending ",
+                               num_rows_before + i + 1, "-th element of length 
", length,
+                               " bytes to current length ", sum, " bytes");
+      }
+      sum = new_sum_maybe_overflow;
     }
     offsets[num_rows_before + num_rows_to_append] = sum;
 
@@ -598,7 +605,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
         num_rows_to_append -
         NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
     Visit(source, num_rows_to_process, row_ids,
-          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
             uint64_t* dst = 
reinterpret_cast<uint64_t*>(target->mutable_data(2) +
                                                         
offsets[num_rows_before + i]);
             const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
@@ -608,7 +615,7 @@ Status ExecBatchBuilder::AppendSelected(const 
std::shared_ptr<ArrayData>& source
             }
           });
     Visit(source, num_rows_to_append - num_rows_to_process, row_ids + 
num_rows_to_process,
-          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+          [&](int i, const uint8_t* ptr, int32_t num_bytes) {
             uint64_t* dst = reinterpret_cast<uint64_t*>(
                 target->mutable_data(2) +
                 offsets[num_rows_before + num_rows_to_process + i]);
diff --git a/cpp/src/arrow/compute/light_array.h 
b/cpp/src/arrow/compute/light_array.h
index 84aa86d64b..67de71bf56 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -353,7 +353,7 @@ class ARROW_EXPORT ResizableArrayData {
   MemoryPool* pool_;
   int num_rows_;
   int num_rows_allocated_;
-  int var_len_buf_size_;
+  int64_t var_len_buf_size_;
   static constexpr int kMaxBuffers = 3;
   std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
 };
diff --git a/cpp/src/arrow/compute/light_array_test.cc 
b/cpp/src/arrow/compute/light_array_test.cc
index d50e967551..ecc5f3ad37 100644
--- a/cpp/src/arrow/compute/light_array_test.cc
+++ b/cpp/src/arrow/compute/light_array_test.cc
@@ -407,6 +407,70 @@ TEST(ExecBatchBuilder, AppendValuesBeyondLimit) {
   ASSERT_EQ(0, pool->bytes_allocated());
 }
 
+TEST(ExecBatchBuilder, AppendVarLengthBeyondLimit) {
+  // GH-39332: check appending variable-length data past 2GB.
+  if constexpr (sizeof(void*) == 4) {
+    GTEST_SKIP() << "Test only works on 64-bit platforms";
+  }
+
+  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
+  MemoryPool* pool = owned_pool.get();
+  constexpr auto eight_mb = 8 * 1024 * 1024;
+  constexpr auto eight_mb_minus_one = eight_mb - 1;
+  // String of size 8mb to repetitively fill the heading multiple of 8mbs of 
an array
+  // of int32_max bytes.
+  std::string str_8mb(eight_mb, 'a');
+  // String of size (8mb - 1) to be the last element of an array of int32_max 
bytes.
+  std::string str_8mb_minus_1(eight_mb_minus_one, 'b');
+  std::shared_ptr<Array> values_8mb = ConstantArrayGenerator::String(1, 
str_8mb);
+  std::shared_ptr<Array> values_8mb_minus_1 =
+      ConstantArrayGenerator::String(1, str_8mb_minus_1);
+
+  ExecBatch batch_8mb({values_8mb}, 1);
+  ExecBatch batch_8mb_minus_1({values_8mb_minus_1}, 1);
+
+  auto num_rows = std::numeric_limits<int32_t>::max() / eight_mb;
+  std::vector<uint16_t> body_row_ids(num_rows, 0);
+  std::vector<uint16_t> tail_row_id(1, 0);
+
+  {
+    // Building an array of (int32_max + 1) = (8mb * num_rows + 8mb) bytes 
should raise an
+    // error of overflow.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, 
body_row_ids.data(),
+                                     /*num_cols=*/1));
+    std::stringstream ss;
+    ss << "Invalid: Overflow detected in ExecBatchBuilder when appending " << 
num_rows + 1
+       << "-th element of length " << eight_mb << " bytes to current length "
+       << eight_mb * num_rows << " bytes";
+    ASSERT_RAISES_WITH_MESSAGE(
+        Invalid, ss.str(),
+        builder.AppendSelected(pool, batch_8mb, 1, tail_row_id.data(),
+                               /*num_cols=*/1));
+  }
+
+  {
+    // Building an array of int32_max = (8mb * num_rows + 8mb - 1) bytes 
should succeed.
+    ExecBatchBuilder builder;
+    ASSERT_OK(builder.AppendSelected(pool, batch_8mb, num_rows, 
body_row_ids.data(),
+                                     /*num_cols=*/1));
+    ASSERT_OK(builder.AppendSelected(pool, batch_8mb_minus_1, 1, 
tail_row_id.data(),
+                                     /*num_cols=*/1));
+    ExecBatch built = builder.Flush();
+    auto datum = built[0];
+    ASSERT_TRUE(datum.is_array());
+    auto array = datum.array_as<StringArray>();
+    ASSERT_EQ(array->length(), num_rows + 1);
+    for (int i = 0; i < num_rows; ++i) {
+      ASSERT_EQ(array->GetString(i), str_8mb);
+    }
+    ASSERT_EQ(array->GetString(num_rows), str_8mb_minus_1);
+    ASSERT_NE(0, pool->bytes_allocated());
+  }
+
+  ASSERT_EQ(0, pool->bytes_allocated());
+}
+
 TEST(KeyColumnArray, FromExecBatch) {
   ExecBatch batch =
       JSONToExecBatch({int64(), boolean()}, "[[1, true], [2, false], [null, 
null]]");
diff --git a/cpp/src/arrow/testing/generator.cc 
b/cpp/src/arrow/testing/generator.cc
index 36c88c20ef..5ea6a541e8 100644
--- a/cpp/src/arrow/testing/generator.cc
+++ b/cpp/src/arrow/testing/generator.cc
@@ -38,6 +38,7 @@
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/string.h"
 
@@ -103,7 +104,13 @@ std::shared_ptr<arrow::Array> 
ConstantArrayGenerator::Float64(int64_t size,
 
 std::shared_ptr<arrow::Array> ConstantArrayGenerator::String(int64_t size,
                                                              std::string 
value) {
-  return ConstantArray<StringType>(size, value);
+  using BuilderType = typename TypeTraits<StringType>::BuilderType;
+  auto type = TypeTraits<StringType>::type_singleton();
+  auto builder_fn = [&](BuilderType* builder) {
+    DCHECK_OK(builder->Append(std::string_view(value.data())));
+  };
+  return ArrayFromBuilderVisitor(type, value.size() * size, size, builder_fn)
+      .ValueOrDie();
 }
 
 std::shared_ptr<arrow::Array> ConstantArrayGenerator::Zeroes(

Reply via email to