westonpace commented on code in PR #37839:
URL: https://github.com/apache/arrow/pull/37839#discussion_r1336267984
##########
cpp/src/arrow/acero/asof_join_node_test.cc:
##########
@@ -1474,9 +1477,16 @@ void TestBackpressure(BatchesMaker maker, int num_batches, int batch_size) {
std::vector<Declaration::Input> bp_decls;
for (size_t i = 0; i < source_configs.size(); i++) {
const auto& config = source_configs[i];
-
- src_decls.emplace_back("source",
- SourceNodeOptions(config.schema, GetGen(config.batches)));
+ if (config.is_delayed) {
Review Comment:
I assume this new option triggers the deadlock on the unfixed code?
##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -579,7 +581,16 @@ class BackpressureHandler {
}
}
+ Status ForceShutdown() {
+ // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+ // Since acero's executor won't terminate if any one node is paused, we need to
+ // force resume the node before stopping production.
+ backpressure_control_->Resume();
+ return input_->StopProducing();
Review Comment:
So, if I understand correctly, this means we will call `StopProducing` on all
right-hand-side nodes once:
* The left-hand side has finished
* The right-hand side has caught up
If so, then I agree this is a valid thing to do.
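
To make the ordering in `ForceShutdown()` concrete, here is a minimal toy model of why resuming before stopping matters. It is only a sketch under stated assumptions: `ToySource`, `PlanCanFinish`, and `ShutdownPausedSource` are hypothetical names invented for illustration and are not part of Acero's API. The only rule borrowed from the diff is the one in the code comment, namely that the plan will not terminate while any node is paused.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for a source node; NOT Acero's real source node.
struct ToySource {
  bool paused = false;
  bool stopped = false;
  void Pause() { paused = true; }
  void Resume() { paused = false; }
  void StopProducing() { stopped = true; }
};

// Toy version of the rule stated in the diff's comment: the plan can only
// finish once every source has stopped and none of them is still paused.
bool PlanCanFinish(const std::vector<ToySource>& sources) {
  for (const auto& s : sources) {
    if (s.paused || !s.stopped) return false;
  }
  return true;
}

// Same order as ForceShutdown() in the diff: resume first, then stop.
void ForceShutdown(ToySource& s) {
  s.Resume();
  assert(!s.paused);  // the source must not be left paused when it stops
  s.StopProducing();
}

// Shuts down a paused right-hand-side source, with or without resuming it
// first, and reports whether the toy plan is then able to finish.
bool ShutdownPausedSource(bool resume_first) {
  std::vector<ToySource> sources(2);
  sources[0].StopProducing();  // left-hand side has finished
  sources[1].Pause();          // right-hand side is paused by backpressure
  if (resume_first) {
    ForceShutdown(sources[1]);
  } else {
    sources[1].StopProducing();  // stop without resuming: stays paused
  }
  return PlanCanFinish(sources);
}
```

In this model, `ShutdownPausedSource(true)` lets the plan finish, while `ShutdownPausedSource(false)` leaves a paused source behind and the plan can never complete, which is the deadlock the comment describes.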
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.