[ 
https://issues.apache.org/jira/browse/ARROW-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514645#comment-17514645
 ] 

David Li commented on ARROW-16072:
----------------------------------

This sounds good to me.

How is our cancellation story with ExecPlan? From what I recall even if the 
APIs are there it isn't used or tested too much either?

> [C++] Migrate scanner logic to ExecPlan, remove merged generator
> ----------------------------------------------------------------
>
>                 Key: ARROW-16072
>                 URL: https://issues.apache.org/jira/browse/ARROW-16072
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++
>            Reporter: Weston Pace
>            Priority: Major
>
> We've hit a bit of a wall with the merged generator.  The current behavior 
> is: If one subscription encounters an error we simply stop pulling from the 
> other subscriptions.  Once everything has settled down we return the error 
> and end the stream.
> In reality, we should be sending some kind of cancel signal down to the other 
> generators.  Otherwise, we are not respecting the rule for AsyncGenerator 
> that we recently defined which is "An AsyncGenerator should always be fully 
> consumed".
> There is no cancel mechanism for AsyncGenerator.  We could add one, it would 
> be fun, but it would be further, substantial investment into AsyncGenerator.  
> At the same time, we have been putting more and more focus on our push-based 
> ExecPlans.
> So, rather than fix the merged generator, I propose we migrate the scanner 
> (just the scanner, not the file formats / readers) to ExecPlan instead of 
> AsyncGenerator.
> This probably sounds easier than it will be but I think it's doable.  It will 
> be easy to create a node that lists a dataset and pushes a batch for each 
> file.  We need to limit fragment readahead but there is no reason we can't 
> just buffer all the filenames in memory and process them slowly so this step 
> should adapt to ExecPlan pretty well.
> It's tempting to think that the merged generator is just a "union node" but 
> that isn't quite true.  That would imply that we are going to create a source 
> node for each file.  We don't know all the files ahead of time and this would 
> cause backpressure issues anyways.  We could modify the exec plan on the fly, 
> adding new nodes as we start processing new files but I think that would be 
> overly complex.
> Instead I think we should create one node that holds all the scanner 
> complexity in it.  This node would keep a list of FragmentScanner objects.  
> Each fragment scanner would have a reference to the async toggle so we could 
> turn backpressure on and off as needed and all the fragment scanners would 
> stop pulling.  The fragment scanners would iterate, in a pull based fashion, 
> from their sources and for each future they consume they would push the 
> result to the output node.  If an error occurs then we just cancel each 
> fragment scanner and stop creating new fragment scanners.
> This node would not extend SourceNode.  In fact, we can probably get rid of 
> SourceNode at this point but we could keep it around for future use if needed.
> We can then get rid of the merged generator.  We can't get rid of the 
> AsyncGenerator code entirely because we still need it for CSV scanning and a 
> few other places.  We could migrate these spots over to exec plans (e.g. the 
> CSV scanner could be an exec plan with a chunk node, parse node, and convert 
> node) but I don't think we need to tackle that right now.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to