[
https://issues.apache.org/jira/browse/ARROW-10183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272684#comment-17272684
]
Weston Pace commented on ARROW-10183:
-------------------------------------
[~apitrou] [~bkietz] I wanted to add a note about the parallelism of the
AsyncGenerator. This is less a question than an update on my attempt to wrap
my head around all of this, and I will confess that I have not yet fully
internalized everything that can go on. For the purposes of discussion I will
consider a chain of async generators (a graph where each node has 1 or 2 edges
and there are no cycles) in which each node mutates the stream in some way.
For example, in the CSV case there is the original Buffer generator (a
background generator), the CSV block reader, the chunker, and then the "parse &
convert scheduler" (a Visitor which terminates the chain). The "fan-out" in
the CSV reader is still delegated to the task group, so that parallelism need
hasn't been fully explored, although it could remain a chain (I think).
An AsyncGenerator's Next() function should never have to be called by more than
one thread at once in the way you might do with an iterator. Instead, the
question comes down to whether you can call a generator's Next() function
before the future returned by the previous call has completed. The two
concerns are similar but not the same, so it's not so much a question of
"thread safe" as it is a question of "reentrant".
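To make the distinction concrete, here is a minimal sketch built on std::future rather than Arrow's own Future type. CountingGenerator, PullSerially and PullReentrantly are hypothetical names invented for this illustration. In both pull styles a single thread calls Next(); the only difference is whether the previous future has been awaited before the next call is made.

```cpp
#include <future>
#include <vector>

// Hypothetical stand-in for an async generator: Next() returns a future for
// the next item. This is NOT Arrow's AsyncGenerator, only an illustration.
class CountingGenerator {
 public:
  std::future<int> Next() {
    // The index is claimed synchronously inside Next(), so a second call made
    // before the first future resolves is safe: this generator is reentrant.
    int index = next_index_++;
    return std::async(std::launch::async, [index] { return index * 10; });
  }

 private:
  int next_index_ = 0;  // only ever touched by the single calling thread
};

// Serial pull: wait for each future before calling Next() again.
// This is safe for any generator, reentrant or not.
std::vector<int> PullSerially(CountingGenerator& gen, int n) {
  std::vector<int> out;
  for (int i = 0; i < n; ++i) out.push_back(gen.Next().get());
  return out;
}

// Reentrant pull: the same single thread calls Next() n times before any
// result is awaited. Only a reentrant generator tolerates this.
std::vector<int> PullReentrantly(CountingGenerator& gen, int n) {
  std::vector<std::future<int>> pending;
  for (int i = 0; i < n; ++i) pending.push_back(gen.Next());
  std::vector<int> out;
  for (auto& f : pending) out.push_back(f.get());
  return out;
}
```

Neither function needs a lock around Next(), which is why "thread safe" is the wrong axis here: what varies is only how many futures are outstanding at once.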
AsyncGenerators come in both flavors. A decompressing node, the CSV block
reader, and the CSV chunker are all quite stateful and must finish processing a
block before they can begin consuming the next, so you should not call Next()
again until the future returned by the previous call has resolved. The parsing
and converting, on the other hand, is free to run in parallel. In addition, any
queuing stage (AddReadahead and BackgroundIterator) can be called reentrantly,
thus allowing for pipeline parallelism. Since everything is pull driven, this
"parallel pull" is driven from the AddReadahead nodes and could be driven from
a "visit" or "collect" sink as well.
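As a rough sketch of how a queuing stage ends up driving the "parallel pull", the following toy readahead keeps several source futures in flight. It uses std::future and std::function; IntGenerator, MakeReadahead and MakeCountingSource are hypothetical names for this illustration and do not reflect how Arrow's AddReadahead is implemented.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Hypothetical generator shape for this sketch: each call yields a future.
using IntGenerator = std::function<std::future<int>()>;

// Returns a generator that hands out source items in order while keeping
// `depth` extra source futures in flight. Because it calls source() again
// before earlier futures have resolved, it pulls its source reentrantly.
IntGenerator MakeReadahead(IntGenerator source, std::size_t depth) {
  auto pending = std::make_shared<std::deque<std::future<int>>>();
  return [source = std::move(source), pending, depth]() -> std::future<int> {
    // Top up the queue: the item about to be returned plus `depth` more.
    while (pending->size() < depth + 1) pending->push_back(source());
    std::future<int> front = std::move(pending->front());
    pending->pop_front();
    return front;
  };
}

// Demo source that counts how many times it has been pulled. The deferred
// futures stay unresolved until get() is called on them.
IntGenerator MakeCountingSource(std::shared_ptr<int> pulls) {
  return [pulls]() -> std::future<int> {
    int index = (*pulls)++;
    return std::async(std::launch::deferred, [index] { return index; });
  };
}
```

With a depth of 2, the first call to the readahead generator pulls the source three times before a single item has been consumed. That is harmless for a reentrant source, and it is exactly the pattern a stateful node cannot tolerate.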
So, just summarizing what we have today...
Sources:
  BackgroundGenerator - Reentrant, Does not pull
Intermediate Nodes:
  Transformer - Not reentrant (by necessity of has_next), Does not pull
  AddReadahead - Reentrant, Pulls reentrantly
Sinks:
  Visit - Pulls serially (could be reentrantly in the future)
  Collect - Pulls serially (could be reentrantly in the future)
Today we have...
BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(1)
It would be an error, for example, to do...
BackgroundGenerator -> AddReadahead -> Transformer -> Transformer -> Visit(N)
...or...
BackgroundGenerator -> AddReadahead -> Transformer -> Transformer ->
AddReadahead -> Visit(1)
...because both of those would cause a Transformer (which is not reentrant) to
be pulled reentrantly: by the sink in the first case and by the downstream
AddReadahead in the second. I am wondering if there is some merit in encoding
these rules somehow into the types themselves so that something like that would
fail to compile.
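One possible shape for such an encoding, purely as a sketch: each node type carries a compile-time flag saying whether it may be pulled reentrantly, and any node that pulls reentrantly requires that flag of its source. The struct names mirror the roles listed above, but the types, the kReentrant/kValid members and CheckPipeline are all hypothetical and are not part of Arrow. Visit's second parameter is assumed to mean the number of concurrent pulls.

```cpp
// Source node: safe to pull reentrantly.
struct BackgroundGenerator {
  static constexpr bool kReentrant = true;
  static constexpr bool kValid = true;  // the chain up to here obeys the rules
};

// Stateful node: must not be pulled reentrantly. It pulls its own source
// serially, so any valid source is acceptable.
template <typename Source>
struct Transformer {
  static constexpr bool kReentrant = false;
  static constexpr bool kValid = Source::kValid;
};

// Queuing node: pulls reentrantly, so its source must tolerate that.
template <typename Source>
struct AddReadahead {
  static constexpr bool kReentrant = true;
  static constexpr bool kValid = Source::kValid && Source::kReentrant;
};

// Sink: with more than one concurrent pull it needs a reentrant source.
template <typename Source, int kParallelPulls>
struct Visit {
  static constexpr bool kValid =
      Source::kValid && (kParallelPulls == 1 || Source::kReentrant);
};

// Compile-time gate: instantiating this with an invalid chain fails to build.
template <typename Pipeline>
constexpr bool CheckPipeline() {
  static_assert(Pipeline::kValid, "a non-reentrant node is pulled reentrantly");
  return true;
}

// The three chains from the text, written from sink back to source.
using TwoTransformers =
    Transformer<Transformer<AddReadahead<BackgroundGenerator>>>;
using Today = Visit<TwoTransformers, 1>;
using BadFanOut = Visit<TwoTransformers, 4>;
using BadReadahead = Visit<AddReadahead<TwoTransformers>, 1>;

static_assert(CheckPipeline<Today>(), "today's chain is accepted");
static_assert(!BadFanOut::kValid, "Visit(N) over a Transformer is rejected");
static_assert(!BadReadahead::kValid, "AddReadahead over a Transformer is rejected");
```

Calling CheckPipeline<BadFanOut>() or CheckPipeline<BadReadahead>() would stop compilation with the static_assert message. The sketch only inspects kValid for those two chains so that the file as a whole still builds.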
> [C++] Create a ForEach library function that runs on an iterator of futures
> ---------------------------------------------------------------------------
>
> Key: ARROW-10183
> URL: https://issues.apache.org/jira/browse/ARROW-10183
> Project: Apache Arrow
> Issue Type: New Feature
> Components: C++
> Reporter: Weston Pace
> Assignee: Weston Pace
> Priority: Major
> Labels: pull-request-available
> Attachments: arrow-continuation-flow.jpg
>
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
> This method should take in an iterator of futures and a callback and pull an
> item off the iterator, "await" it, run the callback on it, and then fetch the
> next item from the iterator.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)