> Li gave a quick suggestion with blocking queues which I'm using now, though I 
> heard that approach is not suitable for production due to thread blocking.

I think it's a reasonable workaround if you have a relatively small
number of sources.
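
To make the tradeoff concrete, here is a rough sketch of the
blocking-queue style of backpressure (all names here are made up for
illustration; this is not Arrow code).  Push() blocks the producing
I/O thread once the queue is full, which is exactly why the approach
breaks down when the number of sources exceeds the number of I/O
threads:

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue.  Push() blocks the producing (I/O)
// thread once `capacity` batches are buffered -- that blocking is the
// implicit backpressure, and also the deadlock risk discussed below.
template <typename T>
class BlockingQueue {
 public:
  explicit BlockingQueue(std::size_t capacity) : capacity_(capacity) {}

  void Push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push_back(std::move(item));
    not_empty_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    T item = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return item;
  }

 private:
  std::size_t capacity_;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> queue_;
};
```

With a small number of sources each blocked Push() ties up one I/O
thread at worst, which is tolerable; it is only when every I/O thread
can end up parked on a full queue that you get the deadlock you
describe in question 3.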

> 1) I'm assuming here that L(N) and R(N) just represent the batches needed to 
> materialize the Nth batch for Asof and should be followed by the sink?

I think so.  L(N) is the Nth batch from the left source.  Sink N
represents delivering the joined & materialized batch to the consumer
(for a normal sink node this means dropping the batch into a queue;
for a consuming_sink node it includes the work of the consumer
itself).

> 2) Is there supposed to be an I/O thread for L5, calling Source L5?

Yes, good catch.

> 3) On Pause/Resume, what (ideally) happens to the I/O thread attempting to 
> schedule a task on the source node? For example, after pausing in L4, what 
> happens to the thread handling L5? Can it switch tasks, e.g. fulfill a batch 
> from some right hand table instead?

Yes.  If a source is paused then there will be no new I/O tasks for
that source.  Once existing I/O tasks wrap up those I/O threads will
instead focus on any unpaused sources.
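
As a toy illustration of that dispatch decision (hypothetical names,
not the actual implementation), an idle I/O thread just skips any
paused source when picking its next read:

```cpp
#include <string>
#include <vector>

// Hypothetical bookkeeping for a source node, for illustration only.
struct Source {
  std::string name;
  bool paused = false;
  int pending_batches = 0;
};

// Returns the next source an idle I/O thread should read from,
// skipping paused sources.  nullptr means every source is either
// paused or drained, so the thread goes idle.
Source* NextSourceToRead(std::vector<Source>& sources) {
  for (auto& src : sources) {
    if (!src.paused && src.pending_batches > 0) {
      return &src;
    }
  }
  return nullptr;
}
```

So after pausing the left source an I/O thread would simply start
fulfilling batches for one of the right-hand tables instead.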

>     In our blocking queues approach, I think the I/O thread calling 
> InputReceived on AsOf will be blocked, which means we can potentially 
> deadlock if the number of right hand tables being joined is greater than the 
> number of I/O threads available. Was just wondering if the scenario would be 
> avoided in the Event Loop implementation.

Yes, that deadlock would be possible.  In the event loop
implementation this would not be a problem.  A call to resume
producing would schedule a task (adding it to the back of the queue)
and return immediately.  The task that called resume producing would
then finish whatever else it needed to do, and the next task in the
queue would run.  A call to pause producing would just set a flag on
the source node and return immediately.
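
A toy sketch of those semantics (illustrative only; these are not the
actual SerialExecutor or ExecNode APIs).  The key point is that both
calls return immediately, and the actual work of producing the next
batch only happens when the event loop gets around to the scheduled
task:

```cpp
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded event loop: tasks run one at a time,
// in FIFO order, so nothing ever blocks waiting on another task.
class EventLoop {
 public:
  void Schedule(std::function<void()> task) {
    tasks_.push_back(std::move(task));
  }

  void Run() {
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();  // May schedule more tasks; they go to the back.
    }
  }

 private:
  std::deque<std::function<void()>> tasks_;
};

class SourceNode {
 public:
  explicit SourceNode(EventLoop* loop) : loop_(loop) {}

  // Pause just sets a flag and returns immediately.
  void PauseProducing() { paused_ = true; }

  // Resume schedules the next read at the *back* of the queue and
  // returns immediately -- the calling task is never hijacked.
  void ResumeProducing() {
    paused_ = false;
    loop_->Schedule([this] { ProduceNext(); });
  }

  void ProduceNext() {
    if (paused_) return;  // Flag checked when the task actually runs.
    ++batches_produced_;  // Stand-in for reading & delivering a batch.
  }

  int batches_produced() const { return batches_produced_; }

 private:
  EventLoop* loop_;
  bool paused_ = false;
  int batches_produced_ = 0;
};
```

Since the resume task only runs after the caller's current task
finishes, the InputReceived call that triggered the resume can never
be blocked by it, which is what removes the deadlock.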

On Fri, Jul 22, 2022 at 1:52 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
>
> Thanks Weston -- the first diagram indeed makes it look like backpressure 
> will be a headache. Li gave a quick suggestion with blocking queues which I'm 
> using now, though I heard that approach is not suitable for production due to 
> thread blocking.
>
> Some questions about your event loop diagram (the second attachment):
>
> 1) I'm assuming here that L(N) and R(N) just represent the batches needed to 
> materialize the Nth batch for Asof and should be followed by the sink?
> 2) Is there supposed to be an I/O thread for L5, calling Source L5?
> 3) On Pause/Resume, what (ideally) happens to the I/O thread attempting to 
> schedule a task on the source node? For example, after pausing in L4, what 
> happens to the thread handling L5? Can it switch tasks, e.g. fulfill a batch 
> from some right hand table instead?
>     In our blocking queues approach, I think the I/O thread calling 
> InputReceived on AsOf will be blocked, which means we can potentially 
> deadlock if the number of right hand tables being joined is greater than the 
> number of I/O threads available. Was just wondering if the scenario would be 
> avoided in the Event Loop implementation.
>
> Ivan
>
>
> -----Original Message-----
> From: Weston Pace <weston.p...@gmail.com>
> Sent: Friday, July 22, 2022 3:40 PM
> To: dev@arrow.apache.org
> Subject: Re: [C++] ResumeProducing Future Causing Blocking
>
> Probably attachments don't work with the ML.  Here are links to my drive:
>
> Current Serial:
> https://drive.google.com/file/d/1URbDAqXVSYfHJixzHA00CI1f0aedms8W/view?usp=sharing
> Proposed Serial:
> https://drive.google.com/file/d/1JpQiIVaGAL9mrderkid5uf888zGx21fO/view?usp=sharing
>
> On Fri, Jul 22, 2022 at 12:32 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
> >
> > Hi Weston,
> >
> > Not sure if the diagrams came through here -- is there some other place I 
> > need to view them?
> >
> > Ivan
> >
> > -----Original Message-----
> > From: Weston Pace <weston.p...@gmail.com>
> > Sent: Thursday, July 21, 2022 10:59 PM
> > To: dev@arrow.apache.org
> > Subject: Re: [C++] ResumeProducing Future Causing Blocking
> >
> > > Do you have any suggestions for a temporary workaround?
> >
> > I don't have any great ideas at the moment but I will continue to think it 
> > over.  Unfortunately, this is somewhat tied up with the ordering issue.  
> > Details...
> >
> > Our current implementation of "serial execution" is "executor==nullptr" and 
> > that makes it really hard to do a lot of things, for example asynchronous 
> > I/O (in that case we just cheat by running the I/O in the I/O thread pool 
> > anyway, so it's not truly serial).  When we want to "resume producing on 
> > source X" what we really would like to do is "add a task to the scheduler 
> > to process the next batch on source X".  However, since we don't have an 
> > executor, we can't do that.  Our only choice is to hijack the calling 
> > thread and resume immediately.  However, this makes for very confusing 
> > scheduling.  It works ok for the very simple plans we process today but, 
> > for more complicated things like backpressure, it gets a bit silly, as 
> > you've observed.
> >
> > A better way to do "serial execution" is to use an event loop.  When you 
> > want to "resume producing on source X" you "add a task to the scheduler to 
> > process the next batch on source X".  This task goes at the end of the 
> > event loop and then you return control to the caller of resume producing.  
> > Once that caller is done you can go back to the event queue.  We have an 
> > event loop, it is called the SerialExecutor, and I came very close to 
> > switching the exec plan over to it in [1] but failed because of a 
> > limitation in the scanner [2].  Fixing this mess is my personal top 
> > priority but I haven't been able to get enough time to sit down and do it 
> > for a few months.  I'm hoping I will have this time after this week when 
> > 9.0.0 wraps up.
> >
> > > I tried a solution by making the ResumeProducing call with a
> > > fire-and-forget std::thread, but this doesn't seem like a great
> > > idea; It gets pretty hairy when things are getting deconstructed at
> > > the end of execution (it seems like ResumeProducing still has some
> > > invalid reads despite checking finished() on the source nodes), and
> > > I'm assuming we don't want to spin up more threads anyway. I also
> > > looked into the plan's executor (ScheduleTask, etc.), but I believe
> > > this waits for the task to complete, so it causes blocking in the 
> > > processing.
> >
> > This isn't all that bad of an idea.  The plan's executor doesn't exist
> > (executor==nullptr) and so ScheduleTask does indeed just run the task 
> > immediately.  A slightly more "integrated" way of doing this would be to 
> > spin up a task to call resume producing on the I/O thread pool.
> > This will help limit the total number of threads which is important for 
> > things like thread local state.  Maybe something like...
> >
> > ```
> > ARROW_ASSIGN_OR_RAISE(Future<> task_lifetime, plan()->BeginExternalTask());
> > if (task_lifetime.is_valid()) {
> >   ::arrow::io::default_io_context().executor()->Spawn(
> >       [this, task_lifetime] () mutable {
> >         inputs()[0]->ResumeProducing(this, counter_);
> >       });
> > } else {
> >   // Plan has been aborted, just return and don't worry about resuming producing
> > }
> > ```
> >
> > The main downside here is that this is probably going to introduce 
> > parallelism back into the plan and mess up your ordering.  The thread that 
> > submits this task is going to return, and then it is going to grab the next 
> > batch from whatever unpaused source you have, and this will run in parallel 
> > with the resume task, which continues.
> >
> > Another workaround is to just enable parallel execution.  Backpressure is a 
> > lot more manageable then.  You can submit your resume producing task using 
> > ScheduleTask.  Or, even better, we should probably change source node so 
> > that it schedules a new task for the backpressure resumption automatically. 
> > I've created [3] to track this.  Of course, parallel execution introduces 
> > out-of-order processing, which, if I understand correctly, is a problem.
> >
> > As promised, I'm attaching some example diagrams.  The first shows the
> > status quo, it's pretty messy.  The second shows how things should
> > work in the solutions proposed in
> > https://github.com/apache/arrow/pull/12468
> >
> > I will try and make an example diagram for threaded execution (executor != 
> > nullptr) tomorrow and also make some diagrams on how sequencing might be 
> > tackled.
> >
> > [1] https://github.com/apache/arrow/pull/12468
> > [2] https://issues.apache.org/jira/browse/ARROW-16072
> > [3] https://issues.apache.org/jira/browse/ARROW-17180
> >
> > On Thu, Jul 21, 2022 at 2:31 PM Ivan Chau <ivan.c...@twosigma.com> wrote:
> > >
> > > It seems like for this to work currently, we would want to call 
> > > ResumeProducing on the source nodes, but outside of the processing thread.
> > >
> > > I tried a solution by making the ResumeProducing call with a 
> > > fire-and-forget std::thread, but this doesn't seem like a great idea; It 
> > > gets pretty hairy when things are getting deconstructed at the end of 
> > > execution (it seems like ResumeProducing still has some invalid reads 
> > > despite checking finished() on the source nodes), and I'm assuming we 
> > > don't want to spin up more threads anyway. I also looked into the plan's 
> > > executor (ScheduleTask, etc.), but I believe this waits for the task to 
> > > complete, so it causes blocking in the processing.
> > >
> > > Do you have any suggestions for a temporary workaround?
> > >
> > > Ivan
> > >
> > > -----Original Message-----
> > > From: Ivan Chau <ivan.c...@twosigma.com>
> > > Sent: Thursday, July 21, 2022 9:28 AM
> > > To: dev@arrow.apache.org
> > > Subject: RE: [C++] ResumeProducing Future Causing Blocking
> > >
> > > Thanks Sasha and Weston! The diagrams would be helpful!
> > >
> > > Would the new first class support in the scheduler be something similar 
> > > to what's available currently in BackpressureMonitor? We are looking to 
> > > implement some more custom backpressure schemes that depend on batch 
> > > ordering/completion rather than memory size.
> > >
> > > Ivan
> > >
> > > -----Original Message-----
> > > From: Weston Pace <weston.p...@gmail.com>
> > > Sent: Wednesday, July 20, 2022 8:31 PM
> > > To: dev@arrow.apache.org
> > > Subject: Re: [C++] ResumeProducing Future Causing Blocking
> > >
> > > > 4) control is not returned to the processing thread
> > >
> > > Yes, it looks like the current implementation does not return control to 
> > > the processing thread, but I think this is correct, or at least "as 
> > > designed".  The thread will be used to continue iterating the source.
> > >
> > > > control is not returned to the processing thread, and instead
> > > > blocks when marking the backpressure_future_ as finished.
> > >
> > > As Sasha said, the call to "MarkFinished" will then run callbacks.
> > > One of those callbacks (the only one in this case) then continues to 
> > > iterate from the source, doing the work that was originally started by 
> > > the call to StartProducing.
> > >
> > > Generally, the code will only go so far before it creates a new thread 
> > > task and then control will eventually return.  However, if you are 
> > > running without an executor, then there are no thread tasks, all 
> > > callbacks run immediately in the thread calling mark finished, and it can 
> > > be rather hard to understand the logic.  I'll try and draw up some 
> > > sequence diagrams as an example for this and Li Jin's earlier question 
> > > regarding ordering.
> > >
> > > > Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> > > > unfortunately. It is currently not tested anywhere as far as I can tell 
> > > > and ignored by a lot of nodes (such as HashJoinNode).
> > >
> > > We do test, and rely, on the PauseProducing/ResumeProducing
> > > mechanics to implement back-pressure for the datasets API.  This
> > > limits plans to scan->filter->project->sink and all of these nodes
> > > have been tested to accurately work with backpressure.  I think
> > > you're free to experiment
> > > with it.  I agree however, that backpressure could maybe be a more minor 
> > > concern until some of the scheduler improvements are available.
> > >
> > >
> > > On Wed, Jul 20, 2022 at 3:13 PM Sasha Krassovsky 
> > > <krassovskysa...@gmail.com> wrote:
> > > >
> > > > Hi,
> > > > Futures run callbacks on the thread that marks them as finished. It 
> > > > seems that the Source node’s generator loop does add a 
> > > > callback 
> > > > (https://github.com/iChauster/arrow/blob/asof_join2/cpp/src/arrow/compute/exec/source_node.cc#L130)
> > > >  which continues the loop. I’m not entirely sure myself how this code 
> > > > works (this generator + control flow thing is very opaque), but my 
> > > > guess is that’s what’s causing it.
> > > >
> > > > One further note, copying a Future<> actually maintains a reference to 
> > > > the same underlying future, which may also be unexpected at first. 
> > > > Specifically in your code, doing Future<> to_finish = 
> > > > backpressure_future_; to_finish.MarkFinished(); is equivalent to just 
> > > > backpressure_future_.MarkFinished().
> > > >
> > > > Anyway, I wouldn’t currently rely on PauseProducing/ResumeProducing 
> > > > unfortunately. It is currently not tested anywhere as far as I can tell 
> > > > and ignored by a lot of nodes (such as HashJoinNode). Michal and I have 
> > > > some work in progress involving a new scheduler with first-class 
> > > > support for back pressure.
> > > >
> > > > Sasha
> > > >
> > > > > On Jul 20, 2022, at 1:49 PM, Ivan Chau <ivan.m.c...@gmail.com> wrote:
> > > > >
> > > > > backpressure_future_
> > > >
