[ https://issues.apache.org/jira/browse/ARROW-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17359045#comment-17359045 ]
Weston Pace commented on ARROW-13004:
-------------------------------------
[~bkietz] [~apitrou] There is potentially an analogous problem in a push-based
model with back pressure. In both cases the problem arises when there is a
slow sink and a fast source. Consider the same Source, Transform, Sink graph.
If the sink is slow enough, the graph will eventually pause.
Consider the following potential sequence:
* Initial state: Source is paused, Transform is paused, and Sink is running
* Sink finishes and unpauses Transform
* Transform immediately delivers an item to Sink (since it was paused, it
presumably has one in its buffer)
* Transform unpauses Source
* Sink pauses Transform
* Source immediately delivers an item to Transform (again, it has a full
buffer)
* Transform pauses Source
* We have now returned to the initial state
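The cycle can be replayed with a toy, single-threaded model to see which item
each step touches. This is not Arrow code: `Event` and `ReplayCycles` are
hypothetical names, and the model assumes Source refills its one-item buffer
before it is paused again, so that every cycle starts from the same state.

```cpp
#include <string>
#include <vector>

// Toy replay of the pause/unpause cycle (hypothetical model, not Arrow code).
// Assumption: Source refills its one-item buffer before it is paused again.
struct Event {
  std::string action;  // what happened in this step
  int item;            // which item was involved
};

std::vector<Event> ReplayCycles(int num_cycles) {
  int next_item = 0;
  // Initial state: Sink is busy with item 0, paused Transform holds item 1,
  // and paused Source holds item 2.
  int sink_item = next_item++;
  int transform_buf = next_item++;
  int source_buf = next_item++;
  std::vector<Event> trace;
  for (int c = 0; c < num_cycles; ++c) {
    // Sink finishes its item and unpauses Transform.
    trace.push_back({"Sink finished", sink_item});
    // Transform immediately hands its buffered item to Sink and unpauses
    // Source; Sink is busy again, so it pauses Transform.
    sink_item = transform_buf;
    trace.push_back({"Transform->Sink", sink_item});
    // Source immediately hands its buffered item to Transform; Transform is
    // paused with a full buffer, so it pauses Source.
    transform_buf = source_buf;
    trace.push_back({"Source->Transform", transform_buf});
    // Source produces the next item into its buffer (assumption above).
    source_buf = next_item++;
  }
  return trace;
}
```

With two cycles the trace is: Sink finished 0, Transform->Sink 1,
Source->Transform 2, then Sink finished 1, Transform->Sink 2,
Source->Transform 3. Each step touches a different item and moves it a single
hop.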
Notice that data locality is lost: each item advances only one node per
unpause, so no single wakeup carries an item all the way from Source to Sink.
Even if Transform is stateless (and just forwards any extra items to Sink,
allowing a bit of overrun), I think care is still needed. I don't have a good
enough sense of the details to think it all the way through. This is just a
note to include this case in any test cases when you get around to adding back
pressure: in a back-pressured situation, ensure that each unpause runs an
entire item through the chain without stopping at each node.
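A test for that property could record which node handled which item and check
that the trace consists only of complete, uninterrupted runs. This sketch
assumes such a trace can be collected; `Step`, `EachItemRunsThrough`, and the
node numbering are hypothetical.

```cpp
#include <cstddef>
#include <vector>

// One step of an execution trace: which node handled which item.
// Hypothetical node ids: 0 = Source, 1 = Transform, 2 = Sink.
struct Step {
  int node;
  int item;
};

// Returns true if every item visits nodes 0..num_nodes-1 back to back before
// any other item is touched, i.e. each wakeup runs an entire item through the
// chain instead of stopping at each node.
bool EachItemRunsThrough(const std::vector<Step>& trace, int num_nodes) {
  const std::size_t n = static_cast<std::size_t>(num_nodes);
  if (n == 0 || trace.size() % n != 0) return false;  // partial run at the end
  for (std::size_t start = 0; start < trace.size(); start += n) {
    const int item = trace[start].item;
    for (std::size_t k = 0; k < n; ++k) {
      const Step& s = trace[start + k];
      if (s.node != static_cast<int>(k) || s.item != item) return false;
    }
  }
  return true;
}
```

A trace shaped like the cycle described above (Sink on item 0, then Transform
on item 1, then Source on item 2) fails this check.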
> [C++] Allow the creation of future "chains" to better control parallelism
> -------------------------------------------------------------------------
>
> Key: ARROW-13004
> URL: https://issues.apache.org/jira/browse/ARROW-13004
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++
> Reporter: Weston Pace
> Priority: Major
>
> This is a bit tricky to explain. ShouldSchedule::Always works well for
> AddCallback but falls short for Transfer and Then. An example may explain it
> best.
> Consider three operators, Source, Transform, and Sink. They are set up as
> follows:
> {code:cpp}
> source_fut = source(); // 1
> transform_fut = source_fut.Then(Transform(), ScheduleAlways); // 2
> sink_fut = transform_fut.Then(Consume()); // 3
> {code}
> The intent is to run Transform and Consume together as a single thread-pool
> task for each item generated by source(). This is what happens if source()
> is slow. If source() is fast (let's pretend its future is always already
> finished), then this is not what happens.
> Line 2 causes a new thread task to be launched (since source_fut is
> already finished). That new task may mark transform_fut finished before the
> original thread executes line 3. In that case, line 3 attaches its callback
> to an already-finished future, so Consume() runs immediately on the original
> thread while Transform() ran on the pool thread: the two end up on separate
> threads.
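The race can be replayed deterministically with toy stand-ins. In this sketch,
`ToyFuture`, `ManualExecutor`, and `RunChain` are hypothetical (not Arrow's
`Future` or executor API): passing an executor to `Then` mimics
ShouldSchedule::Always, a manually drained queue replaces the thread pool so
the caller decides whether the spawned task runs before line 3, and task id 0
stands for the original thread.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical single-threaded executor standing in for a thread pool.
struct ManualExecutor {
  std::deque<std::function<void()>> tasks;
  int current_task = 0;  // 0 means "the original thread"
  int next_id = 1;
  void Spawn(std::function<void()> fn) {
    int id = next_id++;
    tasks.push_back([this, id, fn] { current_task = id; fn(); current_task = 0; });
  }
  void RunOne() {
    auto task = std::move(tasks.front());
    tasks.pop_front();
    task();
  }
};

// Toy future (not Arrow's): holds an int and a list of callbacks.
struct ToyFuture {
  std::optional<int> value;
  std::vector<std::function<void(int)>> callbacks;

  void MarkFinished(int v) {
    value = v;
    auto pending = std::move(callbacks);
    callbacks.clear();
    for (auto& cb : pending) cb(v);
  }

  // An already-finished future runs the callback immediately on the calling
  // thread. Passing an executor always spawns a new task instead.
  std::shared_ptr<ToyFuture> Then(std::function<int(int)> fn,
                                  ManualExecutor* executor = nullptr) {
    auto next = std::make_shared<ToyFuture>();
    std::function<void(int)> cb = [next, fn](int v) { next->MarkFinished(fn(v)); };
    if (executor != nullptr) {
      cb = [executor, run = cb](int v) { executor->Spawn([run, v] { run(v); }); };
    }
    if (value) cb(*value);
    else callbacks.push_back(std::move(cb));
    return next;
  }
};

// Returns {task that ran Transform, task that ran Consume}.
std::pair<int, int> RunChain(bool pool_wins_race) {
  ManualExecutor exec;
  int transform_task = -1;
  int consume_task = -1;
  auto source_fut = std::make_shared<ToyFuture>();
  source_fut->MarkFinished(1);  // 1: a fast source, already finished
  auto transform_fut = source_fut->Then(  // 2: spawns the Transform task
      [&](int v) { transform_task = exec.current_task; return v * 10; }, &exec);
  if (pool_wins_race) exec.RunOne();  // the task finishes transform_fut first
  auto sink_fut = transform_fut->Then(  // 3: attach Consume
      [&](int v) { consume_task = exec.current_task; return v; });
  while (!exec.tasks.empty()) exec.RunOne();
  return {transform_task, consume_task};
}
```

`RunChain(false)` returns {1, 1}: Consume runs inside the Transform task.
`RunChain(true)` returns {1, 0}: the spawned task finished transform_fut
first, so line 3 ran Consume inline on the original thread.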
> The solution (at least the best I can come up with) is unfortunately a little
> complex, though the complexity can be hidden in future/async_generator
> internals. Basically, it is worth delaying scheduling until the whole future
> chain has been connected. This means a future created with ScheduleAlways is
> created in an "unconsumed" mode. Any callbacks that would normally be
> launched will not be launched until the future switches to "consumed".
> Future.Wait(), VisitAsyncGenerator, CollectAsyncGenerator, and some of the
> async_generator operators would cause the future to be "consumed". The
> "consume" signal will need to propagate backwards up the chain, so each
> future will need to keep a reference to its antecedent future.
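The "unconsumed" idea can be sketched on a toy future. This is a hypothetical
single-threaded illustration, not Arrow's API: `LazyFuture`, the free function
`Then`, `Consume()`, `ManualExecutor`, and `RunLazyChain` are made-up names,
and calling `Consume()` on the last future stands in for what Future.Wait() or
VisitAsyncGenerator would trigger.

```cpp
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical single-threaded executor standing in for a thread pool.
struct ManualExecutor {
  std::deque<std::function<void()>> tasks;
  int current_task = 0;  // 0 means "the original thread"
  int next_id = 1;
  void Spawn(std::function<void()> fn) {
    int id = next_id++;
    tasks.push_back([this, id, fn] { current_task = id; fn(); current_task = 0; });
  }
  void RunOne() {
    auto task = std::move(tasks.front());
    tasks.pop_front();
    task();
  }
};

// Toy future (not Arrow's). A future created with an executor starts
// "unconsumed": the task that would complete it is held back until Consume().
struct LazyFuture {
  std::optional<int> value;
  std::vector<std::function<void(int)>> callbacks;
  bool consumed = true;
  std::function<void()> deferred_launch;   // held-back task, if any
  std::shared_ptr<LazyFuture> antecedent;  // used to propagate Consume()

  void MarkFinished(int v) {
    value = v;
    auto pending = std::move(callbacks);
    callbacks.clear();
    for (auto& cb : pending) cb(v);
  }

  void Consume() {
    consumed = true;
    if (deferred_launch) {
      auto launch = std::move(deferred_launch);
      deferred_launch = nullptr;
      launch();
    }
    if (antecedent) antecedent->Consume();  // propagate backwards up the chain
  }
};

// Free function so the new future can hold a shared_ptr to its antecedent.
// Note: pending callbacks hold the downstream future, which holds its
// antecedent, so a chain that never finishes would leak in this sketch.
std::shared_ptr<LazyFuture> Then(const std::shared_ptr<LazyFuture>& self,
                                 std::function<int(int)> fn,
                                 ManualExecutor* executor = nullptr) {
  auto next = std::make_shared<LazyFuture>();
  next->antecedent = self;
  std::function<void(int)> cb = [next, fn](int v) { next->MarkFinished(fn(v)); };
  if (executor != nullptr) {  // mimics ShouldSchedule::Always
    next->consumed = false;
    cb = [next, executor, run = cb](int v) {
      auto launch = [executor, run, v] { executor->Spawn([run, v] { run(v); }); };
      if (next->consumed) launch();
      else next->deferred_launch = launch;  // wait for Consume()
    };
  }
  if (self->value) cb(*self->value);
  else self->callbacks.push_back(std::move(cb));
  return next;
}

// Returns {task that ran Transform, task that ran Consume}; 0 is the original thread.
std::pair<int, int> RunLazyChain() {
  ManualExecutor exec;
  int transform_task = -1;
  int consume_task = -1;
  auto source_fut = std::make_shared<LazyFuture>();
  source_fut->MarkFinished(1);  // 1: a fast source, already finished
  auto transform_fut = Then(source_fut,  // 2: the Transform task is deferred
      [&](int v) { transform_task = exec.current_task; return v * 10; }, &exec);
  while (!exec.tasks.empty()) exec.RunOne();  // nothing queued, so no race
  auto sink_fut = Then(transform_fut,  // 3: attach Consume
      [&](int v) { consume_task = exec.current_task; return v; });
  sink_fut->Consume();  // stands in for Wait(); releases the Transform task
  while (!exec.tasks.empty()) exec.RunOne();
  return {transform_task, consume_task};
}
```

`RunLazyChain()` returns {1, 1}. Although source_fut is already finished at
line 2, no task exists until the chain is consumed, so Consume is attached
before Transform runs and both execute in task 1.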
> This work meshes well with some other improvements I have been considering,
> in particular, splitting future/promise and restricting futures to a single
> callback.
>