[ 
https://issues.apache.org/jira/browse/ARROW-13004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17359045#comment-17359045
 ] 

Weston Pace commented on ARROW-13004:
-------------------------------------

[~bkietz] [~apitrou] There is potentially an analogous problem in a push-based 
model with back pressure.  In both cases the problem arises when there is a 
slow sink and a fast source.  Consider the same Source, Transform, Sink graph.  
If sink is slow enough then the graph will eventually pause.

 

Consider the following potential sequence:
 * Initial State: Source will be stopped, Transform will be stopped, and Sink 
will be running.
 * Sink finishes and unpauses Transform
 * Transform delivers an item immediately to Sink (if paused it will presumably 
have one in its buffer).
 * Transform unpauses Source
 * Sink pauses Transform
 * Source delivers an item immediately (again, it has a full buffer) to 
Transform
 * Transform pauses Sink
 * We have now returned to the initial state.

 

Notice that data locality is lost.  Even if Transform is the stateless (and 
just forwards any extra items to Sink allowing a bit of overrun) I think care 
is still needed.  I don't have enough a sense of the details to think it all 
the way through.  Just a note to include this in any test cases (in a 
back-pressured situation ensure that each unpause runs an entire item through 
the chain without stopping at each node) when you get around to adding back 
pressure.

> [C++] Allow the creation of future "chains" to better control parallelism
> -------------------------------------------------------------------------
>
>                 Key: ARROW-13004
>                 URL: https://issues.apache.org/jira/browse/ARROW-13004
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++
>            Reporter: Weston Pace
>            Priority: Major
>
> This is a bit tricky to explain.  ShouldSchedule::Always works well for 
> AddCallback but falls short for Transfer and Then.  An example may explain 
> best.
> Consider three operators, Source, Transform, and Sink.  They are setup as...
> {code:java}
> source_fut = source(); // 1
> transform_fut = source_fut.Then(Transform(), ScheduleAlways); // 2
> sink_fut = transform_fut.Then(Consume()); // 3
> {code}
> The intent is to run Transform + Consume as a single thread task on each item 
> generated by source().  This is what happens if source() is slow.  If 
> source() is fast (let's pretend it's always finished) then this is not what 
> happens.
> Line 2 causes a new thread task to be launched (since source_fut is 
> finished).  It is possible that new thread task can mark transform_fut 
> finished before line 3 is executed by the original thread.  This causes 
> Consume() and Transform() to run on separate threads.
> The solution (at least as best I can come up with) is unfortunately a little 
> complex (though the complexity can be hidden in future/async_generator 
> internals).  Basically, it is worth waiting to schedule until the future 
> chain has had a chance to finish connecting the pressure.  This means a 
> future created with ScheduleAlways is created in an "unconsumed" mode.  Any 
> callbacks that would normally be launched will not be launched until the 
> future switches to "consumed".  Future.Wait(), VisitAsyncGenerator, 
> CollectAsyncGenerator, and some of the async_generator operators would cause 
> the future to be "consumed".  The "consume" signal will need to propagate 
> backwards up the chain so futures will need to keep a reference to their 
> antecedent future.
> This work meshes well with some other improvements I have been considering, 
> in particular, splitting future/promise and restricting futures to a single 
> callback.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to