rtpsw commented on code in PR #34311:
URL: https://github.com/apache/arrow/pull/34311#discussion_r1125648242


##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -584,29 +775,50 @@ class GroupByNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
 
     int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, 
output_batch_size());
-    RETURN_NOT_OK(output_->InputFinished(this, 
static_cast<int>(num_output_batches)));
-    return plan_->query_context()->StartTaskGroup(output_task_group_id_,
-                                                  num_output_batches);
+    total_output_batches_ += static_cast<int>(num_output_batches);
+    if (is_last) {
+      ARROW_RETURN_NOT_OK(output_->InputFinished(this, total_output_batches_));
+      
RETURN_NOT_OK(plan_->query_context()->StartTaskGroup(output_task_group_id_,
+                                                           
num_output_batches));
+    } else {
+      for (int64_t i = 0; i < num_output_batches; i++) {
+        ARROW_RETURN_NOT_OK(OutputNthBatch(i));
+      }
+      ARROW_RETURN_NOT_OK(ResetAggregates());
+    }
+    return Status::OK();
   }
 
   Status InputReceived(ExecNode* input, ExecBatch batch) override {
     auto scope = TraceInputReceived(batch);
 
     DCHECK_EQ(input, inputs_[0]);
 
-    ARROW_RETURN_NOT_OK(Consume(ExecSpan(batch)));
+    auto handler = [this](const ExecBatch& full_batch, const GroupingSegment& 
segment) {
+      if (!segment.extends && segment.offset == 0) 
RETURN_NOT_OK(OutputResult());
+      auto exec_batch = full_batch.Slice(segment.offset, segment.length);
+      auto batch = ExecSpan(exec_batch);
+      RETURN_NOT_OK(Consume(batch));
+      RETURN_NOT_OK(
+          GetScalarFields(&segmenter_values_, exec_batch, 
segment_key_field_ids_));

Review Comment:
   Whether it is specific to segmented aggregation is a matter of perspective: 
the implementation isn't, but its use is. Either way, I think this is a 
minor point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to