If the batch data you are processing is larger than the RAM on the
system, then backpressure is needed.  A common use case is dataset
repartitioning.  If you are repartitioning a large (e.g. 300GB)
dataset from CSV to Parquet, the bottleneck will typically be the
"write" stage.  Without backpressure, the faster "read" stage keeps
queuing batches in memory until the system runs out of RAM.
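
To make the read/write imbalance concrete, here is a rough
single-threaded simulation.  It is not Arrow code: SimulateRepartition,
high_mark, low_mark, and the per-tick rates are all names and numbers
made up for illustration.  A fast reader feeds a slow writer, and the
reader is paused whenever too many batches are buffered:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// All names below are hypothetical; this is not the Arrow C++ API.
struct SimulationResult {
  std::size_t batches_written;  // batches drained by the slow "write" stage
  std::size_t peak_queued;      // most batches buffered in memory at once
};

// Simulates a fast reader feeding a slow writer, one "tick" at a time.
// When `high_mark` is nonzero the reader pauses once the queue holds at
// least `high_mark` batches and resumes once it drains to `low_mark` or
// fewer.  A `high_mark` of zero disables backpressure entirely.
SimulationResult SimulateRepartition(std::size_t total_batches,
                                     std::size_t read_per_tick,
                                     std::size_t write_per_tick,
                                     std::size_t high_mark,
                                     std::size_t low_mark) {
  assert(read_per_tick > 0 && write_per_tick > 0);
  assert(low_mark <= high_mark);

  std::size_t produced = 0, queued = 0, written = 0, peak = 0;
  bool paused = false;

  while (written < total_batches) {
    // Read stage: runs only while not paused and input remains.
    if (!paused) {
      const std::size_t n = std::min(read_per_tick, total_batches - produced);
      produced += n;
      queued += n;
      peak = std::max(peak, queued);
    }

    // Write stage: the bottleneck, drains only a few batches per tick.
    const std::size_t m = std::min(write_per_tick, queued);
    queued -= m;
    written += m;

    // Backpressure decision, based on how many batches are buffered.
    if (high_mark > 0) {
      if (!paused && queued >= high_mark) {
        paused = true;
      } else if (paused && queued <= low_mark) {
        paused = false;
      }
    }
  }
  return {written, peak};
}
```

Calling SimulateRepartition(1000, 10, 1, 20, 5) keeps peak_queued at no
more than high_mark + read_per_tick batches, while passing 0 for both
marks lets the queue grow roughly in proportion to the input size.  A
real engine would make the pause/resume decision across threads, but
the memory bound comes from the same idea.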

I'm also not sure I would describe the engine as a "batch processing
engine".  I think the C++ engine operates at a lower level than a
typical Spark vs. Hadoop (e.g. batch vs. streaming) abstraction.  A
streaming application and a batch application could both make use of
the C++ engine.  If I had to pick one then I would pick a "streaming
engine" because we do process data in a streaming fashion.

On Fri, May 20, 2022 at 4:14 PM Supun Kamburugamuve <su...@apache.org> wrote:
>
> Looking at the proposal I couldn't understand why there is a need for
> back-pressure handling. My understanding of the Arrow C++ engine is that it
> is meant to process batch data. So I couldn't think of why we need to
> handle back-pressure as it is normally needed in streaming engines.
>
> Best,
> Supun
>
> On Thu, May 12, 2022 at 1:14 PM Andrew Lamb <al...@influxdata.com> wrote:
>
> > Thank you for sharing this document.
> >
> > Raphael Taylor-Davies is working on a similar exercise: scheduling
> > execution for DataFusion plans. The design doc [1] and initial PR [2] may be
> > an interesting reference.
> >
> > In the DataFusion case we were trying to improve performance in a few ways:
> > 1. Within a pipeline (same definition as in C++ proposal) consume a batch
> > that was produced in the same thread if possible
> > 2. Restrict parallelism by the number of available workers rather than the
> > plan structure (e.g. if reading 100 parquet files, with 8 workers, don't
> > start reading all of them at once)
> > 3. Segregate the pools used to do async IO and CPU-bound work within the
> > same plan execution
> >
> > I think the C++ proposal would achieve 1, but it isn't clear to me that it
> > would achieve 2 (though I will admit to not fully understanding it), and I
> > don't know about 3.
> >
> > While there are many similarities with what is described in the C++
> > proposal, I would say the Rust implementation is significantly less
> > complicated than what I think is described. In particular:
> > * There is no notion of generators
> > * There is no notion of internal tasks (the operators themselves are single
> > threaded and the parallelism is created by generating batches in parallel)
> > * The scheduler logic is run directly by the worker threads (rather than a
> > separate thread with message queues) as the operators produce each new
> > batch
> >
> > Andrew
> >
> > [1] https://docs.google.com/document/d/1txX60thXn1tQO1ENNT8rwfU3cXLofa7ZccnvP4jD6AA/edit#
> > [2] https://github.com/apache/arrow-datafusion/pull/2226
> >
> >
> >
> > On Thu, May 12, 2022 at 3:24 PM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > > Thanks Wes and Michal.
> > >
> > > We have a similar concern about the current eager-push control flow with
> > > time series / ordered data processing and are glad that we are not the
> > > only ones thinking about this.
> > >
> > > I have read the doc and so far have just left some questions to make sure I
> > > understand the proposal (admittedly the generator concept is somewhat new
> > > to me). I am also thinking about it in the context of streaming ordered
> > > data processing.
> > >
> > > Excited to see where this goes,
> > > Li
> > >
> > > On Wed, May 11, 2022 at 6:43 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > > I talked about these problems with my colleague Michal Nowakiewicz who
> > > > has been developing some of the C++ engine implementation over the
> > > > last year and a half, and he wrote up this document with some ideas
> > > > about task scheduling and control flow in the query engine for
> > > > everyone to look at and comment:
> > > >
> > > > https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/edit#
> > > >
> > > > Feedback also welcome from the Rust developers to compare/contrast
> > > > with how DataFusion works
> > > >
> > > > On Tue, May 3, 2022 at 1:05 AM Weston Pace <weston.p...@gmail.com> wrote:
> > > > >
> > > > > Thanks for investigating and looking through this.  Your understanding
> > > > > of how things work is pretty much spot on.  In addition, I think the
> > > > > points you are making are valid.  Our ExecNode/ExecPlan interfaces are
> > > > > extremely bare bones and similar nodes have had to reimplement the
> > > > > same solutions (e.g. many nodes are using things like AtomicCounter,
> > > > > ThreadIndexer, AsyncTaskGroup, etc. in similar ways).  Probably the
> > > > > most significant short term impact of cleaning this up would be to
> > > > > avoid things like the race condition in [1] which happened because one
> > > > > node was doing things in a slightly older way.  If anyone is
> > > > > particularly interested in tackling this problem I'd be happy to go
> > > > > into more details.
> > > > >
> > > > > However, I think you are slightly overselling the potential benefits.
> > > > > I don't think this would make it easier to adopt morsel/batch,
> > > > > implement asymmetric backpressure, better scheduling, work stealing,
> > > > > or sequencing (all of which I agree are good ideas with the exception
> > > > > of work stealing which I don't think we would significantly benefit
> > > > > from).  What's more, we don't have very many nodes today and I think
> > > > > there is a risk of over-learning from this small sample size.  For
> > > > > example, this sequencing discussion is very interesting.  I think an
> > > > > asof join node is not a pipeline breaker, but it also does not fit the
> > > > > mold of a standard pipeline node.  It has multiple inputs and there is
> > > > > not a clear 1:1 mapping between input and output batches.  I don't
> > > > > know the Velox driver model well enough to comment on it specifically
> > > > > but if you were to put this node in the middle of a pipeline you might
> > > > > end up generating empty batches, too-large batches, or not enough
> > > > > thread tasks to saturate the cores.  If you were to put it between
> > > > > pipeline drivers you would potentially lose cache locality.
> > > > >
> > > > > Regarding morsel/batch.  The main thing really preventing us from
> > > > > moving to this model is the overhead cost of running small batches.
> > > > > This is due to things like the problem you described in [2] and
> > > > > somewhat demonstrated by benchmarks like [3].  As a result, as soon as
> > > > > we shrink the batch size small enough to fit into L2, we start to see
> > > > > overhead increase enough to eliminate the benefits we get from better
> > > > > cache utilization (not just CPU overhead but also thread contention).
> > > > > Unfortunately, some of the fixes here could possibly involve changes
> > > > > to ExecBatch & Datum, which are used extensively in the kernel
> > > > > infrastructure.  From my profiling, this underutilization of cache is
> > > > > one of the most significant performance issues we have today.
> > > > >
> > > > > [1] https://github.com/apache/arrow/pull/12894
> > > > > [2] https://lists.apache.org/thread/mp68ofm2hnvs2v2oz276rvw7y5kwqoyd
> > > > > [3] https://github.com/apache/arrow/pull/12755
> > > > > On Mon, May 2, 2022 at 1:20 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > >
> > > > > > hi all,
> > > > > >
> > > > > > I've been catching up on the C++ execution engine codebase after a
> > > > > > fairly long development hiatus.
> > > > > >
> > > > > > I have several questions / comments about the current design of the
> > > > > > ExecNode and its implementations (currently: source / scan, filter,
> > > > > > project, union, aggregate, sink, hash join).
> > > > > >
> > > > > > My current understanding of how things work is the following:
> > > > > >
> > > > > > * Scan/Source nodes initiate execution through the StartProducing()
> > > > > > function, which spawns an asynchronous generator that yields a
> > > > > > sequence of input data batches. When each batch is available, it is
> > > > > > passed to child operators by calling their InputReceived methods
> > > > > >
> > > > > > * When InputReceived is called
> > > > > >     * For non-blocking operators (e.g. Filter, Project), the unit of
> > > > > > work is performed immediately and the result is passed to the child
> > > > > > operator by calling its InputReceived method
> > > > > >     * For blocking operators (e.g. HashAggregate, HashJoin), partial
> > > > > > results are accumulated until the operator can begin producing output
> > > > > > (all input for aggregation, or until the HT has been built for the
> > > > > > HashJoin)
> > > > > >
> > > > > > * When an error occurs, a signal to abort will be propagated up and
> > > > > > down the execution tree
> > > > > >
> > > > > > * Eventually output lands in a Sink node, which is the desired result
> > > > > >
> > > > > > One concern I have about the current structure is the way in which
> > > > > > ExecNode implementations are responsible for downstream control flow,
> > > > > > and the extent to which operator pipelining (the same thread advancing
> > > > > > input-output chains until reaching a pipeline breaker) is implicit
> > > > > > versus explicit. To give a couple of examples:
> > > > > >
> > > > > > * In hash aggregations (GroupByNode), when the input has been
> > > > > > exhausted, the GroupByNode splits the result into the desired
> > > > > > execution chunk size (e.g. splitting a 1M row aggregate into batches
> > > > > > of 64K rows) and then spawns future tasks that push these chunks
> > > > > > through the child output exec node (by calling InputReceived)
> > > > > >
> > > > > > * In hash joins, the ExecNode accumulates batches to be inserted into
> > > > > > the hash table (the "probed" input), until the probed input is
> > > > > > exhausted, and then starts asynchronously spawning tasks to probe the
> > > > > > completed hash table and pass the probed results into the child
> > > > > > output node
> > > > > >
> > > > > > I would suggest that we consider a different design that decouples
> > > > > > task control flow from the ExecNode implementation. The purpose would
> > > > > > be to give the user of the C++ engine more control over task
> > > > > > scheduling (including the order of execution) and prioritization.
> > > > > >
> > > > > > One system that does things differently from the Arrow C++ Engine is
> > > > > > Meta's Velox project, whose operators work like this (slightly
> > > > > > simplified and colored by my own imperfect understanding):
> > > > > >
> > > > > > * The Driver class (which is associated with a single thread) is
> > > > > > responsible for execution control flow. A driver moves input batches
> > > > > > through an operator pipeline.
> > > > > >
> > > > > > * The Driver calls the Operator::addInput function with an input
> > > > > > batch. Operators are blocking vs. non-blocking based on whether the
> > > > > > Operator::needsMoreInput() function returns true. Simple operators
> > > > > > like Project can produce their output immediately by calling
> > > > > > Operator::getOutput
> > > > > >
> > > > > > * When the Driver hits a blocking operator in a pipeline, it returns
> > > > > > control to the calling thread so the thread can switch to doing work
> > > > > > for a different driver
> > > > > >
> > > > > > * One artifact of this design is that hash joins are split into a
> > > > > > HashBuild operator and a HashProbe operator so that the build and
> > > > > > probe stages of the hash join can be scheduled and executed more
> > > > > > precisely (for example: work for the pipeline that feeds the build
> > > > > > operator can be prioritized over the pipeline feeding the other input
> > > > > > to the probe).
> > > > > >
> > > > > > The idea in refactoring the Arrow C++ Engine would be that, instead of
> > > > > > having a tree of ExecNodes, each of which has its own internal control
> > > > > > flow (including the ability to spawn downstream tasks), pipelinable
> > > > > > operators would be grouped into PipelineExecutors (which correspond
> > > > > > roughly to Velox's Driver concept) which are responsible for control
> > > > > > flow and invoking the ExecNodes in sequence. This would make it much
> > > > > > easier for users to customize the control flow for particular needs
> > > > > > (for example, the recent discussion of adding time series joins to the
> > > > > > C++ engine means that the current eager-push / "local" control flow
> > > > > > can create input ordering problems). I think this might make the
> > > > > > codebase easier to understand and test also (and profile / trace,
> > > > > > maybe, too), but that is just conjecture.
> > > > > >
> > > > > > As a separate matter, the C++ Engine does not have a separation
> > > > > > between input batches (what are called "morsels" in the HyPer paper)
> > > > > > and pipeline tasks (smaller cache-friendly units to move through the
> > > > > > pipeline), nor the ability (AFAICT) to do nested parallelism / work
> > > > > > stealing within pipelines (this concept is discussed in [1]).
> > > > > >
> > > > > > Hopefully the above makes sense and I look forward to others' thoughts.
> > > > > >
> > > > > > Thanks,
> > > > > > Wes
> > > > > >
> > > > > > [1]: https://15721.courses.cs.cmu.edu/spring2016/papers/p743-leis.pdf
> > > >
> > >
> >
>
>
> --
> Supun Kamburugamuve
