This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a2a3b9580d GH-36708: [C++] Fully calculate null-counts so the REE 
allocations make sense (#36740)
a2a3b9580d is described below

commit a2a3b9580d66cf40c5df8358f260fdaf5ccc301f
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Sat Jul 22 02:16:16 2023 -0300

    GH-36708: [C++] Fully calculate null-counts so the REE allocations make 
sense (#36740)
    
    ### Rationale for this change
    
    When `has_validity_buffer` is true, we expect validity buffers to be 
allocated. However, if `null_count` is calculated and ends up being 0, 
`ArrayData::Make()` silently removes the validity buffer from the physical 
array for us. The assumption that the buffer exists then stops holding, which 
causes a crash.
    
    Forcing `null_count` calculation with `input.GetNullCount()` ensures 
`has_validity_buffer` won't be `true` if the `null_count` on the input ends up 
being 0.
    
    ### What changes are included in this PR?
    
    The fix and tests to reproduce it.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    * Closes: #36708
    
    Authored-by: Felipe Oliveira Carvalho <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/compute/kernels/ree_util_internal.cc |  18 ++--
 cpp/src/arrow/compute/kernels/ree_util_internal.h  |  27 +++++-
 .../arrow/compute/kernels/vector_run_end_encode.cc |  25 +++--
 .../compute/kernels/vector_run_end_encode_test.cc  | 102 +++++++++++++--------
 4 files changed, 113 insertions(+), 59 deletions(-)

diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.cc 
b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
index 00c885f6fa..d35c000678 100644
--- a/cpp/src/arrow/compute/kernels/ree_util_internal.cc
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
@@ -59,7 +59,7 @@ Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
 
 Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
     const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, 
int64_t length,
-    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
+    MemoryPool* pool, int64_t data_buffer_size) {
   std::vector<std::shared_ptr<Buffer>> values_data_buffers;
   std::shared_ptr<Buffer> validity_buffer = NULLPTR;
   if (has_validity_buffer) {
@@ -79,20 +79,22 @@ Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
   } else {
     values_data_buffers = {std::move(validity_buffer), 
std::move(values_buffer)};
   }
-  return ArrayData::Make(value_type, length, std::move(values_data_buffers), 
null_count);
+  auto data = ArrayData::Make(value_type, length, 
std::move(values_data_buffers),
+                              kUnknownNullCount);
+  DCHECK(!(has_validity_buffer && length > 0) || data->buffers[0]);
+  return data;
 }
 
 Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
     std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
-    int64_t logical_length, int64_t physical_length, int64_t 
physical_null_count,
-    MemoryPool* pool, int64_t data_buffer_size) {
+    int64_t logical_length, int64_t physical_length, MemoryPool* pool,
+    int64_t data_buffer_size) {
   ARROW_ASSIGN_OR_RAISE(
       auto run_ends_data,
       PreallocateRunEndsArray(ree_type->run_end_type(), physical_length, 
pool));
-  ARROW_ASSIGN_OR_RAISE(
-      auto values_data,
-      PreallocateValuesArray(ree_type->value_type(), has_validity_buffer, 
physical_length,
-                             physical_null_count, pool, data_buffer_size));
+  ARROW_ASSIGN_OR_RAISE(auto values_data, PreallocateValuesArray(
+                                              ree_type->value_type(), 
has_validity_buffer,
+                                              physical_length, pool, 
data_buffer_size));
 
   return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
                          {std::move(run_ends_data), std::move(values_data)},
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h 
b/cpp/src/arrow/compute/kernels/ree_util_internal.h
index 080d23c06a..3293e754d3 100644
--- a/cpp/src/arrow/compute/kernels/ree_util_internal.h
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.h
@@ -333,18 +333,39 @@ Result<std::shared_ptr<ArrayData>> 
PreallocateRunEndsArray(
     const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
     MemoryPool* pool);
 
+/// \brief Preallocate the physical values array for a run-end encoded array
+///
+/// data_buffer_size is passed here pre-calculated so this function doesn't 
have
+/// to be template-specialized for each type.
+///
+/// The null_count is left as kUnknownNullCount (or 0 if length is 0). If the
+/// caller knows the null count after writing the values, it can set it.
+///
+/// \post if has_validity_buffer and length > 0, then data.buffers[0] != NULLPTR
+///
+/// \param has_validity_buffer a validity buffer must be allocated
+/// \param length the length of the values array
+/// \param data_buffer_size the size of the data buffer for string and binary 
types
 Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
     const std::shared_ptr<DataType>& value_type, bool has_validity_buffer, 
int64_t length,
-    int64_t null_count, MemoryPool* pool, int64_t data_buffer_size);
+    MemoryPool* pool, int64_t data_buffer_size);
 
 /// \brief Preallocate the ArrayData for the run-end encoded version
 /// of the flat input array
 ///
+/// The top-level null_count is set to 0 (REEs keep all the data in child
+/// arrays). The null_count of the values array (child_data[1]) is left as
+/// kUnknownNullCount (or 0 if physical_length is 0). If the caller knows the
+/// null count after writing the values, it can set it.
+///
+/// \post if has_validity_buffer and physical_length > 0, then
+/// data.child_data[1].buffers[0] != NULLPTR
+///
 /// \param data_buffer_size the size of the data buffer for string and binary 
