icexelloss commented on code in PR #34904:
URL: https://github.com/apache/arrow/pull/34904#discussion_r1161707254
##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -665,14 +682,47 @@ class GroupByNode : public ExecNode, public TracedNode {
}
base += segment_keys.size();
for (size_t i = 0; i < aggs.size(); ++i) {
- output_fields[base + i] =
- agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
+ output_fields[base + i] = agg_result_fields[i]->WithName(aggs[i].name);
}
+ return
AggregateNodeArgs<HashAggregateKernel>{schema(std::move(output_fields)),
+ std::move(key_field_ids),
+
std::move(segment_key_field_ids),
+ std::move(segmenter),
+ std::move(agg_src_fieldsets),
+ std::move(aggs),
+ std::move(agg_kernels),
+ std::move(agg_src_types),
+ {}};
+ }
+
+ static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+ const ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "GroupByNode"));
+
+ auto input = inputs[0];
+ const auto& aggregate_options = checked_cast<const
AggregateNodeOptions&>(options);
+ const auto& keys = aggregate_options.keys;
+ const auto& segment_keys = aggregate_options.segment_keys;
+ // Copy (need to modify options pointer below)
+ auto aggs = aggregate_options.aggregates;
+
+ if (plan->query_context()->exec_context()->executor()->GetCapacity() > 1 &&
+ segment_keys.size() > 0) {
+ return Status::NotImplemented("Segmented aggregation in a multi-threaded
plan");
Review Comment:
nit: "multi-threaded plan" -> "multi-threaded execution context" (the check above is on the execution context's executor capacity, not on the plan itself)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]