save-buffer commented on code in PR #13143:
URL: https://github.com/apache/arrow/pull/13143#discussion_r882909683


##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -508,47 +519,24 @@ class GroupByNode : public ExecNode {
     std::move(out_keys.values.begin(), out_keys.values.end(),
               out_data.values.begin() + agg_kernels_.size());
     state->grouper.reset();
-
-    if (output_counter_.SetTotal(
-            static_cast<int>(bit_util::CeilDiv(out_data.length, output_batch_size())))) {
-      // this will be hit if out_data.length == 0
-      finished_.MarkFinished();
-    }
     return out_data;
   }
 
-  void OutputNthBatch(int n) {
+  void OutputNthBatch(int64_t n) {
     // bail if StopProducing was called
     if (finished_.is_finished()) return;
 
     int64_t batch_size = output_batch_size();
     outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
-
-    if (output_counter_.Increment()) {
-      finished_.MarkFinished();
-    }
   }
 
   Status OutputResult() {
     RETURN_NOT_OK(Merge());
     ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
 
-    int num_output_batches = *output_counter_.total();
-    outputs_[0]->InputFinished(this, num_output_batches);
-
-    auto executor = ctx_->executor();
-    for (int i = 0; i < num_output_batches; ++i) {
-      if (executor) {
-        // bail if StopProducing was called
-        if (finished_.is_finished()) break;
-
-        auto plan = this->plan()->shared_from_this();
-        RETURN_NOT_OK(executor->Spawn([plan, this, i] { OutputNthBatch(i); }));
-      } else {
-        OutputNthBatch(i);
-      }
-    }
-
+    int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
+    outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));

Review Comment:
   Yes, that's why we have the `AtomicCounter`: it is supposed to run cleanup 
only once the number of received batches equals the total number of batches 
specified in `InputFinished`. I think the role of `InputFinished` is really 
just to declare how many batches there will be in total; it does not guarantee 
that no batches will be received after `InputFinished` is called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to