types
 Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
     std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
-    int64_t logical_length, int64_t physical_length, int64_t 
physical_null_count,
-    MemoryPool* pool, int64_t data_buffer_size);
+    int64_t logical_length, int64_t physical_length, MemoryPool* pool,
+    int64_t data_buffer_size);
 
 /// \brief Writes a single run-end to the first slot of the pre-allocated
 /// run-end encoded array in out
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc 
b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
index eef816a149..943fdcd6b1 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -179,7 +179,9 @@ class RunEndEncodeImpl {
       ARROW_ASSIGN_OR_RAISE(
           auto output_array_data,
           ree_util::PreallocateREEArray(std::move(ree_type), 
has_validity_buffer,
-                                        input_length, 0, 0, 
ctx_->memory_pool(), 0));
+                                        /*logical_length=*/input_length,
+                                        /*physical_length=*/0, 
ctx_->memory_pool(),
+                                        /*data_buffer_size=*/0));
       output_->value = std::move(output_array_data);
       return Status::OK();
     }
@@ -196,17 +198,22 @@ class RunEndEncodeImpl {
         /*output_run_ends=*/NULLPTR);
     std::tie(num_valid_runs, num_output_runs, data_buffer_size) =
         counting_loop.CountNumberOfRuns();
+    const auto physical_null_count = num_output_runs - num_valid_runs;
+    DCHECK(!has_validity_buffer || physical_null_count > 0)
+        << "has_validity_buffer is expected to imply physical_null_count > 0";
 
     ARROW_ASSIGN_OR_RAISE(
         auto output_array_data,
         ree_util::PreallocateREEArray(
-            std::move(ree_type), has_validity_buffer, input_length, 
num_output_runs,
-            num_output_runs - num_valid_runs, ctx_->memory_pool(), 
data_buffer_size));
+            std::move(ree_type), has_validity_buffer, 
/*logical_length=*/input_length,
+            /*physical_length=*/num_output_runs, ctx_->memory_pool(), 
data_buffer_size));
 
     // Initialize the output pointers
     auto* output_run_ends =
         output_array_data->child_data[0]->template 
GetMutableValues<RunEndCType>(1, 0);
     auto* output_values_array_data = output_array_data->child_data[1].get();
+    // Set the null_count on the physical array
+    output_values_array_data->null_count = physical_null_count;
 
     // Second pass: write the runs
     RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer> 
