Thanks both! The ExecPlan Sequencing doc is interesting and close to the
problem we are trying to solve (ordered processing).

One thought: I can see some cases where we could deadlock if we are not
careful. For example, take Filter Node -> Asof Join Node, assuming the Asof
Join node requires ordered input batches:

(Sequence of events)

(1) The Filter node has n threads; we get unlucky and batch index 0 is never
processed.
(2) The n threads start processing other batches and send them to the
downstream node.
(3) The downstream node queues up the batches but cannot process any of
them, since it is still waiting for batch 0. At some point its queue fills
up (assuming we bound the number of queued batches) and it tells the Filter
node "I cannot take any more batches". (I am not sure whether backpressure
like this exists today.)
(4) The Filter node has all of its threads processing batches, but because
the downstream node cannot take any more, those threads cannot make progress
either.
(5) No progress can be made on either node.

Maybe the Asof Join node in this case needs an unbounded queue (spilling to
disk), or the Filter node needs to know that it should process batch 0 first
and hold off on the other batches until the downstream node can start
consuming. A rough sketch of the second idea is below.
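
To make that a bit more concrete, here is a minimal, self-contained sketch
(plain C++, not the actual ExecNode API; the SequencingBuffer and Batch
names are just made up for illustration) of a bounded re-sequencing buffer
that an ordered consumer could sit behind. It only releases batches in index
order, and the point where `Insert` returns false is exactly the
backpressure moment in step (3):

#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>

// Stand-in for an exec batch tagged with a sequence index (illustrative only).
struct Batch {
  int64_t index;
  // ... columns would live here ...
};

// Bounded re-sequencing buffer: out-of-order batches are parked until the
// next expected index arrives; the consumer only ever sees batches in order.
class SequencingBuffer {
 public:
  explicit SequencingBuffer(std::size_t capacity) : capacity_(capacity) {}

  // Accepts a batch unless the buffer is full and the batch is not the one
  // we are waiting for. Returning false is the backpressure signal from
  // step (3); if the producer reacts by stalling all of its threads and none
  // of them ever produces index `next_`, we get the deadlock in step (5).
  bool Insert(Batch batch) {
    if (batch.index != next_ && pending_.size() >= capacity_) {
      return false;
    }
    int64_t idx = batch.index;
    pending_.emplace(idx, std::move(batch));
    return true;
  }

  // Hands the consumer the next in-order batch, if it has arrived.
  std::optional<Batch> PopNextReady() {
    auto it = pending_.find(next_);
    if (it == pending_.end()) {
      return std::nullopt;
    }
    Batch out = std::move(it->second);
    pending_.erase(it);
    ++next_;
    return out;
  }

 private:
  std::size_t capacity_;
  int64_t next_ = 0;                  // next index the consumer may process
  std::map<int64_t, Batch> pending_;  // parked batches, keyed by index
};

With something like this, the two fixes above become a policy choice: either
capacity_ is effectively unbounded (spilling parked batches to disk once
they grow too large), or the producer, on seeing Insert fail, prioritizes
producing the missing index instead of simply stalling.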

Thoughts?
Li

On Tue, Apr 26, 2022 at 4:07 PM Weston Pace <weston.p...@gmail.com> wrote:

> There was an old design document I proposed on this ML a while back.
> I never got around to implementing it and I think it has aged somewhat
> but it covers some of the points I brought up and it might be worth
> reviewing.
>
>
> https://docs.google.com/document/d/1MfVE9td9D4n5y-PTn66kk4-9xG7feXs1zSFf-qxQgPs/edit#heading=h.e54mys6bvhhe
>
> On Tue, Apr 26, 2022 at 10:05 AM Sasha Krassovsky
> <krassovskysa...@gmail.com> wrote:
> >
> > An ExecPlan is composed of a bunch of implicit “pipelines”. Each node in
> a pipeline (starting with a source node) implements `InputReceived` and
> `InputFinished`. On `InputReceived`, it performs its computation and calls
> `InputReceived` on its output. On `InputFinished`, it performs any cleanup
> and calls `InputFinished` on its output (note that in the code, `outputs_`
> is a vector, but we only ever use `outputs_[0]`. This will probably end up
> getting cleaned up at some point). As such there’s an implicit pipeline of
> chained calls to `InputReceived`. Some nodes, such as Join or GroupBy or
> Sort, are pipeline breakers: they must accumulate the whole dataset before
> performing their computation and starting off the next pipeline. Pipeline
> breakers would make use of stuff like TaskGroup and such.
> >
> > So the model of parallelism is driven by the source nodes: if your
> source node is multithreaded, then you may have several concurrent calls to
> `InputReceived`. Weston mentioned to me today that there may be a way to
> give some sort of guarantee of “almost-ordered” input, which may be enough
> to make streaming work well (you’d only have to accumulate at most
> `num_threads` extra batches in memory at a time). I'm not sure of the
> details, but that may be possible.
> >
> > Hopefully the description of how parallelism works was at least helpful!
> >
> > Sasha
> >
> > > On Apr 26, 2022, at 12:54 PM, Li Jin <ice.xell...@gmail.com> wrote:
> > >
> > > sure how they would output. (i.e., do they output batches / call
> >
>
