Hi! Since the scheduler improvement work came up in some recent discussions about how backpressure is handled in Acero, I am curious whether there has been any more progress on this since May, or any plans for the future?
Thanks,
Li

On Mon, May 23, 2022 at 10:37 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> > About point 2. I have previously seen the pipeline prioritization
> > you've described with both sides running simultaneously. My experience
> > was not good with that approach and the one-side-first approach was
> > much better.
>
> This is good insight, I appreciate it. I hope we can run this kind of
> experiment when we create the scheduler. From my read of the
> scheduler doc it looks like we can probably tweak the generator
> priority.
>
> > About these points, if you are using queues, won't they automatically
> > slow down?
>
> It's a non-blocking queue, so when the capacity is hit a backpressure
> signal is sent towards the source and yes, we eventually slow down.
> This means the capacity is really a "soft capacity" and we may exceed
> it by some small amount.
>
> > Even if we need back-pressure, join-like operations will break the
> > graph into sections where we don't have to consider backpressure
> > across these boundaries.
>
> This is possible; we can play around with it. However, we are working
> on adding spillover support to joins, so the path downstream of the
> join could still be asynchronous as we might be reading (slowly) from
> the spilled partitions.
>
> We do have a few places where we utilize a synchronous parallel-for
> without backpressure. The scheduler doc refers to these as
> non-pipeline tasks. For example, once we've got all the build data we
> synchronously divide and conquer to probe any queued batches.
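The "soft capacity" queue Weston describes above — pushes that never block, plus a pause/resume signal sent toward the source — could be sketched roughly as follows. This is a minimal illustration with hypothetical names, not Acero's actual classes:

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Sketch of a non-blocking queue with "soft capacity": producers are
// never blocked, but crossing the high-water mark fires a pause signal
// toward the source, and draining below the low-water mark fires a
// resume signal.
template <typename T>
class SoftCapacityQueue {
 public:
  SoftCapacityQueue(std::size_t pause_at, std::size_t resume_at,
                    std::function<void()> pause_source,
                    std::function<void()> resume_source)
      : pause_at_(pause_at),
        resume_at_(resume_at),
        pause_source_(std::move(pause_source)),
        resume_source_(std::move(resume_source)) {}

  // Producer side: never blocks, so the queue may briefly exceed
  // pause_at_ while the backpressure signal propagates upstream.
  void Push(T item) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(std::move(item));
    if (!paused_ && items_.size() >= pause_at_) {
      paused_ = true;
      pause_source_();  // backpressure signal toward the source
    }
  }

  // Consumer side: returns false if the queue is empty.
  bool Pop(T* out) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) return false;
    *out = std::move(items_.front());
    items_.pop_front();
    if (paused_ && items_.size() <= resume_at_) {
      paused_ = false;
      resume_source_();  // tell the source it may read again
    }
    return true;
  }

 private:
  std::mutex mutex_;
  std::deque<T> items_;
  const std::size_t pause_at_;
  const std::size_t resume_at_;
  bool paused_ = false;
  std::function<void()> pause_source_;
  std::function<void()> resume_source_;
};
```

Because Push() never blocks, batches already in flight when the pause signal fires still land in the queue, which is exactly why the capacity is only a "soft" limit that can be exceeded by a small amount. (A production version would also avoid invoking the callbacks while holding the lock.)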
> On Sat, May 21, 2022 at 5:08 AM Supun Kamburugamuve <su...@apache.org> wrote:
> >
> > Thanks, Weston. Now I understand a little deeper.
> >
> > About point 2. I have previously seen the pipeline prioritization
> > you've described with both sides running simultaneously. My experience
> > was not good with that approach and the one-side-first approach was
> > much better.
> >
> > About these points, if you are using queues, won't they automatically
> > slow down?
> >
> > Further, for joins, we need to read the complete tables and apply some
> > operations like partitioning before they can do the actual join. Also,
> > join is a synchronous operation across the threads (partitions). So we
> > have to synchronize the tasks/threads (doing the join in parallel) at
> > that time, and downstream tasks cannot really start working until
> > everything is read up to the join and they start producing some
> > partitions. Even if we need back-pressure, join-like operations will
> > break the graph into sections where we don't have to consider
> > backpressure across these boundaries.
> >
> > Best,
> > Supun.
> >
> > On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com> wrote:
> > >
> > > I think I understand what you are saying now. There are a few
> > > different things going on that lead to a need for backpressure.
> > >
> > > 1. Write backpressure
> > >
> > > We support asynchronous writes. Filesystems like AWS want many
> > > parallel I/O operations. On a two-core system you might have 8
> > > different parallel writes. So we do exactly what you described and
> > > create a queue with a defined capacity. When that queue fills up we
> > > stop reading.
> > >
> > > 2. Pipeline prioritization
> > >
> > > The method you are describing requires that we prioritize the order
> > > of pipeline processing. For example, in a join node, we would fully
> > > process the shorter build side and then process the probe side in a
> > > streaming fashion. We don't currently do that (and it is a separate
> > > discussion whether we would even want to). Instead we read from both
> > > sides simultaneously. If the build side read takes too long then we
> > > need to pause reading from the probe side.
> > >
> > > 3. Non-typical operators
> > >
> > > The as-of join node is an interesting operator. It can have many
> > > different inputs and it requires us to read from all inputs at
> > > roughly the same rate. However, if some inputs are denser than other
> > > inputs (e.g. more rows per unit of time) then we need to pause the
> > > other inputs while we catch up on the long tail.
> > >
> > > On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org> wrote:
> > > >
> > > > Hi Sasha,
> > > >
> > > > For case 2, I don't see why we need a back-pressure mechanism.
> > > > Let's say there is an IO thread. All we need is a queue with a
> > > > defined capacity that feeds data from the IO thread to the Read
> > > > task.
> > > >
> > > > Supun.
> > > >
> > > > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky <krassovskysa...@gmail.com> wrote:
> > > > >
> > > > > Hi Supun,
> > > > > Roughly what happens now is #2. However, in your example, it may
> > > > > be the case that we are reading CSV data from disk faster than we
> > > > > are transcoding it into Parquet and writing it. Note that we
> > > > > attempt to use the full disk bandwidth and assign batches to
> > > > > cores once the reads are done, so ideally a core is never blocked
> > > > > on IO. In other words, even if we only have 2 cores, we may kick
> > > > > off 100 batch reads and process them when the read completes and
> > > > > a core is available. This is where backpressure is needed: to
> > > > > prevent us from having this huge number of reads piling up and
> > > > > filling up memory faster than we can process them.
> > > > >
> > > > > Sasha Krassovsky
> > > > >
> > > > > > On May 20, 2022, at 20:09, Supun Kamburugamuve <su...@apache.org> wrote:
> > > >
> > > > --
> > > > Supun Kamburugamuve
> >
> > --
> > Supun Kamburugamuve
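The scenario Sasha describes at the end of the thread — only 2 cores but up to 100 batch reads in flight — is the classic case for a read-ahead cap. Here is a minimal sketch of such a cap; the class name is hypothetical and the blocking acquire is a simplification (Acero itself signals the source asynchronously rather than blocking a thread):

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Sketch of a read-ahead cap: at most max_in_flight batches may be
// read-but-not-yet-processed at once, so reads cannot pile up in
// memory faster than the CPU pipeline drains them.
class ReadAheadLimiter {
 public:
  explicit ReadAheadLimiter(std::size_t max_in_flight)
      : max_in_flight_(max_in_flight) {}

  // Called before issuing another async read; waits (i.e. stops
  // issuing reads) once the cap is reached.
  void AcquireReadSlot() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return in_flight_ < max_in_flight_; });
    ++in_flight_;
  }

  // Called once a batch has been fully processed, freeing a slot so
  // the scanner can issue its next read.
  void ReleaseReadSlot() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      --in_flight_;
    }
    cv_.notify_one();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t in_flight_ = 0;
  const std::size_t max_in_flight_;
};
```

A scanner would call AcquireReadSlot() before each read it kicks off and ReleaseReadSlot() from the continuation that finishes processing the batch; with max_in_flight = 100 this reproduces the "100 batch reads on 2 cores" behavior while keeping memory bounded.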