westonpace commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1119120080
##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -135,22 +141,84 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
return Take(struct_arr, sorted_indices);
}
+Result<Datum> MakeGroupByOutput(const std::vector<ExecBatch>& output_batches,
+ const std::shared_ptr<Schema> output_schema,
+ size_t num_aggregates, size_t num_keys, bool naive) {
+ ArrayVector out_arrays(num_aggregates + num_keys);
+ for (size_t i = 0; i < out_arrays.size(); ++i) {
+ std::vector<std::shared_ptr<Array>> arrays(output_batches.size());
+ for (size_t j = 0; j < output_batches.size(); ++j) {
+ arrays[j] = output_batches[j].values[i].make_array();
+ }
+ if (arrays.empty()) {
+ ARROW_ASSIGN_OR_RAISE(
+ out_arrays[i],
+ MakeArrayOfNull(output_schema->field(static_cast<int>(i))->type(),
+ /*length=*/0));
+ } else {
+ ARROW_ASSIGN_OR_RAISE(out_arrays[i], Concatenate(arrays));
+ }
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ std::shared_ptr<Array> struct_arr,
+ StructArray::Make(std::move(out_arrays), output_schema->fields()));
+
+ bool need_sort = !naive;
+ for (size_t i = num_aggregates; need_sort && i < out_arrays.size(); i++) {
+ if (output_schema->field(i)->type()->id() == Type::DICTIONARY) {
Review Comment:
Windows compilers will also emit a warning in this case. We could suppress
that warning, but I think being explicit is useful and should help remind the
author to at least do a basic sanity check that an overflow is not going to
happen here.
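For illustration only, here is one way the explicit conversion could be wrapped so the narrowing stays visible and checked; the helper name and the assert are mine, not part of the PR (Arrow's `Schema::field` does take an `int`, which is why a conversion is needed at all):

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Sketch, not the PR's code: convert a size_t loop index to the int that
// Schema::field() expects, with an explicit overflow sanity check instead of
// a silent (and warning-generating) implicit narrowing.
int ToFieldIndex(size_t i) {
  assert(i <= static_cast<size_t>(std::numeric_limits<int>::max()));
  return static_cast<int>(i);
}
```

With something like this the call site could read `output_schema->field(ToFieldIndex(i))`, keeping the cast both explicit and audited.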
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -169,35 +186,117 @@ void AggregatesToString(std::stringstream* ss, const Schema& input_schema,
*ss << ']';
}
+template <typename BatchHandler>
+Status HandleSegments(std::unique_ptr<GroupingSegmenter>& segmenter,
+ const ExecBatch& batch, const std::vector<int>& ids,
+ const BatchHandler& handle_batch) {
+ int64_t offset = 0;
+ ARROW_ASSIGN_OR_RAISE(auto segment_exec_batch, batch.SelectValues(ids));
+ ExecSpan segment_batch(segment_exec_batch);
+ while (true) {
+ ARROW_ASSIGN_OR_RAISE(auto segment, segmenter->GetNextSegment(segment_batch, offset));
+ if (segment.offset >= segment_batch.length) break; // condition of no-next-segment
+ ARROW_RETURN_NOT_OK(handle_batch(batch, segment));
+ offset = segment.offset + segment.length;
+ }
+ return Status::OK();
+}
+
+Status GetScalarFields(std::vector<Datum>* values_ptr, const ExecBatch& input_batch,
+ const std::vector<int>& field_ids) {
+ DCHECK_GT(input_batch.length, 0);
+ std::vector<Datum>& values = *values_ptr;
+ int64_t row = input_batch.length - 1;
+ values.clear();
+ values.resize(field_ids.size());
+ for (size_t i = 0; i < field_ids.size(); i++) {
+ const Datum& value = input_batch.values[field_ids[i]];
+ if (value.is_scalar()) {
+ values[i] = value;
+ } else if (value.is_array()) {
+ ARROW_ASSIGN_OR_RAISE(auto scalar, value.make_array()->GetScalar(row));
+ values[i] = scalar;
+ } else {
+ DCHECK(false);
+ }
+ }
+ return Status::OK();
+}
+
+void PlaceFields(ExecBatch& batch, size_t base, std::vector<Datum>& values) {
+ DCHECK_LE(base + values.size(), batch.values.size());
+ for (size_t i = 0; i < values.size(); i++) {
+ batch.values[base + i] = values[i];
+ }
+}
+
class ScalarAggregateNode : public ExecNode, public TracedNode {
public:
ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
+ std::unique_ptr<GroupingSegmenter> segmenter,
+ std::vector<int> segment_field_ids,
std::vector<std::vector<int>> target_fieldsets,
std::vector<Aggregate> aggs,
std::vector<const ScalarAggregateKernel*> kernels,
std::vector<std::vector<std::unique_ptr<KernelState>>> states)
: ExecNode(plan, std::move(inputs), {"target"},
/*output_schema=*/std::move(output_schema)),
TracedNode(this),
+ segmenter_(std::move(segmenter)),
+ segment_field_ids_(std::move(segment_field_ids)),
target_fieldsets_(std::move(target_fieldsets)),
aggs_(std::move(aggs)),
kernels_(std::move(kernels)),
- states_(std::move(states)) {}
+ states_(std::move(states)) {
+ const auto& input_schema = *this->inputs()[0]->output_schema();
+ for (size_t i = 0; i < kernels_.size(); ++i) {
+ std::vector<TypeHolder> in_types;
+ for (const auto& target : target_fieldsets_[i]) {
+ in_types.emplace_back(input_schema.field(target)->type().get());
+ }
+ in_typesets_.push_back(std::move(in_types));
+ }
+ }
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "ScalarAggregateNode"));
const auto& aggregate_options = checked_cast<const AggregateNodeOptions&>(options);
auto aggregates = aggregate_options.aggregates;
+ const auto& keys = aggregate_options.keys;
+ const auto& segment_keys = aggregate_options.segment_keys;
+
+ if (keys.size() > 0) {
+ return Status::Invalid("Scalar aggregation with some key");
+ }
+ if (plan->query_context()->exec_context()->executor()->GetCapacity() > 1 &&
+ segment_keys.size() > 0) {
+ return Status::NotImplemented("Segmented aggregation in a multi-threaded plan");
+ }
const auto& input_schema = *inputs[0]->output_schema();
auto exec_ctx = plan->query_context()->exec_context();
+ std::vector<int> segment_field_ids(segment_keys.size());
+ std::vector<TypeHolder> segment_key_types(segment_keys.size());
+ for (size_t i = 0; i < segment_keys.size(); i++) {
+ ARROW_ASSIGN_OR_RAISE(auto match, segment_keys[i].FindOne(input_schema));
+ if (match.indices().size() > 1) {
+ // ARROW-18369: Support nested references as segment ids
+ return Status::Invalid("Nested references cannot be used as segment ids");
+ }
+ segment_field_ids[i] = match[0];
+ segment_key_types[i] = input_schema.field(match[0])->type().get();
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto segmenter, GroupingSegmenter::Make(std::move(segment_key_types), exec_ctx));
Review Comment:
We are asserting here that the segment keys will not be null, since
`nullable_keys` defaults to `false`. At the very least, we should document
this in `options.h`.
On the other hand, why can't the grouping implementation tolerate nulls? It
seems we could pick some meaning for `null`. Either:
1. null represents a key of its own
2. null means we don't know the value and we assume we are maintaining the
previous key
Do you know how groupby is handling null keys today (and by extension the
`AnyKeysGroupingSegmenter`)? I think it is `#1` but I could be mistaken. If
that is the case then it probably wouldn't be too much more complexity to
handle nulls in the same fashion in `SimpleKeyGroupingSegmenter`.
We need to document the constraint now. However, I'd be fine deferring any
extended implementation to a follow-up.
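For what option `#1` could mean concretely, here is a rough, self-contained sketch of a segmenter that treats a run of nulls as a segment of its own; it models the nullable key with `std::optional` rather than an Arrow array, and `Segment`/`GetNextSegment` only mirror the names in the diff, they are not the PR's types:

```cpp
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical illustration of interpretation 1 above: a null key value is a
// key in its own right, so consecutive nulls extend the same segment and a
// null/non-null boundary closes it.
struct Segment {
  int64_t offset;
  int64_t length;
};

Segment GetNextSegment(const std::vector<std::optional<int64_t>>& keys,
                       int64_t offset) {
  const int64_t length = static_cast<int64_t>(keys.size());
  if (offset >= length) return {offset, 0};  // no-next-segment condition
  int64_t end = offset + 1;
  // std::optional's operator== treats two empty optionals as equal, which is
  // exactly the "null is its own key" behavior.
  while (end < length && keys[end] == keys[offset]) ++end;
  return {offset, end - offset};
}
```

If `AnyKeysGroupingSegmenter` already behaves this way via the grouper, then `SimpleKeyGroupingSegmenter` would mostly need the extra "both null" case in its comparison to match it.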