bkietz commented on a change in pull request #10660: URL: https://github.com/apache/arrow/pull/10660#discussion_r664575727
########## File path: cpp/src/arrow/compute/exec/exec_plan.cc ########## @@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input, return out; } +struct GroupByNode : ExecNode { + GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> output_schema, + ExecContext* ctx, const std::vector<int>&& key_field_ids, + std::unique_ptr<internal::Grouper>&& grouper, + const std::vector<int>&& agg_src_field_ids, + const std::vector<const HashAggregateKernel*>&& agg_kernels, + std::vector<std::unique_ptr<KernelState>>&& agg_states) + : ExecNode(input->plan(), std::move(label), {input}, {"groupby"}, + std::move(output_schema), /*num_outputs=*/1), + ctx_(ctx), + key_field_ids_(std::move(key_field_ids)), + grouper_(std::move(grouper)), + agg_src_field_ids_(std::move(agg_src_field_ids)), + agg_kernels_(std::move(agg_kernels)), + agg_states_(std::move(agg_states)) {} + + const char* kind_name() override { return "GroupByNode"; } + + Status ProcessInputBatch(const ExecBatch& batch) { + // Create a batch with key columns + std::vector<Datum> keys(key_field_ids_.size()); + for (size_t i = 0; i < key_field_ids_.size(); ++i) { + keys[i] = batch.values[key_field_ids_[i]]; + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys)); + + // Create a batch with group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch)); + + // Execute aggregate kernels + auto num_groups = grouper_->num_groups(); + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext kernel_ctx{ctx_}; + kernel_ctx.SetState(agg_states_[i].get()); + ARROW_ASSIGN_OR_RAISE( + auto agg_batch, ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch, + Datum(num_groups)})); + RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch)); + } + + return Status::OK(); + } + + Status OutputResult() { + // Finalize output + ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size()); + auto it = out_data.begin(); + + 
for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext batch_ctx{ctx_}; + batch_ctx.SetState(agg_states_[i].get()); + Datum out; + RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out)); + *it++ = out.array(); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques()); + for (const auto& key : out_keys.values) { + *it++ = key.array(); + } + + uint32_t num_groups = grouper_->num_groups(); + int num_result_batches = (num_groups + output_batch_size_ - 1) / output_batch_size_; + outputs_[0]->InputFinished(this, num_result_batches); + + for (int i = 0; i < num_result_batches; ++i) { + // Check finished flag + if (finished_) { Review comment: ```suggestion if (stop_requested_) { ``` ########## File path: cpp/src/arrow/compute/exec/exec_plan.cc ########## @@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input, return out; } +struct GroupByNode : ExecNode { + GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> output_schema, + ExecContext* ctx, const std::vector<int>&& key_field_ids, + std::unique_ptr<internal::Grouper>&& grouper, + const std::vector<int>&& agg_src_field_ids, + const std::vector<const HashAggregateKernel*>&& agg_kernels, + std::vector<std::unique_ptr<KernelState>>&& agg_states) + : ExecNode(input->plan(), std::move(label), {input}, {"groupby"}, + std::move(output_schema), /*num_outputs=*/1), + ctx_(ctx), + key_field_ids_(std::move(key_field_ids)), + grouper_(std::move(grouper)), + agg_src_field_ids_(std::move(agg_src_field_ids)), + agg_kernels_(std::move(agg_kernels)), + agg_states_(std::move(agg_states)) {} + + const char* kind_name() override { return "GroupByNode"; } + + Status ProcessInputBatch(const ExecBatch& batch) { + // Create a batch with key columns + std::vector<Datum> keys(key_field_ids_.size()); + for (size_t i = 0; i < key_field_ids_.size(); ++i) { + keys[i] = batch.values[key_field_ids_[i]]; + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, 
ExecBatch::Make(keys)); + + // Create a batch with group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch)); + + // Execute aggregate kernels + auto num_groups = grouper_->num_groups(); + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext kernel_ctx{ctx_}; + kernel_ctx.SetState(agg_states_[i].get()); + ARROW_ASSIGN_OR_RAISE( + auto agg_batch, ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch, + Datum(num_groups)})); + RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch)); + } + + return Status::OK(); + } + + Status OutputResult() { + // Finalize output + ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size()); + auto it = out_data.begin(); + + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext batch_ctx{ctx_}; + batch_ctx.SetState(agg_states_[i].get()); + Datum out; + RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out)); + *it++ = out.array(); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques()); + for (const auto& key : out_keys.values) { + *it++ = key.array(); + } + + uint32_t num_groups = grouper_->num_groups(); + int num_result_batches = (num_groups + output_batch_size_ - 1) / output_batch_size_; + outputs_[0]->InputFinished(this, num_result_batches); + + for (int i = 0; i < num_result_batches; ++i) { Review comment: Eventually this will need to re-seed parallelism by submitting the calls to `outputs_[0]->InputReceived` to the Executor ########## File path: cpp/src/arrow/compute/exec/exec_plan.cc ########## @@ -565,5 +565,253 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input, return out; } +struct GroupByNode : ExecNode { + GroupByNode(ExecNode* input, std::string label, std::shared_ptr<Schema> output_schema, + ExecContext* ctx, const std::vector<int>&& key_field_ids, + std::unique_ptr<internal::Grouper>&& grouper, + const std::vector<int>&& agg_src_field_ids, + const std::vector<const HashAggregateKernel*>&& 
agg_kernels, + std::vector<std::unique_ptr<KernelState>>&& agg_states) + : ExecNode(input->plan(), std::move(label), {input}, {"groupby"}, + std::move(output_schema), /*num_outputs=*/1), + ctx_(ctx), + key_field_ids_(std::move(key_field_ids)), + grouper_(std::move(grouper)), + agg_src_field_ids_(std::move(agg_src_field_ids)), + agg_kernels_(std::move(agg_kernels)), + agg_states_(std::move(agg_states)) {} + + const char* kind_name() override { return "GroupByNode"; } + + Status ProcessInputBatch(const ExecBatch& batch) { + // Create a batch with key columns + std::vector<Datum> keys(key_field_ids_.size()); + for (size_t i = 0; i < key_field_ids_.size(); ++i) { + keys[i] = batch.values[key_field_ids_[i]]; + } + ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(keys)); + + // Create a batch with group ids + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper_->Consume(key_batch)); + + // Execute aggregate kernels + auto num_groups = grouper_->num_groups(); + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext kernel_ctx{ctx_}; + kernel_ctx.SetState(agg_states_[i].get()); + ARROW_ASSIGN_OR_RAISE( + auto agg_batch, ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch, + Datum(num_groups)})); + RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch)); + } + + return Status::OK(); + } + + Status OutputResult() { + // Finalize output + ArrayDataVector out_data(agg_kernels_.size() + key_field_ids_.size()); + auto it = out_data.begin(); + + for (size_t i = 0; i < agg_kernels_.size(); ++i) { + KernelContext batch_ctx{ctx_}; + batch_ctx.SetState(agg_states_[i].get()); + Datum out; + RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out)); + *it++ = out.array(); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, grouper_->GetUniques()); + for (const auto& key : out_keys.values) { + *it++ = key.array(); + } + + uint32_t num_groups = grouper_->num_groups(); + int num_result_batches = (num_groups + output_batch_size_ - 1) / output_batch_size_; 
+ outputs_[0]->InputFinished(this, num_result_batches); + + for (int i = 0; i < num_result_batches; ++i) { + // Check finished flag + if (finished_) { + break; + } + + // Slice arrays + int64_t batch_start = i * output_batch_size_; + int64_t batch_length = + std::min(output_batch_size_, static_cast<int>(num_groups - batch_start)); + std::vector<Datum> output_slices(out_data.size()); + for (size_t out_field_id = 0; out_field_id < out_data.size(); ++out_field_id) { + output_slices[out_field_id] = + out_data[out_field_id]->Slice(batch_start, batch_length); + } + + ARROW_ASSIGN_OR_RAISE(ExecBatch output_batch, ExecBatch::Make(output_slices)); + outputs_[0]->InputReceived(this, i, output_batch); + } + + return Status::OK(); + } + + void InputReceived(ExecNode* input, int seq, ExecBatch batch) override { + DCHECK_EQ(input, inputs_[0]); + + std::unique_lock<std::mutex> lock(mutex_); Review comment: ```suggestion std::unique_lock<std::mutex> lock(mutex_); if (stop_requested_) { return; } ``` ########## File path: cpp/src/arrow/compute/exec/exec_plan.h ########## @@ -264,5 +265,13 @@ ARROW_EXPORT Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label, std::vector<Expression> exprs); +/// \brief Make a node which groups input rows based on key fields and computes +/// aggregates for each group +Result<ExecNode*> MakeGroupByNode(ExecNode* input, std::string label, + std::vector<std::string> keys, Review comment: ```suggestion std::vector<FieldRef> keys, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org