Hi!

Since the scheduler improvement work came up in some recent discussions
about how backpressure is handled in Acero, I am curious whether there has
been any more progress on this since May, or any future plans?

Thanks,
Li

On Mon, May 23, 2022 at 10:37 PM Weston Pace <weston.p...@gmail.com> wrote:

> > About point 2. I have previously seen the pipeline prioritization you've
> > described with both sides running simultaneously. My experience was not
> > good with that approach and the one-side-first approach was much better.
>
> This is good insight, I appreciate it.  I hope we can run this kind of
> experiment when we create the scheduler.  From my read of the
> scheduler doc it looks like we can probably tweak the generator
> priority.
>
> > About these points, if you are using queues, won't they automatically
> > slow down?
>
> It's a non-blocking queue, so when the capacity is hit a backpressure
> signal is sent towards the source and, yes, we eventually slow down.
> This means the capacity is really a "soft capacity" and we may exceed
> it by some small amount.
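>
> To make that concrete, here is a minimal sketch of such a soft-capacity
> queue (made-up names, not the actual Acero code): pushes never block,
> and crossing the capacity just fires a signal asking the source to pause.
>
>     #include <cstddef>
>     #include <functional>
>     #include <mutex>
>     #include <optional>
>     #include <queue>
>
>     template <typename T>
>     class SoftCapacityQueue {
>      public:
>       SoftCapacityQueue(size_t soft_capacity,
>                         std::function<void(bool)> on_backpressure)
>           : soft_capacity_(soft_capacity),
>             on_backpressure_(std::move(on_backpressure)) {}
>
>       // Never blocks; the capacity is only advisory ("soft") because
>       // items already in flight may still arrive after the signal.
>       void Push(T item) {
>         std::lock_guard<std::mutex> lock(mutex_);
>         queue_.push(std::move(item));
>         if (queue_.size() == soft_capacity_) on_backpressure_(/*pause=*/true);
>       }
>
>       std::optional<T> Pop() {
>         std::lock_guard<std::mutex> lock(mutex_);
>         if (queue_.empty()) return std::nullopt;
>         T item = std::move(queue_.front());
>         queue_.pop();
>         // Dropping back below capacity releases the backpressure signal.
>         if (queue_.size() + 1 == soft_capacity_) on_backpressure_(/*pause=*/false);
>         return item;
>       }
>
>      private:
>       size_t soft_capacity_;
>       std::function<void(bool)> on_backpressure_;
>       std::mutex mutex_;
>       std::queue<T> queue_;
>     };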
>
> > Even if we need back-pressure, join-like operations will break the
> > graph into sections where we don't have to consider backpressure
> > across these boundaries.
>
> This is possible; we can play around with it.  However, we are working
> on adding spillover support to joins.  So the path downstream of the
> join could still be asynchronous as we might be reading (slowly) from
> the spilled partitions.
>
> We do have a few places where we utilize a synchronous parallel-for
> without backpressure.  The scheduler doc refers to these as
> non-pipeline tasks.  For example, once we've got all the build data we
> synchronously divide and conquer to probe any queued batches.
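>
> As a rough illustration of that kind of non-pipeline task (hypothetical
> names, not Acero's actual code), a synchronous divide-and-conquer
> parallel-for over an index range; there is no backpressure involved
> because the whole input is already materialized:
>
>     #include <cstddef>
>     #include <future>
>
>     // Split the range in half until it is small enough, running one half
>     // on another thread, then join before returning (fully synchronous).
>     template <typename Fn>
>     void ParallelFor(size_t begin, size_t end, const Fn& body,
>                      size_t grain = 64) {
>       if (end - begin <= grain) {
>         for (size_t i = begin; i < end; ++i) body(i);
>         return;
>       }
>       size_t mid = begin + (end - begin) / 2;
>       auto right = std::async(std::launch::async,
>                               [&] { ParallelFor(mid, end, body, grain); });
>       ParallelFor(begin, mid, body, grain);
>       right.wait();
>     }
>
> e.g. ParallelFor(0, queued_batches.size(), [&](size_t i) { /* probe i */ });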
>
> On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org>
> wrote:
> >
> > Thanks, Weston. Now I understand a little deeper.
> >
> > About point 2. I have previously seen the pipeline prioritization you've
> > described with both sides running simultaneously. My experience was not
> > good with that approach and the one-side-first approach was much better.
> >
> > About these points, if you are using queues, won't they automatically
> > slow down?
> >
> > Further, for joins we need to read the complete tables and apply some
> > operations like partitioning before the actual join can run. Also, join
> > is a synchronous operation across the threads (partitions). So we have
> > to synchronize the tasks/threads (doing the join in parallel) at that
> > point, and downstream tasks cannot really start working until everything
> > is read up to the join and it starts producing some partitions. Even if
> > we need back-pressure, join-like operations will break the graph into
> > sections where we don't have to consider backpressure across these
> > boundaries.
> >
> > Best,
> > Supun.
> >
> > On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com>
> > wrote:
> >
> > > I think I understand what you are saying now.  There are a few
> > > different things going on that lead to a need for backpressure.
> > >
> > > 1. Write backpressure
> > >
> > > We support asynchronous writes.  Filesystems like AWS S3 want many
> > > parallel I/O operations.  On a two core system you might have 8
> > > different parallel writes.  So we do exactly what you described and we
> > > create a queue with a defined capacity.  When that queue fills up we
> > > stop reading.
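> > >
> > > As an illustration (made-up names, not the actual implementation), the
> > > core of such a scheme is just a counter of in-flight writes that the
> > > reading side checks before it continues:
> > >
> > >     #include <condition_variable>
> > >     #include <cstddef>
> > >     #include <mutex>
> > >
> > >     class WriteBackpressure {
> > >      public:
> > >       explicit WriteBackpressure(size_t max_in_flight)
> > >           : max_in_flight_(max_in_flight) {}
> > >
> > >       // The reading side blocks here once too many writes are pending.
> > >       void BeforeRead() {
> > >         std::unique_lock<std::mutex> lock(mutex_);
> > >         cv_.wait(lock, [&] { return in_flight_ < max_in_flight_; });
> > >       }
> > >
> > >       void WriteStarted() {
> > >         std::lock_guard<std::mutex> lock(mutex_);
> > >         ++in_flight_;
> > >       }
> > >
> > >       // Called from each write's completion callback.
> > >       void WriteFinished() {
> > >         { std::lock_guard<std::mutex> lock(mutex_); --in_flight_; }
> > >         cv_.notify_one();
> > >       }
> > >
> > >      private:
> > >       size_t max_in_flight_;
> > >       size_t in_flight_ = 0;
> > >       std::mutex mutex_;
> > >       std::condition_variable cv_;
> > >     };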
> > >
> > > 2. Pipeline prioritization
> > >
> > > The method you are describing requires that we prioritize the order of
> > > pipeline processing.  For example, in a join node, we would fully
> > > process the shorter build side and then process the probe side in a
> > > streaming fashion.  We don't currently do that (and it is a separate
> > > discussion whether we would even want to).  Instead we read from both
> > > sides simultaneously.  If the build side read takes too long then we
> > > need to pause reading from the probe side.
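> > >
> > > A minimal sketch of that pause decision (hypothetical names, not how
> > > Acero actually expresses it): while the build side is unfinished, probe
> > > batches can only accumulate, so we cap how many we are willing to
> > > buffer.
> > >
> > >     #include <atomic>
> > >     #include <cstddef>
> > >
> > >     class ProbeSideGate {
> > >      public:
> > >       explicit ProbeSideGate(size_t max_buffered)
> > >           : max_buffered_(max_buffered) {}
> > >
> > >       void BuildFinished() { build_done_.store(true); }
> > >
> > >       // True means the probe-side source should pause (backpressure)
> > >       // instead of reading more.
> > >       bool ShouldPause(size_t buffered_batches) const {
> > >         return !build_done_.load() && buffered_batches >= max_buffered_;
> > >       }
> > >
> > >      private:
> > >       size_t max_buffered_;
> > >       std::atomic<bool> build_done_{false};
> > >     };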
> > >
> > > 3. Non-typical operators
> > >
> > > The as-of join node is an interesting operator.  It can have many
> > > different inputs and it requires us to read from all inputs at roughly
> > > the same rate.  However, if some inputs are denser than others
> > > (e.g. more rows per unit of time) then we need to pause the other
> > > inputs while we catch up on the long tail.
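> > >
> > > A sketch of that pacing logic (made-up names, and a simplification of
> > > whatever the real node does): always read next from the input whose
> > > latest-seen timestamp lags furthest behind, which naturally pauses the
> > > sparser inputs.
> > >
> > >     #include <cstddef>
> > >     #include <cstdint>
> > >     #include <limits>
> > >     #include <vector>
> > >
> > >     class AsOfJoinPacer {
> > >      public:
> > >       explicit AsOfJoinPacer(size_t num_inputs)
> > >           : watermarks_(num_inputs,
> > >                         std::numeric_limits<int64_t>::min()) {}
> > >
> > >       // Record the max timestamp seen so far on one input.
> > >       void Observe(size_t input, int64_t max_timestamp_in_batch) {
> > >         watermarks_[input] = max_timestamp_in_batch;
> > >       }
> > >
> > >       // The only input allowed to produce the next batch; all others
> > >       // are effectively paused.
> > >       size_t NextInputToRead() const {
> > >         size_t lagging = 0;
> > >         for (size_t i = 1; i < watermarks_.size(); ++i) {
> > >           if (watermarks_[i] < watermarks_[lagging]) lagging = i;
> > >         }
> > >         return lagging;
> > >       }
> > >
> > >      private:
> > >       std::vector<int64_t> watermarks_;
> > >     };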
> > >
> > > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org>
> > > wrote:
> > > >
> > > > Hi Sasha,
> > > >
> > > > For case 2, I don't see why we need a back-pressure mechanism. Let's
> > > > say there is an IO thread. All we need is a queue with a defined
> > > > capacity that feeds data from the IO thread to the Read task.
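> > > >
> > > > To make the suggestion concrete, a minimal blocking version of such a
> > > > queue (made-up names, just a sketch): the IO thread simply blocks in
> > > > Push() whenever the Read task falls behind, so the slowdown is
> > > > automatic.
> > > >
> > > >     #include <condition_variable>
> > > >     #include <cstddef>
> > > >     #include <mutex>
> > > >     #include <queue>
> > > >
> > > >     template <typename T>
> > > >     class BoundedBlockingQueue {
> > > >      public:
> > > >       explicit BoundedBlockingQueue(size_t capacity)
> > > >           : capacity_(capacity) {}
> > > >
> > > >       void Push(T item) {  // Called by the IO thread.
> > > >         std::unique_lock<std::mutex> lock(mutex_);
> > > >         not_full_.wait(lock, [&] { return queue_.size() < capacity_; });
> > > >         queue_.push(std::move(item));
> > > >         not_empty_.notify_one();
> > > >       }
> > > >
> > > >       T Pop() {  // Called by the Read task.
> > > >         std::unique_lock<std::mutex> lock(mutex_);
> > > >         not_empty_.wait(lock, [&] { return !queue_.empty(); });
> > > >         T item = std::move(queue_.front());
> > > >         queue_.pop();
> > > >         not_full_.notify_one();
> > > >         return item;
> > > >       }
> > > >
> > > >      private:
> > > >       size_t capacity_;
> > > >       std::mutex mutex_;
> > > >       std::condition_variable not_full_, not_empty_;
> > > >       std::queue<T> queue_;
> > > >     };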
> > > >
> > > > Supun.
> > > >
> > > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky
> > > > <krassovskysa...@gmail.com> wrote:
> > > >
> > > > > Hi Supun,
> > > > > Roughly what happens now is #2. However, in your example, it may
> > > > > be the case that we are reading CSV data from disk faster than we
> > > > > are transcoding it into Parquet and writing it. Note that we
> > > > > attempt to use the full disk bandwidth and assign batches to cores
> > > > > once the reads are done, so ideally a core is never blocked on IO.
> > > > > In other words, even if we have only 2 cores, we may kick off 100
> > > > > batch reads and process each one when its read completes and a
> > > > > core is available. This is where backpressure is needed: to keep
> > > > > this huge number of reads from piling up and filling memory faster
> > > > > than we can process them.
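> > > > >
> > > > > As a sketch of that limit (hypothetical names, not the actual scan
> > > > > code), the scan can cap how many speculative reads are in flight
> > > > > and stop issuing new ones until completions drain the backlog:
> > > > >
> > > > >     #include <atomic>
> > > > >     #include <cstddef>
> > > > >
> > > > >     class ReadThrottle {
> > > > >      public:
> > > > >       explicit ReadThrottle(size_t max_in_flight)
> > > > >           : max_in_flight_(max_in_flight) {}
> > > > >
> > > > >       // Called before kicking off another asynchronous batch read;
> > > > >       // false means "wait for a completion instead".
> > > > >       bool TryStartRead() {
> > > > >         size_t current = in_flight_.load();
> > > > >         while (current < max_in_flight_) {
> > > > >           if (in_flight_.compare_exchange_weak(current, current + 1))
> > > > >             return true;
> > > > >         }
> > > > >         return false;
> > > > >       }
> > > > >
> > > > >       // Called from the read's completion callback, once the batch
> > > > >       // has been handed to a core.
> > > > >       void ReadFinished() { in_flight_.fetch_sub(1); }
> > > > >
> > > > >      private:
> > > > >       size_t max_in_flight_;
> > > > >       std::atomic<size_t> in_flight_{0};
> > > > >     };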
> > > > >
> > > > > Sasha Krassovsky
> > > > >
> > > > > > On May 20, 2022, at 20:09, Supun Kamburugamuve <su...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Supun Kamburugamuve
> > >
> >
> >
> > --
> > Supun Kamburugamuve
>
