This is an automated email from the ASF dual-hosted git repository. kou pushed a commit to branch maint-6.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 8410e34fc8aeff6ee5f1c90ed99bed920c3ebfd6 Author: David Li <[email protected]> AuthorDate: Mon Nov 8 14:32:46 2021 -0500 ARROW-14583: [C++] Handle empty chunked arrays in Take, empty datasets in GroupByNode This fixes two issues: - A crash in GroupByNode when no batches are processed - A spurious error calling Take on a ChunkedArray with no chunks Closes #11623 from lidavidm/arrow-14583 Authored-by: David Li <[email protected]> Signed-off-by: David Li <[email protected]> --- cpp/src/arrow/compute/exec/aggregate_node.cc | 2 ++ cpp/src/arrow/compute/exec/test_util.h | 2 -- .../arrow/compute/kernels/hash_aggregate_test.cc | 33 ++++++++++++++++++++-- cpp/src/arrow/compute/kernels/vector_selection.cc | 9 ++++-- .../arrow/compute/kernels/vector_selection_test.cc | 7 +++++ 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 2959790..904fa4e 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -421,6 +421,8 @@ class GroupByNode : public ExecNode { Result<ExecBatch> Finalize() { ThreadLocalState* state = &local_states_[0]; + // If we never got any batches, then state won't have been initialized + RETURN_NOT_OK(InitLocalStateIfNeeded(state)); ExecBatch out_data{{}, state->grouper->num_groups()}; out_data.values.resize(agg_kernels_.size() + key_field_ids_.size()); diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h index 2ee140a..dad55fc 100644 --- a/cpp/src/arrow/compute/exec/test_util.h +++ b/cpp/src/arrow/compute/exec/test_util.h @@ -50,8 +50,6 @@ struct BatchesWithSchema { std::shared_ptr<Schema> schema; AsyncGenerator<util::optional<ExecBatch>> gen(bool parallel, bool slow) const { - DCHECK_GT(batches.size(), 0); - auto opt_batches = ::arrow::internal::MapVector( [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches); diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 412290a..b98a369 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -168,16 +168,23 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input, start_and_collect.MoveResult()); ArrayVector out_arrays(aggregates.size() + key_names.size()); + const auto& output_schema = plan->sources()[0]->outputs()[0]->output_schema(); for (size_t i = 0; i < out_arrays.size(); ++i) { std::vector<std::shared_ptr<Array>> arrays(output_batches.size()); for (size_t j = 0; j < output_batches.size(); ++j) { arrays[j] = output_batches[j].values[i].make_array(); } - ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays)); + if (arrays.empty()) { + ARROW_ASSIGN_OR_RAISE( + out_arrays[i], + MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(), + /*length=*/0)); + } else { + ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays)); + } } - return StructArray::Make(std::move(out_arrays), - plan->sources()[0]->outputs()[0]->output_schema()->fields()); + return StructArray::Make(std::move(out_arrays), output_schema->fields()); } /// Simpler overload where you can give the columns as datums @@ -694,6 +701,26 @@ TEST(GroupBy, Errors) { HasSubstr("Direct execution of HASH_AGGREGATE functions"))); } +TEST(GroupBy, NoBatches) { + // Regression test for ARROW-14583: handle when no batches are + // passed to the group by node before finalizing + auto table = + TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {}); + ASSERT_OK_AND_ASSIGN( + Datum aggregated_and_grouped, + GroupByTest({table->GetColumnByName("argument")}, {table->GetColumnByName("key")}, + { + {"hash_count", nullptr}, + }, + /*use_threads=*/true, /*use_exec_plan=*/true)); + AssertDatumsEqual(ArrayFromJSON(struct_({ + field("hash_count", int64()), + field("key_0", int64()), + }), + R"([])"), + aggregated_and_grouped, /*verbose=*/true); +} + namespace { void SortBy(std::vector<std::string> names, Datum* aggregated_and_grouped) { SortOptions options; diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index a33c4c5..2f859be 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -2013,8 +2013,13 @@ Result<std::shared_ptr<ChunkedArray>> TakeCA(const ChunkedArray& values, // TODO Case 3: If indices are sorted, can slice them and call Array Take // Case 4: Else, concatenate chunks and call Array Take - ARROW_ASSIGN_OR_RAISE(current_chunk, - Concatenate(values.chunks(), ctx->memory_pool())); + if (values.chunks().empty()) { + ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0, + ctx->memory_pool())); + } else { + ARROW_ASSIGN_OR_RAISE(current_chunk, + Concatenate(values.chunks(), ctx->memory_pool())); + } } // Call Array Take on our single chunk ARROW_ASSIGN_OR_RAISE(new_chunks[0], TakeAA(*current_chunk, indices, options, ctx)); diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc index 46751ec..959de60 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc @@ -1611,7 +1611,12 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernelTyped<ChunkedArray> TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { this->AssertTake(int8(), {"[]"}, "[]", {"[]"}); + this->AssertChunkedTake(int8(), {}, {}, {}); + this->AssertChunkedTake(int8(), {}, {"[]"}, {"[]"}); + this->AssertChunkedTake(int8(), {}, {"[null]"}, {"[null]"}); + this->AssertChunkedTake(int8(), {"[]"}, {}, {}); this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"}); + this->AssertChunkedTake(int8(), {"[]"}, {"[null]"}, {"[null]"}); this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"}); this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"}, @@ -1623,6 +1628,8 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr)); ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[5, 1]"}, &arr)); + ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {}, {"[0]"}, &arr)); + ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[]"}, {"[0]"}, &arr)); } class TestTakeKernelWithTable : public TestTakeKernelTyped<Table> {
