rtpsw commented on code in PR #14352:
URL: https://github.com/apache/arrow/pull/14352#discussion_r1019650164
##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -132,12 +215,14 @@ class ScalarAggregateNode : public ExecNode {
return plan->EmplaceNode<ScalarAggregateNode>(
plan, std::move(inputs), schema(std::move(fields)),
std::move(target_field_ids),
- std::move(aggregates), std::move(kernels), std::move(states));
+ std::move(segment_field_ids), std::move(segmenter),
std::move(aggregates),
+ std::move(kernels), std::move(states));
}
const char* kind_name() const override { return "ScalarAggregateNode"; }
Status DoConsume(const ExecSpan& batch, size_t thread_index) {
+ GatedSharedLock lock(gated_shared_mutex_);
Review Comment:
Without these locks, I observed a race condition between the finishing of
one segment, which reads the accumulated states, and the consuming of the next
segment, which writes to those states. The `GatedSharedLock` instances ensure
that the (multi-threaded) consuming of the next segment does not start until the
finishing of the current segment completes. I can think of ways to reduce the
locking, but they would be more complex and are better left for a future task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]