westonpace commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1119133819
########## cpp/src/arrow/compute/exec/aggregate_node.cc: ##########
@@ -16,9 +16,11 @@
 // under the License.

 #include <mutex>
+#include <shared_mutex>

Review Comment:
```suggestion
```

########## cpp/src/arrow/compute/exec/aggregate_node.cc: ##########
@@ -326,46 +438,77 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }

  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ResetAggregates() {
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      const std::vector<TypeHolder>& in_types = in_typesets_[i];
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false) {
     ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size());
+    batch.values.resize(kernels_.size() + segment_field_ids_.size());
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
                          {{"function.name", aggs_[i].function},
                           {"function.options",
                            aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Finalize"}});
+                          {"function.kind", std::string(kind_name()) + "::Output"}});

Review Comment:
Hmm, maybe keep the old name here? This is tracking how much time is spent in the two kernel calls `MergeAll` and `Finalize`, so I think `::Finalize` is more accurate.

########## cpp/src/arrow/compute/exec/aggregate_node.cc: ##########
@@ -16,9 +16,11 @@
 // under the License.

 #include <mutex>
+#include <shared_mutex>

Review Comment:
```suggestion
```

########## cpp/src/arrow/compute/exec/aggregate_node.cc: ##########
@@ -584,29 +778,50 @@ class GroupByNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());

     int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
-    RETURN_NOT_OK(output_->InputFinished(this, static_cast<int>(num_output_batches)));
-    return plan_->query_context()->StartTaskGroup(output_task_group_id_,
-                                                  num_output_batches);
+    total_output_batches_ += static_cast<int>(num_output_batches);
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+      RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
+                                                           num_output_batches));
+    } else {
+      for (int64_t i = 0; i < num_output_batches; i++) {
+        ARROW_RETURN_NOT_OK(OutputNthBatch(i));

Review Comment:
This is a little bizarre (though not introduced by you). If we are outputting an Nth batch we just output it immediately. However, when outputting the final batch we divide it into 32k chunks and schedule tasks for each one.

I think, longer term, we will want to implement accumulation for segmented group-by, since some segments may be quite small (and we might only output a few rows per segment depending on the grouping keys). We have some prototype for this in `ExecBatchBuilder` (in `light_array.h`). However, let's definitely save this for a follow-up. Do you want to create an issue or should I?
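To make the "32k chunks" remark concrete, here is a standalone sketch of the slicing math implied by `bit_util::CeilDiv(out_data_.length, output_batch_size())`: the finalized result is cut into ceil(total_rows / batch_size) slices, and the i-th output batch covers slice i. This is plain C++ for illustration only; the helper name `PlanOutputSlices` is made up and is not part of Arrow.

```cpp
// Illustrative only: compute the (offset, length) of each output slice when a
// finalized result of `total_rows` rows is emitted in batches of `batch_size` rows.
#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

std::vector<std::pair<int64_t, int64_t>> PlanOutputSlices(int64_t total_rows,
                                                          int64_t batch_size) {
  const int64_t num_batches = (total_rows + batch_size - 1) / batch_size;  // CeilDiv
  std::vector<std::pair<int64_t, int64_t>> slices;
  for (int64_t i = 0; i < num_batches; ++i) {
    const int64_t offset = i * batch_size;
    slices.emplace_back(offset, std::min(batch_size, total_rows - offset));
  }
  return slices;
}

// e.g. PlanOutputSlices(70000, 32 * 1024) -> {(0, 32768), (32768, 32768), (65536, 4464)}
```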
########## cpp/src/arrow/compute/exec/options.h: ##########
@@ -199,21 +199,39 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
   std::vector<std::string> names;
 };

-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.

Review Comment:
Perhaps, if we implement accumulation of groups, we could remove the "which should be large" part of the advice here?

########## cpp/src/arrow/compute/exec/aggregate_node.cc: ##########
@@ -326,46 +438,77 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
   }

  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ResetAggregates() {
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      const std::vector<TypeHolder>& in_types = in_typesets_[i];
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false) {
     ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size());
+    batch.values.resize(kernels_.size() + segment_field_ids_.size());
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
                          {{"function.name", aggs_[i].function},
                           {"function.options",
                            aggs_[i].options ? aggs_[i].options->ToString() : "<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + "::Finalize"}});
+                          {"function.kind", std::string(kind_name()) + "::Output"}});
       KernelContext ctx{plan()->query_context()->exec_context()};
       ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
                                              kernels_[i], &ctx, std::move(states_[i])));
       RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
     }
+    PlaceFields(batch, kernels_.size(), segmenter_values_);
-    return output_->InputReceived(this, std::move(batch));
+    ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
+    total_output_batches_++;
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+    } else {
+      ARROW_RETURN_NOT_OK(ResetAggregates());
+    }
+    return Status::OK();

Review Comment:
```suggestion
    if (is_last) {
      return output_->InputFinished(this, total_output_batches_);
    }
    return ResetAggregates();
```
Minor nit: it's just a touch more compact.

########## cpp/src/arrow/compute/row/grouper.cc: ##########
@@ -39,12 +43,336 @@ namespace arrow {
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;

 namespace compute {
 namespace {

-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();

Review Comment:
```suggestion
const std::shared_ptr<DataType>& group_id_type() {
  static auto instance = std::make_shared<GroupIdType>();
  return instance;
}
```
Just a general aversion to global state. However, if you want to keep it then it's probably ok. Maybe just rename to `g_group_id_type`?
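For context on the suggestion above, here is a minimal standalone sketch (generic C++ with placeholder names, not Arrow code) of why a function-local static is preferable to a namespace-scope `shared_ptr`: the local static is constructed on first use, with thread-safe initialization guaranteed since C++11, so its construction order relative to globals in other translation units never matters.

```cpp
#include <memory>
#include <string>

// Placeholder type standing in for GroupIdType / DataType in this sketch.
struct LazyThing {
  std::string label = "constructed on first use";
};

// Function-local static: initialized the first time GetLazyThing() is called,
// with thread-safe ("magic static") initialization guaranteed since C++11.
const std::shared_ptr<LazyThing>& GetLazyThing() {
  static auto instance = std::make_shared<LazyThing>();
  return instance;
}

// By contrast, a namespace-scope definition like
//   auto g_lazy_thing = std::make_shared<LazyThing>();
// runs during static initialization before main(), and its order relative to
// globals defined in other translation units is unspecified.
```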
########## cpp/src/arrow/compute/exec/options.h: ##########
@@ -199,21 +199,39 @@ class ARROW_EXPORT ProjectNodeOptions : public ExecNodeOptions {
   std::vector<std::string> names;
 };

-/// \brief Make a node which aggregates input batches, optionally grouped by keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine the group.
+/// However segment-keys are also used for determining grouping segments, which should be
+/// large, and allow streaming a partial aggregation result after processing each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the segment-key
+/// attribute specifies a column with non-decreasing values or a lexigographically-ordered

Review Comment:
```suggestion
/// attribute specifies a column with non-decreasing values or a lexicographically-ordered
```

########## cpp/src/arrow/compute/row/grouper.h: ##########
@@ -30,6 +30,69 @@ namespace arrow {
 namespace compute {

+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+  /// \brief the offset into the batch where the segment starts
+  int64_t offset;
+  /// \brief the length of the segment
+  int64_t length;
+  /// \brief whether the segment may be extended by a next one
+  bool is_open;
+  /// \brief whether the segment extends a preceeding one
+  bool extends;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return segment1.offset == segment2.offset && segment1.length == segment2.length &&
+         segment1.is_open == segment2.is_open && segment1.extends == segment2.extends;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& segment2) {
+  return !(segment1 == segment2);
+}

Review Comment:
Is this used anywhere today? It might be useful to have for unit tests, so I don't think we should get rid of it. Just checking my understanding.
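As a follow-on to the unit-test remark, here is a minimal sketch of the kind of assertion these equality operators enable, assuming GoogleTest. The `actual` value is hard-coded here; a real test would obtain it from the segmenter introduced in this PR.

```cpp
#include <gtest/gtest.h>

#include "arrow/compute/row/grouper.h"

TEST(GroupingSegment, EqualityComparison) {
  using arrow::compute::GroupingSegment;
  // In a real test, `actual` would come from segmenting a batch; it is
  // hard-coded here just to show the operators from grouper.h in use.
  GroupingSegment expected{/*offset=*/0, /*length=*/4, /*is_open=*/false, /*extends=*/true};
  GroupingSegment actual{0, 4, false, true};
  GroupingSegment other{4, 2, true, false};
  EXPECT_EQ(expected, actual);  // uses operator==
  EXPECT_NE(expected, other);   // uses operator!=
}
```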