writing_loop(
@@ -254,7 +261,7 @@ struct RunEndEncodeExec {
       return RunEndEncodeNullArray(TypeTraits<RunEndType>::type_singleton(), 
ctx,
                                    input_array, result);
     } else {
-      const bool has_validity_buffer = input_array.MayHaveNulls();
+      const bool has_validity_buffer = input_array.GetNullCount() > 0;
       if (has_validity_buffer) {
         return RunEndEncodeImpl<RunEndType, ValueType, true>(ctx, input_array, 
result)
             .Exec();
@@ -398,10 +405,10 @@ class RunEndDecodeImpl {
       }
     }
 
-    ARROW_ASSIGN_OR_RAISE(auto output_array_data,
-                          ree_util::PreallocateValuesArray(
-                              ree_type->value_type(), has_validity_buffer, 
length,
-                              kUnknownNullCount, ctx_->memory_pool(), 
data_buffer_size));
+    ARROW_ASSIGN_OR_RAISE(
+        auto output_array_data,
+        ree_util::PreallocateValuesArray(ree_type->value_type(), 
has_validity_buffer,
+                                         length, ctx_->memory_pool(), 
data_buffer_size));
 
     int64_t output_null_count = 0;
     if (length > 0) {
@@ -435,7 +442,7 @@ struct RunEndDecodeExec {
       return RunEndDecodeNullREEArray(ctx, input_array, result);
     } else {
       const bool has_validity_buffer =
-          arrow::ree_util::ValuesArray(input_array).MayHaveNulls();
+          arrow::ree_util::ValuesArray(input_array).GetNullCount() > 0;
       if (has_validity_buffer) {
         return RunEndDecodeImpl<RunEndType, ValueType, true>(ctx, input_array, 
result)
             .Exec();
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc 
b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
index f718d82774..0bd8e3386e 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
@@ -72,11 +72,19 @@ struct REETestData {
                                  std::vector<std::string> inputs_json,
                                  std::vector<std::string> expected_values_json,
                                  std::vector<std::string> 
expected_run_ends_json,
-                                 int64_t input_offset = 0) {
+                                 int64_t input_offset = 0,
+                                 bool force_validity_bitmap = false) {
     std::vector<std::shared_ptr<Array>> inputs;
     inputs.reserve(inputs_json.size());
     for (const auto& input_json : inputs_json) {
-      inputs.push_back(ArrayFromJSON(data_type, input_json));
+      auto chunk = ArrayFromJSON(data_type, input_json);
+      auto& data = chunk->data();
+      if (force_validity_bitmap && !data->HasValidityBitmap()) {
+        EXPECT_OK_AND_ASSIGN(auto validity, AllocateBitmap(data->length));
+        memset(validity->mutable_data(), 0xFF, validity->size());
+        data->buffers[0] = std::move(validity);
+      }
+      inputs.push_back(std::move(chunk));
     }
     auto chunked_input = std::make_shared<ChunkedArray>(std::move(inputs));
 
@@ -165,47 +173,52 @@ class TestRunEndEncodeDecode : public 
::testing::TestWithParam<
     DCHECK(datum.is_chunked_array());
     return datum.chunked_array();
   }
-};
-
-TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
-  auto [data, run_end_type] = GetParam();
 
-  ASSERT_OK_AND_ASSIGN(
-      Datum encoded_datum,
-      RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));
-
-  auto encoded = AsChunkedArray(encoded_datum);
-  ASSERT_OK(encoded->ValidateFull());
-  ASSERT_EQ(data.input->length(), encoded->length());
+  void TestEncodeDecodeArray(REETestData& data,
+                             const std::shared_ptr<DataType>& run_end_type) {
+    ASSERT_OK_AND_ASSIGN(
+        Datum encoded_datum,
+        RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));
+
+    auto encoded = AsChunkedArray(encoded_datum);
+    ASSERT_OK(encoded->ValidateFull());
+    ASSERT_EQ(data.input->length(), encoded->length());
+
+    for (int i = 0; i < encoded->num_chunks(); i++) {
+      auto& chunk = encoded->chunk(i);
+      auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
+      auto values_array = MakeArray(chunk->data()->child_data[1]);
+      ASSERT_OK(chunk->ValidateFull());
+      ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type, 
data.expected_run_ends_json[i]),
+                          *run_ends_array);
+      ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
+      ASSERT_EQ(chunk->data()->buffers.size(), 1);
+      ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
+      ASSERT_EQ(chunk->data()->child_data.size(), 2);
+      ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
+      ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
+      ASSERT_EQ(run_ends_array->offset(), 0);
+      ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
+      ASSERT_EQ(chunk->data()->offset, 0);
+      ASSERT_EQ(*chunk->data()->type,
+                RunEndEncodedType(run_end_type, data.input->type()));
+      ASSERT_EQ(chunk->data()->null_count, 0);
+    }
 
-  for (int i = 0; i < encoded->num_chunks(); i++) {
-    auto& chunk = encoded->chunk(i);
-    auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
-    auto values_array = MakeArray(chunk->data()->child_data[1]);
-    ASSERT_OK(chunk->ValidateFull());
-    ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type, 
data.expected_run_ends_json[i]),
-                        *run_ends_array);
-    ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
-    ASSERT_EQ(chunk->data()->buffers.size(), 1);
-    ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
-    ASSERT_EQ(chunk->data()->child_data.size(), 2);
-    ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
-    ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
-    ASSERT_EQ(run_ends_array->offset(), 0);
-    ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
-    ASSERT_EQ(chunk->data()->offset, 0);
-    ASSERT_EQ(*chunk->data()->type, RunEndEncodedType(run_end_type, 
data.input->type()));
-    ASSERT_EQ(chunk->data()->null_count, 0);
+    ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
+                                                  ? RunEndDecode(encoded)
+                                                  : 
RunEndDecode(encoded->chunk(0)));
+    auto decoded = AsChunkedArray(decoded_datum);
+    ASSERT_OK(decoded->ValidateFull());
+    for (int i = 0; i < decoded->num_chunks(); i++) {
+      ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
+    }
   }
+};
 
-  ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
-                                                ? RunEndDecode(encoded)
-                                                : 
RunEndDecode(encoded->chunk(0)));
-  auto decoded = AsChunkedArray(decoded_datum);
-  ASSERT_OK(decoded->ValidateFull());
-  for (int i = 0; i < decoded->num_chunks(); i++) {
-    ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
-  }
+TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
+  auto [data, run_end_type] = GetParam();
+  TestEncodeDecodeArray(data, run_end_type);
 }
 
 // Encoding an input with an offset results in a completely new encoded array 
without an
@@ -254,6 +267,17 @@ TEST_P(TestRunEndEncodeDecode, DecodeWithOffset) {
   }
 }
 
+// GH-36708
+TEST_P(TestRunEndEncodeDecode, InputWithValidityAndNoNulls) {
+  auto data =
+      REETestData::JSONChunked(int32(),
+                               /*inputs=*/{"[1, 1, 2, 2, 2, 3]", "[4, 5, 5, 5, 
6, 6]"},
+                               /*expected_values=*/{"[1, 2, 3]", "[4, 5, 6]"},
+                               /*expected_run_ends=*/{"[2, 5, 6]", "[1, 4, 
6]"},
+                               /*input_offset=*/0, 
/*force_validity_bitmap=*/true);
+  TestEncodeDecodeArray(data, int32());
+}
+
// This test creates a run-end encoded array with an offset in the child 
array, which
 // removes the first run in the test data. It's a no-op for chunked input.
 TEST_P(TestRunEndEncodeDecode, DecodeWithOffsetInChildArray) {

Reply via email to