Thanks, Weston. Now I understand a little better. About point 2: I have previously seen the pipeline prioritization you describe, with both sides running simultaneously. My experience with that approach was not good, and processing one side first was much better.
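For concreteness, here is a minimal sketch of what I mean by the one-side-first approach (my own illustration, not Arrow's actual join code): the build side is consumed completely into a hash table, and only then does the probe side stream through it.

    // Sketch only: consume the build side first, then stream the probe side.
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <vector>

    int main() {
      // Build side: read completely before any probing starts. This is
      // the synchronization point that breaks the graph into sections.
      std::vector<std::pair<int64_t, int64_t>> build = {{1, 10}, {2, 20}};
      std::unordered_multimap<int64_t, int64_t> table(build.begin(), build.end());

      // Probe side: streamed one row/batch at a time. No coordination with
      // the (already finished) build side is needed, so there is no
      // backpressure to manage between the two sides anymore.
      std::vector<std::pair<int64_t, int64_t>> probe = {{2, 200}, {1, 100}, {3, 300}};
      for (const auto& [key, value] : probe) {
        auto [lo, hi] = table.equal_range(key);
        for (auto it = lo; it != hi; ++it) {
          std::cout << key << ": build=" << it->second << " probe=" << value << "\n";
        }
      }
    }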
About these points: if you are using queues, won't they automatically slow
things down? (See the first sketch at the end of this mail.) Further, for
joins, we need to read the complete tables and apply some operations, like
partitioning, before the actual join can happen. Also, a join is a
synchronous operation across the threads (partitions). So we have to
synchronize the tasks/threads doing the join in parallel at that point, and
downstream tasks cannot really start working until everything up to the
join has been read and the join starts producing some partitions. Even if
we need backpressure, join-like operations will break the graph into
sections, so we don't have to consider backpressure across these boundaries.

Best,
Supun

On Fri, May 20, 2022 at 9:08 PM Weston Pace <weston.p...@gmail.com> wrote:

> I think I understand what you are saying now. There are a few
> different things going on that lead to a need for backpressure.
>
> 1. Write backpressure
>
> We support asynchronous writes. Filesystems like AWS S3 want many
> parallel I/O operations. On a two-core system you might have 8
> different parallel writes. So we do exactly what you described: we
> create a queue with a defined capacity, and when that queue fills up
> we stop reading.
>
> 2. Pipeline prioritization
>
> The method you are describing requires that we prioritize the order of
> pipeline processing. For example, in a join node, we would fully
> process the shorter build side and then process the probe side in a
> streaming fashion. We don't currently do that (and it is a separate
> discussion whether we would even want to). Instead we read from both
> sides simultaneously. If the build-side read takes too long then we
> need to pause reading from the probe side.
>
> 3. Non-typical operators
>
> The as-of join node is an interesting operator. It can have many
> different inputs, and it requires us to read from all inputs at roughly
> the same rate. However, if some inputs are denser than other inputs
> (e.g. more rows per unit of time) then we need to pause the other
> inputs while we catch up on the long tail.
>
> On Fri, May 20, 2022 at 5:46 PM Supun Kamburugamuve <su...@apache.org>
> wrote:
> >
> > Hi Sasha,
> >
> > For case 2, I don't see why we need a backpressure mechanism. Let's
> > say there is an IO thread. All we need is a queue with a defined
> > capacity that feeds data from the IO thread to the Read task.
> >
> > Supun.
> >
> > On Fri, May 20, 2022 at 8:25 PM Sasha Krassovsky <
> krassovskysa...@gmail.com>
> > wrote:
> >
> > > Hi Supun,
> > > Roughly what happens now is #2. However, in your example, it may be
> > > the case that we are reading CSV data from disk faster than we are
> > > transcoding it into Parquet and writing it. Note that we attempt to
> > > use the full disk bandwidth and assign batches to cores once the
> > > reads are done, so ideally a core is never blocked on IO. In other
> > > words, even if we have only 2 cores, we may kick off 100 batch reads
> > > and process each one when its read completes and a core is
> > > available. This is where backpressure is needed: to prevent this
> > > huge number of reads from piling up and filling memory faster than
> > > we can process it.
> > >
> > > Sasha Krassovsky
> > >
> > > > On 20 May 2022, at 20:09, Supun Kamburugamuve <su...@apache.org>
> > > > wrote:
> > >
> >
> > --
> > Supun Kamburugamuve
>
--
Supun Kamburugamuve
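P.S. As promised, here is the first sketch: a bounded queue of the kind I
mean (hypothetical code, not Arrow's implementation). When the queue is
full, Push() blocks, so a fast producer such as an IO thread slows down
automatically, and the capacity also caps how many batches can pile up in
memory, which is the situation Sasha describes.

    // Sketch only: backpressure is implicit in the queue's capacity; no
    // separate pause/resume signalling between operators is needed.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    template <typename T>
    class BoundedQueue {
     public:
      explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

      void Push(T item) {
        std::unique_lock<std::mutex> lock(mu_);
        // Backpressure: the producer waits here until a consumer pops.
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push(std::move(item));
        not_empty_.notify_one();
      }

      T Pop() {
        std::unique_lock<std::mutex> lock(mu_);
        not_empty_.wait(lock, [this] { return !queue_.empty(); });
        T item = std::move(queue_.front());
        queue_.pop();
        not_full_.notify_one();
        return item;
      }

     private:
      size_t capacity_;
      std::queue<T> queue_;
      std::mutex mu_;
      std::condition_variable not_full_, not_empty_;
    };

    int main() {
      BoundedQueue<int> q(4);  // capacity 4: at most 4 batches in flight
      std::thread producer([&q] {
        for (int batch = 0; batch < 16; ++batch) q.Push(batch);  // blocks when full
      });
      for (int i = 0; i < 16; ++i) std::cout << "consumed " << q.Pop() << "\n";
      producer.join();
    }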
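And one more sketch for Weston's point 3 (again my own illustration, with
made-up Batch/watermark names): pacing several as-of-join inputs by always
pulling from the input that is furthest behind in time, which in effect
pauses denser inputs while a sparse long tail catches up.

    // Sketch only: single-threaded pacing of multiple time-ordered inputs.
    #include <cstdint>
    #include <iostream>
    #include <queue>
    #include <vector>

    struct Batch {
      int64_t max_time;  // largest timestamp in the batch
      int input;         // which input produced it
    };

    int main() {
      // Pretend each input is a pre-buffered stream of batches; input 0
      // is dense (many rows per time unit), input 1 is sparse.
      std::vector<std::queue<Batch>> inputs(2);
      for (int64_t t = 1; t <= 6; ++t) inputs[0].push({t, 0});     // dense
      for (int64_t t = 3; t <= 6; t += 3) inputs[1].push({t, 1});  // sparse

      std::vector<int64_t> watermark(2, 0);  // how far each input has advanced
      while (!inputs[0].empty() || !inputs[1].empty()) {
        // Pick the non-exhausted input with the smallest watermark; the
        // others are effectively paused until it catches up.
        int pick = -1;
        for (int i = 0; i < 2; ++i) {
          if (!inputs[i].empty() && (pick < 0 || watermark[i] < watermark[pick]))
            pick = i;
        }
        Batch b = inputs[pick].front();
        inputs[pick].pop();
        watermark[pick] = b.max_time;
        std::cout << "process input " << b.input << " up to t=" << b.max_time << "\n";
      }
    }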