[ 
https://issues.apache.org/jira/browse/ARROW-10183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272684#comment-17272684
 ] 

Weston Pace commented on ARROW-10183:
-------------------------------------

[~apitrou] [~bkietz] I wanted to add a note about the parallelism of the
AsyncGenerator.  This is not so much a question as an update on my attempt to
wrap my head around all of this, and I will confess that I have not yet fully
internalized everything that can go on.  For the purposes of discussion I will
consider a chain of async generators (a graph where each node has 1 or 2 edges
and there are no cycles), where each node in the chain mutates the stream in
some way.  For example, in the CSV case there is the original Buffer generator
(a background generator), the CSV block reader, the chunker, and then the
"parse & convert scheduler" (a Visitor which terminates the chain).  The
"fan-out" in the CSV reader is still delegated to the task group, so that
parallelism hasn't been fully explored yet, although I think it could remain a
chain.

An AsyncGenerator's Next() function should never need to be called by more than
one thread at once, the way you might with an iterator.  Instead, the question
is whether you can call a generator's Next() function again before the future
returned by the previous call has completed.  The two ideas are similar but not
the same, so it's not so much a question of "thread safe" as it is a question
of "reentrant".
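To make the "reentrant" distinction concrete, here is a minimal sketch in plain standard C++. It assumes a toy stand-in, ToyAsyncGenerator<T> = std::function<std::shared_future<T>()>, in place of Arrow's AsyncGenerator<T> (which is a std::function returning an arrow::Future); all names below are hypothetical. The source claims its index at call time, so it tolerates a caller issuing several Next() calls before any of the returned futures has completed.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <vector>

// Hypothetical stand-in for an async generator (not Arrow's API).
template <typename T>
using ToyAsyncGenerator = std::function<std::shared_future<T>()>;

// Reentrant source: each Next() atomically claims its index when called, so a
// caller may call Next() again before the previous future has resolved.
ToyAsyncGenerator<int> MakeReentrantSource(int count) {
  auto next_index = std::make_shared<std::atomic<int>>(0);
  return [next_index, count]() {
    int i = next_index->fetch_add(1);
    return std::async(std::launch::async, [i, count] {
      return i < count ? i * i : -1;  // -1 marks end of stream in this sketch
    }).share();
  };
}

// Serial pull: wait for each future before calling Next() again.
std::vector<int> PullSerially(ToyAsyncGenerator<int> gen) {
  std::vector<int> out;
  for (int v = gen().get(); v != -1; v = gen().get()) {
    out.push_back(v);
  }
  return out;
}

// Reentrant pull: issue n Next() calls up front, then wait on all of them.
// This is only safe because MakeReentrantSource tolerates overlapping calls.
std::vector<int> PullReentrantly(ToyAsyncGenerator<int> gen, int n) {
  std::vector<std::shared_future<int>> pending;
  for (int k = 0; k < n; ++k) {
    pending.push_back(gen());
  }
  std::vector<int> out;
  for (auto& f : pending) {
    if (f.get() != -1) {
      out.push_back(f.get());
    }
  }
  return out;
}
```

Both pulling styles yield the same items from this source; a stateful node like the CSV chunker would only be safe under the serial style.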

AsyncGenerators come in both flavors.  A decompressing node, the CSV block
reader, and the CSV chunker are all quite stateful and must finish processing
one block before they can begin consuming the next, so you should not call
Next() until the future returned by the previous call has resolved.  Parsing
and converting, on the other hand, are free to run in parallel.  In addition,
any queuing stage (AddReadahead and BackgroundIterator) can be called in
parallel, which allows for pipeline parallelism.  Since everything is pull
driven, this "parallel pull" is driven from the AddReadahead nodes, and it could
be driven from a "visit" or "collect" sink as well.
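A queuing stage is what turns a serial consumer into a reentrant puller of its upstream. The sketch below is a toy analogue of the AddReadahead node under the same assumptions as before (std::shared_future instead of arrow::Future; MakeToyReadahead and ToyAsyncGenerator are hypothetical names, not Arrow's implementation). Each Next() tops up a queue of in-flight source futures before handing back the oldest one, so the source sees several overlapping Next() calls and must therefore be reentrant.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>

// Hypothetical stand-in for an async generator (not Arrow's API).
template <typename T>
using ToyAsyncGenerator = std::function<std::shared_future<T>()>;

// Toy readahead: keeps up to max_readahead source futures in flight. It pulls
// the source before earlier futures resolve, so the source must be reentrant.
template <typename T>
ToyAsyncGenerator<T> MakeToyReadahead(ToyAsyncGenerator<T> source,
                                      std::size_t max_readahead) {
  struct State {
    std::mutex mutex;  // lets callers on different threads share the node
    std::deque<std::shared_future<T>> pending;
  };
  auto state = std::make_shared<State>();
  const std::size_t depth = std::max<std::size_t>(max_readahead, 1);
  return [source, depth, state]() {
    std::lock_guard<std::mutex> lock(state->mutex);
    // Top up the queue: these are the reentrant pulls on the source.
    while (state->pending.size() < depth) {
      state->pending.push_back(source());
    }
    auto next = state->pending.front();  // oldest request, preserves order
    state->pending.pop_front();
    return next;
  };
}
```

With a source that counts its calls, the first Next() on a readahead of depth 3 has already issued three source calls before any result is consumed; each later Next() issues one more to keep the queue full.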

So just summarizing what we have today...

Sources:
BackgroundGenerator - Reentrant, Does not pull

Intermediate Nodes:
Transformer - Not reentrant (by necessity of has_next), Does not pull
AddReadahead - Reentrant, Pulls reentrantly

Sinks:
Visit - Pulls serially (could be reentrantly in the future)
Collect - Pulls serially (could be reentrantly in the future)

Today we have...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(1)

It would be an error, for example, to do...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(N)

...or...

BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> AddReadahead -> Visit(1)

...both of those would cause Transformer (which is not reentrant) to be pulled
reentrantly.  I am wondering whether there is some merit in encoding these
rules into the types themselves, so that a chain like that would fail to
compile.
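One possible shape for that idea is a sketch using compile-time tags. Everything here is hypothetical (Stage, Reentrant, NotReentrant and the builder functions are not Arrow APIs, and Arrow's generators carry no such tags). Each stage records whether it is reentrant, and any stage that pulls reentrantly (a readahead, or a Visit with more than one pull in flight) static_asserts that its upstream is reentrant. Only the types are modeled; no data flows.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical reentrancy tags.
struct Reentrant {};     // Next() may be called before the previous future resolves
struct NotReentrant {};  // caller must wait for each future before calling Next()

template <typename Tag>
struct Stage {
  using reentrancy = Tag;
};

template <typename S>
constexpr bool kIsReentrant = std::is_same_v<typename S::reentrancy, Reentrant>;

// Source: reentrant, does not pull.
constexpr Stage<Reentrant> Background() { return {}; }

// Readahead: reentrant itself, and pulls its upstream reentrantly.
template <typename Upstream>
constexpr Stage<Reentrant> Readahead(Upstream) {
  static_assert(kIsReentrant<Upstream>,
                "Readahead pulls reentrantly; its upstream must be Reentrant");
  return {};
}

// Transformer: not reentrant, pulls serially, so any upstream is acceptable.
template <typename Upstream>
constexpr Stage<NotReentrant> Transform(Upstream) { return {}; }

// Visit<N>: keeps up to N pulls in flight; N > 1 needs a reentrant upstream.
template <int N, typename Upstream>
constexpr int Visit(Upstream) {
  static_assert(N >= 1, "Visit needs at least one pull in flight");
  static_assert(N == 1 || kIsReentrant<Upstream>,
                "Visit<N> with N > 1 pulls reentrantly; upstream must be Reentrant");
  return N;
}

// The chain used today type-checks:
static_assert(Visit<1>(Transform(Transform(Readahead(Background())))) == 1);
```

Under these assumptions, both erroneous chains above are rejected at compile time: Visit<2>(Transform(...)) trips the Visit assertion, and Readahead(Transform(...)) trips the Readahead assertion.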

> [C++] Create a ForEach library function that runs on an iterator of futures
> ---------------------------------------------------------------------------
>
>                 Key: ARROW-10183
>                 URL: https://issues.apache.org/jira/browse/ARROW-10183
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: C++
>            Reporter: Weston Pace
>            Assignee: Weston Pace
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: arrow-continuation-flow.jpg
>
>          Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> This method should take in an iterator of futures and a callback and pull an 
> item off the iterator, "await" it, run the callback on it, and then fetch the 
> next item from the iterator.
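For reference, the behavior requested in the issue description (pull an item, "await" it, run the callback, then fetch the next item) can be sketched with the standard library alone. This is a blocking illustration under stated assumptions, not the implementation in the linked pull request. The "iterator of futures" is modeled as a hypothetical FutureIterator<T> that returns std::nullopt when exhausted, and the wait is a blocking get(), where an Arrow version would chain the callback as a continuation on the future instead.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical "iterator of futures": each call yields the next future, or
// std::nullopt once the stream is exhausted.
template <typename T>
using FutureIterator = std::function<std::optional<std::shared_future<T>>()>;

// Pull one future, wait for it, run the callback on its value, and only then
// pull the next one, so the iterator is never asked for an item early.
template <typename T, typename Callback>
void ForEach(FutureIterator<T> it, Callback&& callback) {
  while (auto next = it()) {
    callback(next->get());  // blocks here; an async version would not
  }
}
```

Because the next item is requested only after the callback returns, this ordering is the "serial pull" style described in the comment and is safe even for a non-reentrant source.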



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
