bkietz commented on a change in pull request #10660:
URL: https://github.com/apache/arrow/pull/10660#discussion_r664575727



##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> 
MakeSinkNode(ExecNode* input,
   return out;
 }
 
+struct GroupByNode : ExecNode {
+  GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> 
output_schema,
+              ExecContext* ctx, const std::vector<int>&& key_field_ids,
+              std::unique_ptr<internal::Grouper>&& grouper,
+              const std::vector<int>&& agg_src_field_ids,
+              const std::vector<const HashAggregateKernel*>&& agg_kernels,
+              std::vector<std::unique_ptr<KernelState>>&& agg_states)
+      : ExecNode(input->plan(), std::move(label), {input}, {"groupby"},
+                 std::move(output_schema), /*num_outputs=*/1),
+        ctx_(ctx),
+        key_field_ids_(std::move(key_field_ids)),
+        grouper_(std::move(grouper)),
+        agg_src_field_ids_(std::move(agg_src_field_ids)),
+        agg_kernels_(std::move(agg_kernels)),
+        agg_states_(std::move(agg_states)) {}
+
+  const char* kind_name() override { return "GroupByNode"; }
+
+  Status ProcessInputBatch(const ExecBatch& batch) {
+    // Create a batch with key columns
+    std::vector<Datum> keys(key_field_ids_.size());
+    for (size_t i = 0; i < key_field_ids_.size(); ++i) {
+      keys[i] = batch.values[key_field_ids_[i]];
+    }
+    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys));
+
+    // Create a batch with group ids
+    ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch));
+
+    // Execute aggregate kernels
+    auto num_groups = grouper_->num_groups();
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext kernel_ctx{ctx_};
+      kernel_ctx.SetState(agg_states_[i].get());
+      ARROW_ASSIGN_OR_RAISE(
+          auto agg_batch, 
ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch,
+                                           Datum(num_groups)}));
+      RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
+    }
+
+    return Status::OK();
+  }
+
+  Status OutputResult() {
+    // Finalize output
+    ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size());
+    auto it = out_data.begin();
+
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext batch_ctx{ctx_};
+      batch_ctx.SetState(agg_states_[i].get());
+      Datum out;
+      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out));
+      *it++ = out.array();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques());
+    for (const auto& key : out_keys.values) {
+      *it++ = key.array();
+    }
+
+    uint32_t num_groups = grouper_->num_groups();
+    int num_result_batches = (num_groups + output_batch_size_ - 1) / 
output_batch_size_;
+    outputs_[0]->InputFinished(this, num_result_batches);
+
+    for (int i = 0; i < num_result_batches; ++i) {
+      // Check finished flag
+      if (finished_) {

Review comment:
       ```suggestion
         if (stop_requested_) {
   ```

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> 
MakeSinkNode(ExecNode* input,
   return out;
 }
 
+struct GroupByNode : ExecNode {
+  GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> 
output_schema,
+              ExecContext* ctx, const std::vector<int>&& key_field_ids,
+              std::unique_ptr<internal::Grouper>&& grouper,
+              const std::vector<int>&& agg_src_field_ids,
+              const std::vector<const HashAggregateKernel*>&& agg_kernels,
+              std::vector<std::unique_ptr<KernelState>>&& agg_states)
+      : ExecNode(input->plan(), std::move(label), {input}, {"groupby"},
+                 std::move(output_schema), /*num_outputs=*/1),
+        ctx_(ctx),
+        key_field_ids_(std::move(key_field_ids)),
+        grouper_(std::move(grouper)),
+        agg_src_field_ids_(std::move(agg_src_field_ids)),
+        agg_kernels_(std::move(agg_kernels)),
+        agg_states_(std::move(agg_states)) {}
+
+  const char* kind_name() override { return "GroupByNode"; }
+
+  Status ProcessInputBatch(const ExecBatch& batch) {
+    // Create a batch with key columns
+    std::vector<Datum> keys(key_field_ids_.size());
+    for (size_t i = 0; i < key_field_ids_.size(); ++i) {
+      keys[i] = batch.values[key_field_ids_[i]];
+    }
+    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys));
+
+    // Create a batch with group ids
+    ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch));
+
+    // Execute aggregate kernels
+    auto num_groups = grouper_->num_groups();
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext kernel_ctx{ctx_};
+      kernel_ctx.SetState(agg_states_[i].get());
+      ARROW_ASSIGN_OR_RAISE(
+          auto agg_batch, 
ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch,
+                                           Datum(num_groups)}));
+      RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
+    }
+
+    return Status::OK();
+  }
+
+  Status OutputResult() {
+    // Finalize output
+    ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size());
+    auto it = out_data.begin();
+
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext batch_ctx{ctx_};
+      batch_ctx.SetState(agg_states_[i].get());
+      Datum out;
+      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out));
+      *it++ = out.array();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques());
+    for (const auto& key : out_keys.values) {
+      *it++ = key.array();
+    }
+
+    uint32_t num_groups = grouper_->num_groups();
+    int num_result_batches = (num_groups + output_batch_size_ - 1) / 
output_batch_size_;
+    outputs_[0]->InputFinished(this, num_result_batches);
+
+    for (int i = 0; i < num_result_batches; ++i) {

Review comment:
       Eventually this will need to re-seed parallelism by sumbitting the calls 
to `outputs_[0]->InputReceived` to the Executor

##########
File path: cpp/src/arrow/compute/exec/exec_plan.cc
##########
@@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> 
MakeSinkNode(ExecNode* input,
   return out;
 }
 
+struct GroupByNode : ExecNode {
+  GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> 
output_schema,
+              ExecContext* ctx, const std::vector<int>&& key_field_ids,
+              std::unique_ptr<internal::Grouper>&& grouper,
+              const std::vector<int>&& agg_src_field_ids,
+              const std::vector<const HashAggregateKernel*>&& agg_kernels,
+              std::vector<std::unique_ptr<KernelState>>&& agg_states)
+      : ExecNode(input->plan(), std::move(label), {input}, {"groupby"},
+                 std::move(output_schema), /*num_outputs=*/1),
+        ctx_(ctx),
+        key_field_ids_(std::move(key_field_ids)),
+        grouper_(std::move(grouper)),
+        agg_src_field_ids_(std::move(agg_src_field_ids)),
+        agg_kernels_(std::move(agg_kernels)),
+        agg_states_(std::move(agg_states)) {}
+
+  const char* kind_name() override { return "GroupByNode"; }
+
+  Status ProcessInputBatch(const ExecBatch& batch) {
+    // Create a batch with key columns
+    std::vector<Datum> keys(key_field_ids_.size());
+    for (size_t i = 0; i < key_field_ids_.size(); ++i) {
+      keys[i] = batch.values[key_field_ids_[i]];
+    }
+    ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys));
+
+    // Create a batch with group ids
+    ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch));
+
+    // Execute aggregate kernels
+    auto num_groups = grouper_->num_groups();
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext kernel_ctx{ctx_};
+      kernel_ctx.SetState(agg_states_[i].get());
+      ARROW_ASSIGN_OR_RAISE(
+          auto agg_batch, 
ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch,
+                                           Datum(num_groups)}));
+      RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch));
+    }
+
+    return Status::OK();
+  }
+
+  Status OutputResult() {
+    // Finalize output
+    ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size());
+    auto it = out_data.begin();
+
+    for (size_t i = 0; i < agg_kernels_.size(); ++i) {
+      KernelContext batch_ctx{ctx_};
+      batch_ctx.SetState(agg_states_[i].get());
+      Datum out;
+      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out));
+      *it++ = out.array();
+    }
+
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques());
+    for (const auto& key : out_keys.values) {
+      *it++ = key.array();
+    }
+
+    uint32_t num_groups = grouper_->num_groups();
+    int num_result_batches = (num_groups + output_batch_size_ - 1) / 
output_batch_size_;
+    outputs_[0]->InputFinished(this, num_result_batches);
+
+    for (int i = 0; i < num_result_batches; ++i) {
+      // Check finished flag
+      if (finished_) {
+        break;
+      }
+
+      // Slice arrays
+      int64_t batch_start = i * output_batch_size_;
+      int64_t batch_length =
+          std::min(output_batch_size_, static_cast<int>(num_groups - 
batch_start));
+      std::vector<Datum> output_slices(out_data.size());
+      for (size_t out_field_id = 0; out_field_id < out_data.size(); 
++out_field_id) {
+        output_slices[out_field_id] =
+            out_data[out_field_id]->Slice(batch_start, batch_length);
+      }
+
+      ARROW_ASSIGN_OR_RAISE(ExecBatch output_batch, 
ExecBatch::Make(output_slices));
+      outputs_[0]->InputReceived(this, i, output_batch);
+    }
+
+    return Status::OK();
+  }
+
+  void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
+    DCHECK_EQ(input, inputs_[0]);
+
+    std::unique_lock<std::mutex> lock(mutex_);

Review comment:
       ```suggestion
       std::unique_lock<std::mutex> lock(mutex_);
   
       if (stop_requested_) {
         return;
       }
   ```

##########
File path: cpp/src/arrow/compute/exec/exec_plan.h
##########
@@ -264,5 +265,13 @@ ARROW_EXPORT
 Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label,
                                   std::vector<Expression> exprs);
 
+/// \brief Make a node which groups input rows based on key fields and computes
+/// aggregates for each group
+Result<ExecNode*> MakeGroupByNode(ExecNode* input, std::string label,
+                                  std::vector<std::string> keys,

Review comment:
       ```suggestion
                                     std::vector<FieldRef> keys,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to