It seems like for this to work currently, we would want to call ResumeProducing on the source nodes, but outside of the processing thread.
I tried a solution that makes the ResumeProducing call from a fire-and-forget std::thread, but this doesn't seem like a great idea. It gets pretty hairy when things are being destructed at the end of execution (ResumeProducing still seems to perform some invalid reads despite checking finished() on the source nodes), and I'm assuming we don't want to spin up more threads anyway.

I also looked into the plan's executor (ScheduleTask, etc.), but I believe this waits for the task to complete, so it blocks processing.

Do you have any suggestions for a temporary workaround?

Ivan

-----Original Message-----
From: Ivan Chau <ivan.c...@twosigma.com>
Sent: Thursday, July 21, 2022 9:28 AM
To: dev@arrow.apache.org
Subject: RE: [C++] ResumeProducing Future Causing Blocking

Thanks Sasha and Weston! The diagrams would be helpful!

Would the new first-class support in the scheduler be something similar to what's currently available in BackpressureMonitor? We are looking to implement some more custom backpressure schemes that depend on batch ordering/completion rather than memory size.

Ivan

-----Original Message-----
From: Weston Pace <weston.p...@gmail.com>
Sent: Wednesday, July 20, 2022 8:31 PM
To: dev@arrow.apache.org
Subject: Re: [C++] ResumeProducing Future Causing Blocking

> 4) control is not returned to the processing thread

Yes, it looks like the current implementation does not return control to the processing thread, but I think this is correct, or at least "as designed". The thread will be used to continue iterating the source.

> control is not returned to the processing thread, and instead blocks
> when marking the backpressure_future_ as finished.

As Sasha said, the call to "MarkFinished" will then run callbacks. One of those callbacks (the only one in this case) then continues to iterate from the source, doing the work that was originally started by the call to StartProducing.
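To make the behavior described above concrete, here is a minimal sketch of a toy future whose MarkFinished runs its callbacks inline on the calling thread. It also shows the point from Sasha's quoted message that a copy refers to the same underlying state. ToyFuture, DemoResult, and RunDemo are hypothetical names invented for illustration; this is not arrow::Future, it is not thread-safe, and it only models the no-executor case.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for a future (hypothetical; NOT arrow::Future).
// Copies of a ToyFuture share one State object through a shared_ptr.
class ToyFuture {
 public:
  ToyFuture() : state_(std::make_shared<State>()) {}

  // Register a callback; if the future is already finished, run it right away.
  void AddCallback(std::function<void()> cb) {
    if (state_->finished) {
      cb();
    } else {
      state_->callbacks.push_back(std::move(cb));
    }
  }

  // Runs every registered callback inline, on the calling thread.
  // The caller does not get control back until all callbacks have returned.
  void MarkFinished() {
    assert(!state_->finished);  // in this toy, finishing twice is an error
    state_->finished = true;
    auto callbacks = std::move(state_->callbacks);
    state_->callbacks.clear();
    for (auto& cb : callbacks) cb();
  }

  bool is_finished() const { return state_->finished; }

 private:
  struct State {
    bool finished = false;
    std::vector<std::function<void()>> callbacks;
  };
  std::shared_ptr<State> state_;
};

struct DemoResult {
  bool callback_ran_before_return;  // callback ran inside MarkFinished()
  bool same_thread;                 // ...on the thread that called it
  bool original_finished;           // finishing the copy finished the original
};

DemoResult RunDemo() {
  ToyFuture backpressure_future;
  bool callback_ran = false;
  std::thread::id callback_thread;

  // Stands in for the continuation that resumes iterating the source.
  backpressure_future.AddCallback([&] {
    callback_ran = true;
    callback_thread = std::this_thread::get_id();
  });

  ToyFuture to_finish = backpressure_future;  // copy shares the same state
  to_finish.MarkFinished();                   // the callback runs here, inline

  return {callback_ran, callback_thread == std::this_thread::get_id(),
          backpressure_future.is_finished()};
}
```

Calling RunDemo() from a main function should report all three fields as true in this toy model: the continuation runs before MarkFinished returns, on the marking thread, and the original handle observes the finish. With an executor, a real implementation would presumably hand the continuation off as a task at some point instead of running everything inline.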
Generally, the code will only go so far before it creates a new thread task, and then control will eventually return. However, if you are running without an executor, then there are no thread tasks, all callbacks run immediately in the thread calling MarkFinished, and it can be rather hard to understand the logic.

I'll try to draw up some sequence diagrams as an example for this and for Li Jin's earlier question regarding ordering.

> Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing
> unfortunately. It is currently not tested anywhere as far as I can tell and
> ignored by a lot of nodes (such as HashJoinNode).

We do test, and rely on, the PauseProducing/ResumeProducing mechanics to implement back-pressure for the datasets API. This limits plans to scan->filter->project->sink, and all of these nodes have been tested to work accurately with backpressure. I think you're free to experiment with it. I agree, however, that backpressure could maybe be a more minor concern until some of the scheduler improvements are available.

On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
>
> Hi,
> Futures run callbacks on the thread that marks them as finished. It seems
> that the Source node’s generator loop adds a callback
> (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130)
> which continues the loop. I’m not entirely sure myself how this code works
> (this generator + control flow thing is very opaque), but my guess is that’s
> what’s causing it.
>
> One further note, copying a Future<> actually maintains a reference to the
> same underlying future, which may also be unexpected at first. Specifically
> in your code, doing Future<> to_finish = backpressure_future_;
> to_finish.MarkFinished(); is equivalent to just
> backpressure_future_.MarkFinished().
>
> Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing
> unfortunately. It is currently not tested anywhere as far as I can tell and
> ignored by a lot of nodes (such as HashJoinNode). Michal and I have some work
> in progress involving a new scheduler with first-class support for back
> pressure.
>
> Sasha
>
> > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> >
> > backpressure_future_
>