It seems like for this to work currently, we would want to call ResumeProducing 
on the source nodes, but outside of the processing thread.

I tried a solution by making the ResumeProducing call with a fire-and-forget 
std::thread, but this doesn't seem like a great idea. It gets pretty hairy when 
things are being destructed at the end of execution (it seems like 
ResumeProducing still performs some invalid reads despite checking finished() 
on the source nodes), and I'm assuming we don't want to spin up more threads 
anyway. I also looked into the plan's executor (ScheduleTask, etc.), but I 
believe this waits for the task to complete, so it also blocks processing.

Do you have any suggestions for a temporary workaround?

Ivan

-----Original Message-----
From: Ivan Chau <ivan.c...@twosigma.com>
Sent: Thursday, July 21, 2022 9:28 AM
To: dev@arrow.apache.org
Subject: RE: [C++] ResumeProducing Future Causing Blocking

Thanks Sasha and Weston! The diagrams would be helpful!

Would the new first class support in the scheduler be something similar to 
what's available currently in BackpressureMonitor? We are looking to implement 
some more custom backpressure schemes that depend on batch ordering/completion 
rather than memory size.

Ivan

-----Original Message-----
From: Weston Pace <weston.p...@gmail.com>
Sent: Wednesday, July 20, 2022 8:31 PM
To: dev@arrow.apache.org
Subject: Re: [C++] ResumeProducing Future Causing Blocking

> 4) control is not returned to the processing thread

Yes, it looks like the current implementation does not return control to the 
processing thread, but I think this is correct, or at least "as designed".  The 
thread will be used to continue iterating the source.

> control is not returned to the processing thread, and instead blocks
> when marking the backpressure_future_ as finished.

As Sasha said, the call to "MarkFinished" will then run callbacks.
One of those callbacks (the only one in this case) then continues to iterate 
from the source, doing the work that was originally started by the call to 
StartProducing.

Generally, the code will only go so far before it creates a new thread task and 
then control will eventually return.  However, if you are running without an 
executor, then there are no thread tasks, all callbacks run immediately in the 
thread calling MarkFinished, and the logic can be rather hard to follow.  I'll 
try to draw up some sequence diagrams as an example for this and Li Jin's 
earlier question regarding ordering.
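
To make the "callbacks run immediately" point concrete, here is a minimal 
sketch of that behavior. ToyFuture is a hypothetical stand-in written for this 
example, not arrow::Future, and it ignores executors and thread safety. It only 
shows that a copy shares state with the original and that MarkFinished does not 
return until every registered callback has run.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy future (NOT arrow::Future). Copies share one State.
class ToyFuture {
 public:
  ToyFuture() : state_(std::make_shared<State>()) {}

  // Register a callback; if already finished, run it right away.
  void AddCallback(std::function<void()> cb) {
    if (state_->finished) {
      cb();
      return;
    }
    state_->callbacks.push_back(std::move(cb));
  }

  // Mark finished and run every callback inline, on the caller's thread.
  void MarkFinished() {
    state_->finished = true;
    std::vector<std::function<void()>> to_run;
    to_run.swap(state_->callbacks);
    for (auto& cb : to_run) cb();
  }

  bool is_finished() const { return state_->finished; }

 private:
  struct State {
    bool finished = false;
    std::vector<std::function<void()>> callbacks;
  };
  std::shared_ptr<State> state_;
};

// Returns the order of events when a copy of the future is marked finished.
std::vector<std::string> RunDemo() {
  std::vector<std::string> log;
  ToyFuture backpressure_future;
  backpressure_future.AddCallback(
      [&log] { log.push_back("callback: continue iterating source"); });

  ToyFuture to_finish = backpressure_future;  // copy, same underlying state
  log.push_back("before MarkFinished");
  to_finish.MarkFinished();  // the callback runs inside this call
  log.push_back("after MarkFinished");

  // Finishing the copy finished the original as well.
  assert(backpressure_future.is_finished());
  return log;
}
```

Calling RunDemo() records the callback entry between the "before" and "after" 
entries. In this toy model the caller of MarkFinished only regains control once 
the callback returns, which is the blocking described above when no executor is 
involved.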

> Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> unfortunately. It is currently not tested anywhere as far as I can tell and 
> ignored by a lot of nodes (such as HashJoinNode).

We do test, and rely on, the PauseProducing/ResumeProducing mechanics to 
implement back-pressure for the datasets API.  Those plans are limited to
scan->filter->project->sink, and all of these nodes have been tested to
work correctly with backpressure.  I think you're free to experiment with it.  
I agree, however, that backpressure could be a lower priority until 
some of the scheduler improvements are available.


On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky <krassovskysa...@gmail.com> 
wrote:
>
> Hi,
> Futures run callbacks on the thread that marks them as finished. It seems 
> that the Source node’s generator loop adds a callback 
> (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130)
> which continues the loop. I’m not entirely sure myself how this code works 
> (this generator + control flow thing is very opaque), but my guess is that’s 
> what’s causing it.
>
> One further note, copying a Future<> actually maintains a reference to the 
> same underlying future, which may also be unexpected at first. Specifically 
> in your code, doing Future<> to_finish = backpressure_future_; 
> to_finish.MarkFinished(); is equivalent to just 
> backpressure_future_.MarkFinished().
>
> Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> unfortunately. It is currently not tested anywhere as far as I can tell and 
> ignored by a lot of nodes (such as HashJoinNode). Michal and I have some work 
> in progress involving a new scheduler with first-class support for back 
> pressure.
>
> Sasha
>
> > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> >
> > backpressure_future_
>
