This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a2a3b9580d GH-36708: [C++] Fully calculate null-counts so the REE
allocations make sense (#36740)
a2a3b9580d is described below
commit a2a3b9580d66cf40c5df8358f260fdaf5ccc301f
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Sat Jul 22 02:16:16 2023 -0300
GH-36708: [C++] Fully calculate null-counts so the REE allocations make
sense (#36740)
### Rationale for this change
When `has_validity_buffer` is true, we expect validity buffers to be
allocated. However, if null_count is calculated and ends up being 0,
`ArrayData::Make()` silently drops the validity buffer from the physical
array. The assumption that the buffer exists then no longer holds, which
causes a crash.
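Previously `has_validity_buffer` was derived from `MayHaveNulls()`, which can be
`true` for an input that carries a validity bitmap but contains no nulls.
Forcing the `null_count` to be computed with `input.GetNullCount()` ensures
`has_validity_buffer` is `false` whenever the input's actual `null_count` is 0.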
### What changes are included in this PR?
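The fix, plus tests that reproduce the crash. `PreallocateValuesArray()` and
`PreallocateREEArray()` no longer take a null count. The values array is created
with `kUnknownNullCount`, and callers set the real `null_count` once it is known.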
### Are these changes tested?
Yes.
### Are there any user-facing changes?
No.
* Closes: #36708
Authored-by: Felipe Oliveira Carvalho <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/compute/kernels/ree_util_internal.cc | 18 ++--
cpp/src/arrow/compute/kernels/ree_util_internal.h | 27 +++++-
.../arrow/compute/kernels/vector_run_end_encode.cc | 25 +++--
.../compute/kernels/vector_run_end_encode_test.cc | 102 +++++++++++++--------
4 files changed, 113 insertions(+), 59 deletions(-)
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.cc
b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
index 00c885f6fa..d35c000678 100644
--- a/cpp/src/arrow/compute/kernels/ree_util_internal.cc
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.cc
@@ -59,7 +59,7 @@ Result<std::shared_ptr<ArrayData>> PreallocateRunEndsArray(
Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
const std::shared_ptr<DataType>& value_type, bool has_validity_buffer,
int64_t length,
- int64_t null_count, MemoryPool* pool, int64_t data_buffer_size) {
+ MemoryPool* pool, int64_t data_buffer_size) {
std::vector<std::shared_ptr<Buffer>> values_data_buffers;
std::shared_ptr<Buffer> validity_buffer = NULLPTR;
if (has_validity_buffer) {
@@ -79,20 +79,22 @@ Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
} else {
values_data_buffers = {std::move(validity_buffer),
std::move(values_buffer)};
}
- return ArrayData::Make(value_type, length, std::move(values_data_buffers),
null_count);
+ auto data = ArrayData::Make(value_type, length,
std::move(values_data_buffers),
+ kUnknownNullCount);
+ DCHECK(!(has_validity_buffer && length > 0) || data->buffers[0]);
+ return data;
}
Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
- int64_t logical_length, int64_t physical_length, int64_t
physical_null_count,
- MemoryPool* pool, int64_t data_buffer_size) {
+ int64_t logical_length, int64_t physical_length, MemoryPool* pool,
+ int64_t data_buffer_size) {
ARROW_ASSIGN_OR_RAISE(
auto run_ends_data,
PreallocateRunEndsArray(ree_type->run_end_type(), physical_length,
pool));
- ARROW_ASSIGN_OR_RAISE(
- auto values_data,
- PreallocateValuesArray(ree_type->value_type(), has_validity_buffer,
physical_length,
- physical_null_count, pool, data_buffer_size));
+ ARROW_ASSIGN_OR_RAISE(auto values_data, PreallocateValuesArray(
+ ree_type->value_type(),
has_validity_buffer,
+ physical_length, pool,
data_buffer_size));
return ArrayData::Make(std::move(ree_type), logical_length, {NULLPTR},
{std::move(run_ends_data), std::move(values_data)},
diff --git a/cpp/src/arrow/compute/kernels/ree_util_internal.h
b/cpp/src/arrow/compute/kernels/ree_util_internal.h
index 080d23c06a..3293e754d3 100644
--- a/cpp/src/arrow/compute/kernels/ree_util_internal.h
+++ b/cpp/src/arrow/compute/kernels/ree_util_internal.h
@@ -333,18 +333,39 @@ Result<std::shared_ptr<ArrayData>>
PreallocateRunEndsArray(
const std::shared_ptr<DataType>& run_end_type, int64_t physical_length,
MemoryPool* pool);
+/// \brief Preallocate the physical values array for a run-end encoded array
+///
+/// data_buffer_size is passed here pre-calculated so this function doesn't
have
+/// to be template-specialized for each type.
+///
+/// The null_count is left as kUnknownNullCount (or 0 if length is 0) and, if
+/// after writing the values, the caller knows the null count, it can be set.
+///
+/// \post if has_validity_buffer and length > 0, then data.buffer[0] != NULLPTR
+///
+/// \param has_validity_buffer a validity buffer must be allocated
+/// \param length the length of the values array
+/// \param data_buffer_size the size of the data buffer for string and binary
types
Result<std::shared_ptr<ArrayData>> PreallocateValuesArray(
const std::shared_ptr<DataType>& value_type, bool has_validity_buffer,
int64_t length,
- int64_t null_count, MemoryPool* pool, int64_t data_buffer_size);
+ MemoryPool* pool, int64_t data_buffer_size);
/// \brief Preallocate the ArrayData for the run-end encoded version
/// of the flat input array
///
+/// The top-level null_count is set to 0 (REEs keep all the data in child
+/// arrays). The null_count of the values array (child_data[1]) is left as
+/// kUnknownNullCount (or 0 if physical_length is 0) and, if after writing
+/// the values, the caller knows the null count, it can be set.
+///
+/// \post if has_validity_buffer and physical_length > 0, then
+/// data.child_data[1].buffer[0] != NULLPTR
+///
/// \param data_buffer_size the size of the data buffer for string and binary
types
Result<std::shared_ptr<ArrayData>> PreallocateREEArray(
std::shared_ptr<RunEndEncodedType> ree_type, bool has_validity_buffer,
- int64_t logical_length, int64_t physical_length, int64_t
physical_null_count,
- MemoryPool* pool, int64_t data_buffer_size);
+ int64_t logical_length, int64_t physical_length, MemoryPool* pool,
+ int64_t data_buffer_size);
/// \brief Writes a single run-end to the first slot of the pre-allocated
/// run-end encoded array in out
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
index eef816a149..943fdcd6b1 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode.cc
@@ -179,7 +179,9 @@ class RunEndEncodeImpl {
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
ree_util::PreallocateREEArray(std::move(ree_type),
has_validity_buffer,
- input_length, 0, 0,
ctx_->memory_pool(), 0));
+ /*logical_length=*/input_length,
+ /*physical_length=*/0,
ctx_->memory_pool(),
+ /*data_buffer_size=*/0));
output_->value = std::move(output_array_data);
return Status::OK();
}
@@ -196,17 +198,22 @@ class RunEndEncodeImpl {
/*output_run_ends=*/NULLPTR);
std::tie(num_valid_runs, num_output_runs, data_buffer_size) =
counting_loop.CountNumberOfRuns();
+ const auto physical_null_count = num_output_runs - num_valid_runs;
+ DCHECK(!has_validity_buffer || physical_null_count > 0)
+ << "has_validity_buffer is expected to imply physical_null_count > 0";
ARROW_ASSIGN_OR_RAISE(
auto output_array_data,
ree_util::PreallocateREEArray(
- std::move(ree_type), has_validity_buffer, input_length,
num_output_runs,
- num_output_runs - num_valid_runs, ctx_->memory_pool(),
data_buffer_size));
+ std::move(ree_type), has_validity_buffer,
/*logical_length=*/input_length,
+ /*physical_length=*/num_output_runs, ctx_->memory_pool(),
data_buffer_size));
// Initialize the output pointers
auto* output_run_ends =
output_array_data->child_data[0]->template
GetMutableValues<RunEndCType>(1, 0);
auto* output_values_array_data = output_array_data->child_data[1].get();
+ // Set the null_count on the physical array
+ output_values_array_data->null_count = physical_null_count;
// Second pass: write the runs
RunEndEncodingLoop<RunEndType, ValueType, has_validity_buffer>
writing_loop(
@@ -254,7 +261,7 @@ struct RunEndEncodeExec {
return RunEndEncodeNullArray(TypeTraits<RunEndType>::type_singleton(),
ctx,
input_array, result);
} else {
- const bool has_validity_buffer = input_array.MayHaveNulls();
+ const bool has_validity_buffer = input_array.GetNullCount() > 0;
if (has_validity_buffer) {
return RunEndEncodeImpl<RunEndType, ValueType, true>(ctx, input_array,
result)
.Exec();
@@ -398,10 +405,10 @@ class RunEndDecodeImpl {
}
}
- ARROW_ASSIGN_OR_RAISE(auto output_array_data,
- ree_util::PreallocateValuesArray(
- ree_type->value_type(), has_validity_buffer,
length,
- kUnknownNullCount, ctx_->memory_pool(),
data_buffer_size));
+ ARROW_ASSIGN_OR_RAISE(
+ auto output_array_data,
+ ree_util::PreallocateValuesArray(ree_type->value_type(),
has_validity_buffer,
+ length, ctx_->memory_pool(),
data_buffer_size));
int64_t output_null_count = 0;
if (length > 0) {
@@ -435,7 +442,7 @@ struct RunEndDecodeExec {
return RunEndDecodeNullREEArray(ctx, input_array, result);
} else {
const bool has_validity_buffer =
- arrow::ree_util::ValuesArray(input_array).MayHaveNulls();
+ arrow::ree_util::ValuesArray(input_array).GetNullCount() > 0;
if (has_validity_buffer) {
return RunEndDecodeImpl<RunEndType, ValueType, true>(ctx, input_array,
result)
.Exec();
diff --git a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
index f718d82774..0bd8e3386e 100644
--- a/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_run_end_encode_test.cc
@@ -72,11 +72,19 @@ struct REETestData {
std::vector<std::string> inputs_json,
std::vector<std::string> expected_values_json,
std::vector<std::string>
expected_run_ends_json,
- int64_t input_offset = 0) {
+ int64_t input_offset = 0,
+ bool force_validity_bitmap = false) {
std::vector<std::shared_ptr<Array>> inputs;
inputs.reserve(inputs_json.size());
for (const auto& input_json : inputs_json) {
- inputs.push_back(ArrayFromJSON(data_type, input_json));
+ auto chunk = ArrayFromJSON(data_type, input_json);
+ auto& data = chunk->data();
+ if (force_validity_bitmap && !data->HasValidityBitmap()) {
+ EXPECT_OK_AND_ASSIGN(auto validity, AllocateBitmap(data->length));
+ memset(validity->mutable_data(), 0xFF, validity->size());
+ data->buffers[0] = std::move(validity);
+ }
+ inputs.push_back(std::move(chunk));
}
auto chunked_input = std::make_shared<ChunkedArray>(std::move(inputs));
@@ -165,47 +173,52 @@ class TestRunEndEncodeDecode : public
::testing::TestWithParam<
DCHECK(datum.is_chunked_array());
return datum.chunked_array();
}
-};
-
-TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
- auto [data, run_end_type] = GetParam();
- ASSERT_OK_AND_ASSIGN(
- Datum encoded_datum,
- RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));
-
- auto encoded = AsChunkedArray(encoded_datum);
- ASSERT_OK(encoded->ValidateFull());
- ASSERT_EQ(data.input->length(), encoded->length());
+ void TestEncodeDecodeArray(REETestData& data,
+ const std::shared_ptr<DataType>& run_end_type) {
+ ASSERT_OK_AND_ASSIGN(
+ Datum encoded_datum,
+ RunEndEncode(data.InputDatum(), RunEndEncodeOptions{run_end_type}));
+
+ auto encoded = AsChunkedArray(encoded_datum);
+ ASSERT_OK(encoded->ValidateFull());
+ ASSERT_EQ(data.input->length(), encoded->length());
+
+ for (int i = 0; i < encoded->num_chunks(); i++) {
+ auto& chunk = encoded->chunk(i);
+ auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
+ auto values_array = MakeArray(chunk->data()->child_data[1]);
+ ASSERT_OK(chunk->ValidateFull());
+ ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type,
data.expected_run_ends_json[i]),
+ *run_ends_array);
+ ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
+ ASSERT_EQ(chunk->data()->buffers.size(), 1);
+ ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
+ ASSERT_EQ(chunk->data()->child_data.size(), 2);
+ ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
+ ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
+ ASSERT_EQ(run_ends_array->offset(), 0);
+ ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
+ ASSERT_EQ(chunk->data()->offset, 0);
+ ASSERT_EQ(*chunk->data()->type,
+ RunEndEncodedType(run_end_type, data.input->type()));
+ ASSERT_EQ(chunk->data()->null_count, 0);
+ }
- for (int i = 0; i < encoded->num_chunks(); i++) {
- auto& chunk = encoded->chunk(i);
- auto run_ends_array = MakeArray(chunk->data()->child_data[0]);
- auto values_array = MakeArray(chunk->data()->child_data[1]);
- ASSERT_OK(chunk->ValidateFull());
- ASSERT_ARRAYS_EQUAL(*ArrayFromJSON(run_end_type,
data.expected_run_ends_json[i]),
- *run_ends_array);
- ASSERT_ARRAYS_EQUAL(*values_array, *data.expected_values[i]);
- ASSERT_EQ(chunk->data()->buffers.size(), 1);
- ASSERT_EQ(chunk->data()->buffers[0], NULLPTR);
- ASSERT_EQ(chunk->data()->child_data.size(), 2);
- ASSERT_EQ(run_ends_array->data()->buffers[0], NULLPTR);
- ASSERT_EQ(run_ends_array->length(), data.expected_values[i]->length());
- ASSERT_EQ(run_ends_array->offset(), 0);
- ASSERT_EQ(chunk->data()->length, data.input->chunk(i)->length());
- ASSERT_EQ(chunk->data()->offset, 0);
- ASSERT_EQ(*chunk->data()->type, RunEndEncodedType(run_end_type,
data.input->type()));
- ASSERT_EQ(chunk->data()->null_count, 0);
+ ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
+ ? RunEndDecode(encoded)
+ :
RunEndDecode(encoded->chunk(0)));
+ auto decoded = AsChunkedArray(decoded_datum);
+ ASSERT_OK(decoded->ValidateFull());
+ for (int i = 0; i < decoded->num_chunks(); i++) {
+ ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
+ }
}
+};
- ASSERT_OK_AND_ASSIGN(Datum decoded_datum, data.chunked
- ? RunEndDecode(encoded)
- :
RunEndDecode(encoded->chunk(0)));
- auto decoded = AsChunkedArray(decoded_datum);
- ASSERT_OK(decoded->ValidateFull());
- for (int i = 0; i < decoded->num_chunks(); i++) {
- ASSERT_ARRAYS_EQUAL(*decoded->chunk(i), *data.input->chunk(i));
- }
+TEST_P(TestRunEndEncodeDecode, EncodeDecodeArray) {
+ auto [data, run_end_type] = GetParam();
+ TestEncodeDecodeArray(data, run_end_type);
}
// Encoding an input with an offset results in a completely new encoded array
without an
@@ -254,6 +267,17 @@ TEST_P(TestRunEndEncodeDecode, DecodeWithOffset) {
}
}
+// GH-36708
+TEST_P(TestRunEndEncodeDecode, InputWithValidityAndNoNulls) {
+ auto data =
+ REETestData::JSONChunked(int32(),
+ /*inputs=*/{"[1, 1, 2, 2, 2, 3]", "[4, 5, 5, 5,
6, 6]"},
+ /*expected_values=*/{"[1, 2, 3]", "[4, 5, 6]"},
+ /*expected_run_ends=*/{"[2, 5, 6]", "[1, 4,
6]"},
+ /*input_offset=*/0,
/*force_validity_bitmap=*/true);
+ TestEncodeDecodeArray(data, int32());
+}
+
// This test creates an run-end encoded array with an offset in the child
array, which
// removes the first run in the test data. It's no-op for chunked input.
TEST_P(TestRunEndEncodeDecode, DecodeWithOffsetInChildArray) {