[
https://issues.apache.org/jira/browse/ARROW-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514645#comment-17514645
]
David Li commented on ARROW-16072:
----------------------------------
This sounds good to me.
What does our cancellation story look like with ExecPlan? From what I recall,
even if the APIs are there, they aren't used or tested much either.
> [C++] Migrate scanner logic to ExecPlan, remove merged generator
> ----------------------------------------------------------------
>
> Key: ARROW-16072
> URL: https://issues.apache.org/jira/browse/ARROW-16072
> Project: Apache Arrow
> Issue Type: Improvement
> Components: C++
> Reporter: Weston Pace
> Priority: Major
>
> We've hit a bit of a wall with the merged generator. The current behavior
> is: if one subscription encounters an error, we simply stop pulling from the
> other subscriptions. Once everything has settled down, we return the error
> and end the stream.
> In reality, we should be sending some kind of cancel signal down to the
> other generators. Otherwise, we are not respecting the rule we recently
> defined for AsyncGenerator: "An AsyncGenerator should always be fully
> consumed".
> There is no cancel mechanism for AsyncGenerator. We could add one (it would
> be fun), but it would be a further, substantial investment in AsyncGenerator.
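> To make the cost concrete, here is a very rough sketch of what a
> cancel-aware generator might look like. The types and names are hypothetical
> plain C++, not anything in Arrow today; the real thing would have to
> integrate with our futures and readahead machinery.
> {code:cpp}
> // Hypothetical sketch only: an AsyncGenerator as a callable returning a
> // future of an optional item, plus a stop flag the wrapper checks before
> // pulling again. Nothing here is Arrow's actual API.
> #include <atomic>
> #include <functional>
> #include <future>
> #include <memory>
> #include <optional>
>
> template <typename T>
> using AsyncGenerator = std::function<std::future<std::optional<T>>()>;
>
> struct StopSource {
>   std::shared_ptr<std::atomic<bool>> flag =
>       std::make_shared<std::atomic<bool>>(false);
>   void RequestStop() { flag->store(true); }
>   bool StopRequested() const { return flag->load(); }
> };
>
> // Once stop is requested, the wrapper ends the stream instead of pulling
> // from the source again, so an erroring consumer no longer has to drain it.
> template <typename T>
> AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopSource stop) {
>   return [source, stop]() -> std::future<std::optional<T>> {
>     if (stop.StopRequested()) {
>       std::promise<std::optional<T>> done;
>       done.set_value(std::nullopt);  // end-of-stream sentinel
>       return done.get_future();
>     }
>     return source();
>   };
> }
>
> int main() {
>   StopSource stop;
>   int i = 0;
>   AsyncGenerator<int> counting = [&i]() {
>     std::promise<std::optional<int>> p;
>     if (i < 3) p.set_value(i++); else p.set_value(std::nullopt);
>     return p.get_future();
>   };
>   auto gen = MakeCancellable<int>(counting, stop);
>   gen().get();          // pulls 0 from the source
>   stop.RequestStop();   // e.g. another subscription hit an error
>   return gen().get().has_value() ? 1 : 0;  // ends without touching `counting`
> }
> {code}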
> At the same time, we have been putting more and more focus on our push-based
> ExecPlans.
> So, rather than fix the merged generator, I propose we migrate the scanner
> (just the scanner, not the file formats / readers) to ExecPlan instead of
> AsyncGenerator.
> This probably sounds easier than it will be, but I think it's doable. It
> will be easy to create a node that lists a dataset and pushes a batch for
> each file. We need to limit fragment readahead, but there is no reason we
> can't just buffer all the filenames in memory and process them slowly, so
> this step should adapt to ExecPlan pretty well.
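> As a sketch of that listing step (illustrative names only, not the real
> ExecNode interface): buffer the filenames up front and let a simple counter
> cap how many fragments are being scanned at once.
> {code:cpp}
> #include <cstddef>
> #include <functional>
> #include <iostream>
> #include <string>
> #include <vector>
>
> // Hypothetical listing node: holds every filename in memory and hands them
> // to the downstream scanning step, never exceeding the readahead limit.
> struct FileListingNode {
>   std::vector<std::string> filenames;   // cheap enough to buffer fully
>   std::size_t max_readahead = 4;        // fragment readahead limit
>   std::size_t next = 0;                 // next file to hand out
>   std::size_t in_flight = 0;            // fragments currently being scanned
>   std::function<void(const std::string&)> push_downstream;
>
>   // Start as many fragments as the readahead limit allows.
>   void Pump() {
>     while (next < filenames.size() && in_flight < max_readahead) {
>       ++in_flight;
>       push_downstream(filenames[next++]);
>     }
>   }
>
>   // Called when the downstream node finishes a fragment; frees a slot.
>   void FragmentFinished() {
>     --in_flight;
>     Pump();
>   }
> };
>
> int main() {
>   FileListingNode node;
>   node.filenames = {"a.parquet", "b.parquet", "c.parquet"};
>   node.max_readahead = 2;
>   node.push_downstream = [](const std::string& f) {
>     std::cout << "start scanning " << f << "\n";
>   };
>   node.Pump();              // starts a.parquet and b.parquet
>   node.FragmentFinished();  // a slot frees up, c.parquet starts
>   return 0;
> }
> {code}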
> It's tempting to think that the merged generator is just a "union node", but
> that isn't quite true. That would imply that we are going to create a source
> node for each file. We don't know all the files ahead of time, and this
> would cause backpressure issues anyway. We could modify the exec plan on the
> fly, adding new nodes as we start processing new files, but I think that
> would be overly complex.
> Instead, I think we should create one node that holds all the scanner
> complexity. This node would keep a list of FragmentScanner objects. Each
> fragment scanner would hold a reference to a shared async toggle so we could
> turn backpressure on and off as needed and have all the fragment scanners
> stop pulling at once. The fragment scanners would iterate, in a pull-based
> fashion, over their sources and, for each future they consume, push the
> result to the output node. If an error occurs, we just cancel each fragment
> scanner and stop creating new ones.
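> In rough pseudo-C++ (hypothetical types; nothing here is Arrow's actual
> class layout), the shape I have in mind is something like:
> {code:cpp}
> #include <atomic>
> #include <memory>
> #include <vector>
>
> // Shared backpressure toggle: closed means every fragment scanner pauses.
> struct AsyncToggle {
>   std::atomic<bool> open{true};
> };
>
> struct FragmentScanner {
>   std::shared_ptr<AsyncToggle> toggle;
>   std::shared_ptr<std::atomic<bool>> cancelled;
>
>   // One step of the pull loop. The real node would chain this off the
>   // previous batch's future rather than calling it in a loop.
>   bool ScanOneBatch() {
>     if (cancelled->load() || !toggle->open.load()) return false;
>     // ... pull the next batch future from the file reader and, when it
>     // completes, push the result to the exec plan's output node ...
>     return true;
>   }
> };
>
> struct ScanNode {
>   std::shared_ptr<AsyncToggle> toggle = std::make_shared<AsyncToggle>();
>   std::shared_ptr<std::atomic<bool>> cancelled =
>       std::make_shared<std::atomic<bool>>(false);
>   std::vector<FragmentScanner> scanners;
>
>   // Every scanner shares the same toggle and cancel flag.
>   void AddFragment() { scanners.push_back({toggle, cancelled}); }
>
>   // Backpressure: pause or resume all fragment scanners at once.
>   void Pause() { toggle->open.store(false); }
>   void Resume() { toggle->open.store(true); }
>
>   // On error: cancel in-flight scanners and stop creating new ones.
>   void Abort() { cancelled->store(true); }
> };
>
> int main() {
>   ScanNode node;
>   node.AddFragment();
>   node.AddFragment();
>   node.Pause();   // backpressure engaged: ScanOneBatch returns false
>   node.Resume();
>   node.Abort();   // error downstream: all scanners stop pulling
>   return node.scanners[0].ScanOneBatch() ? 1 : 0;
> }
> {code}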
> This node would not extend SourceNode. In fact, we can probably get rid of
> SourceNode at this point but we could keep it around for future use if needed.
> We can then get rid of the merged generator. We can't get rid of the
> AsyncGenerator code entirely because we still need it for CSV scanning and a
> few other places. We could migrate these spots over to exec plans (e.g. the
> CSV scanner could be an exec plan with a chunk node, parse node, and convert
> node) but I don't think we need to tackle that right now.
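> Just to illustrate the shape of that (none of these nodes exist today), the
> CSV path would become three push-based stages chained the same way an
> ExecPlan chains nodes:
> {code:cpp}
> #include <functional>
> #include <string>
> #include <vector>
>
> using Chunk = std::string;                  // a block of raw CSV bytes
> using ParsedRows = std::vector<std::string>;
> using ConvertedBatch = std::vector<int>;    // stand-in for a record batch
>
> int main() {
>   // Downstream consumer, e.g. the rest of the exec plan.
>   std::function<void(const ConvertedBatch&)> sink =
>       [](const ConvertedBatch&) { /* consume the batch */ };
>
>   // "convert" node: turn parsed rows into typed columns.
>   auto convert = [&](const ParsedRows& rows) {
>     ConvertedBatch batch(rows.size(), 0);
>     sink(batch);
>   };
>
>   // "parse" node: split a chunk into rows and push them on.
>   auto parse = [&](const Chunk& chunk) {
>     ParsedRows rows;
>     std::string row;
>     for (char c : chunk) {
>       if (c == '\n') { rows.push_back(row); row.clear(); }
>       else { row.push_back(c); }
>     }
>     if (!row.empty()) rows.push_back(row);
>     convert(rows);
>   };
>
>   // "chunk" node: the real reader would slice the file into blocks here.
>   auto chunk = [&](const std::string& file_contents) { parse(file_contents); };
>
>   chunk("a,1\nb,2\n");
>   return 0;
> }
> {code}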
--
This message was sent by Atlassian Jira
(v8.20.1#820001)