Weston Pace created ARROW-16072:
-----------------------------------

             Summary: [C++] Migrate scanner logic to ExecPlan, remove merged generator
                 Key: ARROW-16072
                 URL: https://issues.apache.org/jira/browse/ARROW-16072
             Project: Apache Arrow
          Issue Type: Improvement
          Components: C++
            Reporter: Weston Pace


We've hit a bit of a wall with the merged generator.  The current behavior is:
if one subscription encounters an error, we simply stop pulling from the other
subscriptions.  Once everything has settled down, we return the error and end
the stream.

In reality, we should be sending some kind of cancel signal down to the other
generators.  Otherwise, we are not respecting the rule for AsyncGenerator that
we recently defined, which is "An AsyncGenerator should always be fully
consumed".

There is no cancel mechanism for AsyncGenerator.  We could add one, and it
would be fun, but it would be a further, substantial investment in
AsyncGenerator.  At the same time, we have been putting more and more focus on
our push-based ExecPlans.

So, rather than fix the merged generator, I propose we migrate the scanner 
(just the scanner, not the file formats / readers) to ExecPlan instead of 
AsyncGenerator.

This probably sounds easier than it will be, but I think it's doable.  It will
be easy to create a node that lists a dataset and pushes a batch for each file.
We need to limit fragment readahead, but there is no reason we can't just
buffer all the filenames in memory and process them slowly, so this step should
adapt to ExecPlan pretty well.
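
The readahead idea above can be sketched roughly as follows.  This is a
minimal, synchronous illustration and not Arrow code: FragmentLister, Pump,
OnFragmentFinished and the start_scan callback are all hypothetical names, and
the sketch assumes start_scan returns before the fragment it starts has
finished.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: none of these names are Arrow APIs.
// All filenames are buffered in memory; at most `max_readahead` fragments
// are being scanned at any one time.
class FragmentLister {
 public:
  FragmentLister(std::vector<std::string> filenames, std::size_t max_readahead,
                 std::function<void(const std::string&)> start_scan)
      : pending_(filenames.begin(), filenames.end()),
        max_readahead_(max_readahead),
        start_scan_(std::move(start_scan)) {
    assert(max_readahead_ > 0);
  }

  // Start as many scans as the readahead limit allows.
  void Pump() {
    while (in_flight_ < max_readahead_ && !pending_.empty()) {
      std::string name = std::move(pending_.front());
      pending_.pop_front();
      ++in_flight_;
      start_scan_(name);
    }
  }

  // Called when one fragment has been fully scanned; frees a slot.
  void OnFragmentFinished() {
    assert(in_flight_ > 0);
    --in_flight_;
    Pump();
  }

  std::size_t in_flight() const { return in_flight_; }
  bool done() const { return pending_.empty() && in_flight_ == 0; }

 private:
  std::deque<std::string> pending_;
  std::size_t max_readahead_;
  std::function<void(const std::string&)> start_scan_;
  std::size_t in_flight_ = 0;
};
```

With three files and a limit of two, the first Pump() starts two scans, and
the third only starts once OnFragmentFinished() has been called.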

It's tempting to think that the merged generator is just a "union node", but
that isn't quite true.  That would imply that we are going to create a source
node for each file.  We don't know all the files ahead of time, and this would
cause backpressure issues anyway.  We could modify the exec plan on the fly,
adding new nodes as we start processing new files, but I think that would be
overly complex.

Instead, I think we should create one node that holds all the scanner
complexity.  This node would keep a list of FragmentScanner objects.  Each
fragment scanner would have a reference to the async toggle so we could turn
backpressure on and off as needed and all the fragment scanners would stop
pulling.  The fragment scanners would iterate, in a pull-based fashion, from
their sources and, for each future they consume, they would push the result to
the output node.  If an error occurs, then we just cancel each fragment scanner
and stop creating new fragment scanners.
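
A rough, single-threaded sketch of that node might look like the following
(C++17).  Everything here is an assumption made for illustration: Batch,
Toggle, PullResult, ScanNode and Step are hypothetical, futures are replaced
by plain function calls, and FragmentScanner is only a stand-in for whatever
the real class would be.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical names throughout; this is not the Arrow API.
struct Batch { int value; };

// Stand-in for the async toggle: one shared flag pauses every scanner.
struct Toggle { bool open = true; };

// One pull from a fragment: a batch, end of fragment, or an error.
struct PullResult {
  std::optional<Batch> batch;  // empty (with no error) means end of fragment
  std::string error;           // non-empty on failure
};

class FragmentScanner {
 public:
  using Source = std::function<PullResult()>;
  FragmentScanner(Source source, std::shared_ptr<Toggle> toggle)
      : source_(std::move(source)), toggle_(std::move(toggle)) {
    assert(toggle_ != nullptr);
  }

  // Pull once unless paused, cancelled or finished.
  std::optional<PullResult> PullOnce() {
    if (cancelled_ || finished_ || !toggle_->open) return std::nullopt;
    PullResult r = source_();
    if (!r.error.empty() || !r.batch) finished_ = true;
    return r;
  }
  void Cancel() { cancelled_ = true; }
  bool active() const { return !cancelled_ && !finished_; }

 private:
  Source source_;
  std::shared_ptr<Toggle> toggle_;
  bool cancelled_ = false;
  bool finished_ = false;
};

class ScanNode {
 public:
  explicit ScanNode(std::function<void(Batch)> output)
      : output_(std::move(output)), toggle_(std::make_shared<Toggle>()) {}

  void AddFragment(FragmentScanner::Source source) {
    if (!error_.empty()) return;  // no new fragment scanners after an error
    scanners_.emplace_back(std::move(source), toggle_);
  }
  void PauseProducing() { toggle_->open = false; }   // backpressure on
  void ResumeProducing() { toggle_->open = true; }   // backpressure off

  // One round-robin pass over the scanners; true if any batch was pushed.
  bool Step() {
    bool progressed = false;
    for (auto& s : scanners_) {
      std::optional<PullResult> r = s.PullOnce();
      if (!r) continue;
      if (!r->error.empty()) { Fail(r->error); return progressed; }
      if (r->batch) { output_(*r->batch); progressed = true; }
    }
    return progressed;
  }

  const std::string& error() const { return error_; }
  std::size_t active_scanners() const {
    std::size_t n = 0;
    for (const auto& s : scanners_) n += s.active() ? 1 : 0;
    return n;
  }
  std::size_t total_scanners() const { return scanners_.size(); }

 private:
  // Record the first error and cancel every fragment scanner.
  void Fail(const std::string& message) {
    error_ = message;
    for (auto& s : scanners_) s.Cancel();
  }

  std::function<void(Batch)> output_;
  std::shared_ptr<Toggle> toggle_;
  std::vector<FragmentScanner> scanners_;
  std::string error_;
};
```

The point of the sketch is that the toggle and the cancel path both live in
one place, so pausing or failing affects every fragment scanner at once.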

This node would not extend SourceNode.  In fact, we can probably get rid of 
SourceNode at this point but we could keep it around for future use if needed.

We can then get rid of the merged generator.  We can't get rid of the 
AsyncGenerator code entirely because we still need it for CSV scanning and a 
few other places.  We could migrate these spots over to exec plans (e.g. the 
CSV scanner could be an exec plan with a chunk node, parse node, and convert 
node) but I don't think we need to tackle that right now.
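
To make the chunk / parse / convert idea concrete, here is a toy push-based
pipeline.  It is only a sketch under heavy assumptions: the three "nodes" are
plain functions with hypothetical names, the input is comma-separated integers
with no quoting or headers, and nothing here reflects how the real CSV reader
works.

```cpp
#include <cassert>
#include <functional>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical toy stages; not the Arrow CSV reader.
using Row = std::vector<std::string>;

// Chunk stage: split raw text into non-empty lines and push each downstream.
inline void ChunkNode(const std::string& text,
                      const std::function<void(const std::string&)>& next) {
  std::istringstream in(text);
  std::string line;
  while (std::getline(in, line)) {
    if (!line.empty()) next(line);
  }
}

// Parse stage: split one line into fields on commas (no quoting support).
inline void ParseNode(const std::string& line,
                      const std::function<void(const Row&)>& next) {
  assert(!line.empty());  // the chunk stage filters out empty lines
  Row fields;
  std::istringstream in(line);
  std::string field;
  while (std::getline(in, field, ',')) fields.push_back(field);
  next(fields);
}

// Convert stage: turn each field into an int and push the typed row.
inline void ConvertNode(const Row& row,
                        const std::function<void(const std::vector<int>&)>& next) {
  std::vector<int> values;
  for (const auto& f : row) values.push_back(std::stoi(f));
  next(values);
}

// Wire the three stages together; each stage pushes into the next.
inline std::vector<std::vector<int>> RunCsvPlan(const std::string& text) {
  std::vector<std::vector<int>> sink;
  ChunkNode(text, [&](const std::string& line) {
    ParseNode(line, [&](const Row& row) {
      ConvertNode(row, [&](const std::vector<int>& values) {
        sink.push_back(values);
      });
    });
  });
  return sink;
}
```

Calling RunCsvPlan on two lines of comma-separated integers yields one vector
of ints per line.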



