This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-6.0.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 8410e34fc8aeff6ee5f1c90ed99bed920c3ebfd6
Author: David Li <[email protected]>
AuthorDate: Mon Nov 8 14:32:46 2021 -0500

    ARROW-14583: [C++] Handle empty chunked arrays in Take, empty datasets in GroupByNode
    
    This fixes two issues:
    - A crash in GroupByNode when no batches are processed
    - A spurious error calling Take on a ChunkedArray with no chunks
    
    Closes #11623 from lidavidm/arrow-14583
    
    Authored-by: David Li <[email protected]>
    Signed-off-by: David Li <[email protected]>
---
 cpp/src/arrow/compute/exec/aggregate_node.cc       |  2 ++
 cpp/src/arrow/compute/exec/test_util.h             |  2 --
 .../arrow/compute/kernels/hash_aggregate_test.cc   | 33 ++++++++++++++++++++--
 cpp/src/arrow/compute/kernels/vector_selection.cc  |  9 ++++--
 .../arrow/compute/kernels/vector_selection_test.cc |  7 +++++
 5 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index 2959790..904fa4e 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -421,6 +421,8 @@ class GroupByNode : public ExecNode {
 
   Result<ExecBatch> Finalize() {
     ThreadLocalState* state = &local_states_[0];
+    // If we never got any batches, then state won't have been initialized
+    RETURN_NOT_OK(InitLocalStateIfNeeded(state));
 
     ExecBatch out_data{{}, state->grouper->num_groups()};
     out_data.values.resize(agg_kernels_.size() + key_field_ids_.size());
diff --git a/cpp/src/arrow/compute/exec/test_util.h b/cpp/src/arrow/compute/exec/test_util.h
index 2ee140a..dad55fc 100644
--- a/cpp/src/arrow/compute/exec/test_util.h
+++ b/cpp/src/arrow/compute/exec/test_util.h
@@ -50,8 +50,6 @@ struct BatchesWithSchema {
   std::shared_ptr<Schema> schema;
 
   AsyncGenerator<util::optional<ExecBatch>> gen(bool parallel, bool slow) const {
-    DCHECK_GT(batches.size(), 0);
-
     auto opt_batches = ::arrow::internal::MapVector(
         [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
 
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 412290a..b98a369 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -168,16 +168,23 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
                         start_and_collect.MoveResult());
 
   ArrayVector out_arrays(aggregates.size() + key_names.size());
+  const auto& output_schema = plan->sources()[0]->outputs()[0]->output_schema();
   for (size_t i = 0; i < out_arrays.size(); ++i) {
     std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
     for (size_t j = 0; j < output_batches.size(); ++j) {
       arrays[j] = output_batches[j].values[i].make_array();
     }
-    ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+    if (arrays.empty()) {
+      ARROW_ASSIGN_OR_RAISE(
+          out_arrays[i],
+          MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+                          /*length=*/0));
+    } else {
+      ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+    }
   }
 
-  return StructArray::Make(std::move(out_arrays),
-                           plan->sources()[0]->outputs()[0]->output_schema()->fields());
+  return StructArray::Make(std::move(out_arrays), output_schema->fields());
 }
 
 /// Simpler overload where you can give the columns as datums
@@ -694,6 +701,26 @@ TEST(GroupBy, Errors) {
                      HasSubstr("Direct execution of HASH_AGGREGATE functions")));
 }
 
+TEST(GroupBy, NoBatches) {
+  // Regression test for ARROW-14583: handle when no batches are
+  // passed to the group by node before finalizing
+  auto table =
+      TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {});
+  ASSERT_OK_AND_ASSIGN(
+      Datum aggregated_and_grouped,
+      GroupByTest({table->GetColumnByName("argument")}, {table->GetColumnByName("key")},
+                  {
+                      {"hash_count", nullptr},
+                  },
+                  /*use_threads=*/true, /*use_exec_plan=*/true));
+  AssertDatumsEqual(ArrayFromJSON(struct_({
+                                      field("hash_count", int64()),
+                                      field("key_0", int64()),
+                                  }),
+                                  R"([])"),
+                    aggregated_and_grouped, /*verbose=*/true);
+}
+
 namespace {
 void SortBy(std::vector<std::string> names, Datum* aggregated_and_grouped) {
   SortOptions options;
diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc
index a33c4c5..2f859be 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -2013,8 +2013,13 @@ Result<std::shared_ptr<ChunkedArray>> TakeCA(const ChunkedArray& values,
     // TODO Case 3: If indices are sorted, can slice them and call Array Take
 
     // Case 4: Else, concatenate chunks and call Array Take
-    ARROW_ASSIGN_OR_RAISE(current_chunk,
-                          Concatenate(values.chunks(), ctx->memory_pool()));
+    if (values.chunks().empty()) {
+      ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0,
+                                                           ctx->memory_pool()));
+    } else {
+      ARROW_ASSIGN_OR_RAISE(current_chunk,
+                            Concatenate(values.chunks(), ctx->memory_pool()));
+    }
   }
   // Call Array Take on our single chunk
   ARROW_ASSIGN_OR_RAISE(new_chunks[0], TakeAA(*current_chunk, indices, options, ctx));
diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
index 46751ec..959de60 100644
--- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc
+++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc
@@ -1611,7 +1611,12 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernelTyped<ChunkedArray>
 
 TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) {
   this->AssertTake(int8(), {"[]"}, "[]", {"[]"});
+  this->AssertChunkedTake(int8(), {}, {}, {});
+  this->AssertChunkedTake(int8(), {}, {"[]"}, {"[]"});
+  this->AssertChunkedTake(int8(), {}, {"[null]"}, {"[null]"});
+  this->AssertChunkedTake(int8(), {"[]"}, {}, {});
   this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"});
+  this->AssertChunkedTake(int8(), {"[]"}, {"[null]"}, {"[null]"});
 
   this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"});
   this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"},
@@ -1623,6 +1628,8 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) {
                 this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr));
   ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"},
                                                        {"[0, 1, 0]", "[5, 1]"}, &arr));
+  ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {}, {"[0]"}, &arr));
+  ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[]"}, {"[0]"}, &arr));
 }
 
 class TestTakeKernelWithTable : public TestTakeKernelTyped<Table> {

Reply via email to