westonpace commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1119133819


##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -16,9 +16,11 @@
 // under the License.
 
 #include <mutex>
+#include <shared_mutex>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -326,46 +438,77 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
   }
 
  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ResetAggregates() {
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      const std::vector<TypeHolder>& in_types = in_typesets_[i];
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, 
aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false) {
     ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size());
+    batch.values.resize(kernels_.size() + segment_field_ids_.size());
 
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
                          {{"function.name", aggs_[i].function},
                           {"function.options",
                            aggs_[i].options ? aggs_[i].options->ToString() : 
"<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + 
"::Finalize"}});
+                          {"function.kind", std::string(kind_name()) + 
"::Output"}});

Review Comment:
   Hmm, maybe keep the old name here?  This is tracking how much time is spent 
in the two kernel calls `MergeAll` and `Finalize` so I think `::Finalize` is 
more accurate.



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -16,9 +16,11 @@
 // under the License.
 
 #include <mutex>
+#include <shared_mutex>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -584,29 +778,50 @@ class GroupByNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
 
     int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, 
output_batch_size());
-    RETURN_NOT_OK(output_->InputFinished(this, 
static_cast<int>(num_output_batches)));
-    return plan_->query_context()->StartTaskGroup(output_task_group_id_,
-                                                  num_output_batches);
+    total_output_batches_ += static_cast<int>(num_output_batches);
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+      
RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
+                                                           
num_output_batches));
+    } else {
+      for (int64_t i = 0; i < num_output_batches; i++) {
+        ARROW_RETURN_NOT_OK(OutputNthBatch(i));

Review Comment:
   This is a little bizarre (though not introduced by you).  If we are 
outputting an Nth batch we just output it immediately.  However, when 
outputting the final batch we divide it into 32k chunks and schedule tasks for 
each one.  I think, longer term, we will want to implement accumulation for 
segmented group-by, since some segments may be quite small (and we might only 
output a few rows per segment depending on the grouping keys).
   
   We have a prototype for this in `ExecBatchBuilder` (in `light_array.h`).  
However, let's definitely save that for a follow-up.  Do you want to create an 
issue or should I?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -199,21 +199,39 @@ class ARROW_EXPORT ProjectNodeOptions : public 
ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by 
keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by 
keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine 
the group.
+/// However segment-keys are also used for determining grouping segments, 
which should be
+/// large, and allow streaming a partial aggregation result after processing 
each segment.

Review Comment:
   Perhaps, if we implement accumulation of groups, we could remove the "which 
should be large" from the advice here?



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -326,46 +438,77 @@ class ScalarAggregateNode : public ExecNode, public 
TracedNode {
   }
 
  private:
-  Status Finish() {
-    auto scope = TraceFinish();
+  Status ResetAggregates() {
+    auto exec_ctx = plan()->query_context()->exec_context();
+    for (size_t i = 0; i < kernels_.size(); ++i) {
+      const std::vector<TypeHolder>& in_types = in_typesets_[i];
+      states_[i].resize(plan()->query_context()->max_concurrency());
+      KernelContext kernel_ctx{exec_ctx};
+      RETURN_NOT_OK(Kernel::InitAll(
+          &kernel_ctx, KernelInitArgs{kernels_[i], in_types, 
aggs_[i].options.get()},
+          &states_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status OutputResult(bool is_last = false) {
     ExecBatch batch{{}, 1};
-    batch.values.resize(kernels_.size());
+    batch.values.resize(kernels_.size() + segment_field_ids_.size());
 
     for (size_t i = 0; i < kernels_.size(); ++i) {
       util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
                          {{"function.name", aggs_[i].function},
                           {"function.options",
                            aggs_[i].options ? aggs_[i].options->ToString() : 
"<NULLPTR>"},
-                          {"function.kind", std::string(kind_name()) + 
"::Finalize"}});
+                          {"function.kind", std::string(kind_name()) + 
"::Output"}});
       KernelContext ctx{plan()->query_context()->exec_context()};
       ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
                                              kernels_[i], &ctx, 
std::move(states_[i])));
       RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
     }
+    PlaceFields(batch, kernels_.size(), segmenter_values_);
 
-    return output_->InputReceived(this, std::move(batch));
+    ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
+    total_output_batches_++;
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+    } else {
+      ARROW_RETURN_NOT_OK(ResetAggregates());
+    }
+    return Status::OK();

Review Comment:
   ```suggestion
       if (is_last) {
         return output_->InputFinished(this, total_output_batches_);
       }
       return ResetAggregates();
   ```
   
   Minor nit: it's just a touch more compact.



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -39,12 +43,336 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::PrimitiveScalarBase;
 
 namespace compute {
 
 namespace {
 
-struct GrouperImpl : Grouper {
+constexpr uint32_t kNoGroupId = std::numeric_limits<uint32_t>::max();
+
+using group_id_t = std::remove_const<decltype(kNoGroupId)>::type;
+using GroupIdType = CTypeTraits<group_id_t>::ArrowType;
+auto group_id_type = std::make_shared<GroupIdType>();

Review Comment:
   ```suggestion
   const std::shared_ptr<DataType>& group_id_type() {
     static auto instance = std::make_shared<GroupIdType>();
     return instance;
   }
   ```
   
   Just a general aversion to global state.  However, if you want to keep it 
then it's probably ok.  Maybe just rename to `g_group_id_type`?



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -199,21 +199,39 @@ class ARROW_EXPORT ProjectNodeOptions : public 
ExecNodeOptions {
   std::vector<std::string> names;
 };
 
-/// \brief Make a node which aggregates input batches, optionally grouped by 
keys.
+/// \brief Make a node which aggregates input batches, optionally grouped by 
keys and
+/// optionally segmented by segment-keys. Both keys and segment-keys determine 
the group.
+/// However segment-keys are also used for determining grouping segments, 
which should be
+/// large, and allow streaming a partial aggregation result after processing 
each segment.
+/// One common use-case for segment-keys is ordered aggregation, in which the 
segment-key
+/// attribute specifies a column with non-decreasing values or a 
lexigographically-ordered

Review Comment:
   ```suggestion
   /// attribute specifies a column with non-decreasing values or a 
lexicographically-ordered
   ```



##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -30,6 +30,69 @@
 namespace arrow {
 namespace compute {
 
+/// \brief A segment of contiguous rows for grouping
+struct ARROW_EXPORT GroupingSegment {
+  /// \brief the offset into the batch where the segment starts
+  int64_t offset;
+  /// \brief the length of the segment
+  int64_t length;
+  /// \brief whether the segment may be extended by a next one
+  bool is_open;
+  /// \brief whether the segment extends a preceeding one
+  bool extends;
+};
+
+inline bool operator==(const GroupingSegment& segment1, const GroupingSegment& 
segment2) {
+  return segment1.offset == segment2.offset && segment1.length == 
segment2.length &&
+         segment1.is_open == segment2.is_open && segment1.extends == 
segment2.extends;
+}
+inline bool operator!=(const GroupingSegment& segment1, const GroupingSegment& 
segment2) {
+  return !(segment1 == segment2);
+}

Review Comment:
   Is this used anywhere today?  It might be useful to have for unit tests so I 
don't think we should get rid of it.  Just checking my understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to