I agree we should present a simplified interface, and then also make ScanTask 
internal, but I think that is orthogonal to whether a fragment produces one or 
multiple scan tasks. 

At first, my worry with having (Parquet)ScanTask handle concurrency itself was 
that it does need to coordinate with the overall scanner, right? If you have 
two files with 100 row groups each, that's much different than 100 files with 
two row groups each. With a scan task per row group, a single rea naturally 
handles both cases, but with a single scan task per file, you have to juggle 
the exact amount of readahead on an inter- and intra-file level. 

That said, there is an issue for making readahead operate by amount of memory 
used instead of number of files/tasks which would presumably handle that just 
as well. And right now, one (Parquet)ScanTask-per-row group does lead to some 
implementation nuisance elsewhere (since all scan tasks for a file have to 
share the same Parquet reader and pre-buffering task).

Also I realize my example is poor, because you do actually want to separate 
intra- and inter-fragment concurrency - you want to at least be buffering the 
next files (without decoding them) while decoding the current file. And the 
proposed model would make it easier to support a consumer that can process 
batches out of order while limiting memory usage (just limit the 
inter-scan-task readahead).

So on balance I'm in favor of this.

I'll also note that there could be other Fragments which may naturally have 
intra-fragment parallelism, if the concern is mostly that ParquetScanTask is a 
bit of an outlier. For instance, a hypothetical FlightFragment wrapping a 
FlightInfo struct could generate multiple scan tasks, one per FlightEndpoint in 
the FlightInfo.

Best,
David

On Thu, Mar 25, 2021, at 19:48, Weston Pace wrote:
> This is a bit of a follow-up on
> https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a
> consequence of my work on
> https://issues.apache.org/jira/browse/ARROW-7001 (nested scan
> parallelism).
> 
> I think the current dataset interface should be simplified.
> Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch
> with the components being...
> 
> Dataset - Binds together a format & fragment discovery
> Fragment - Something that maps to an input stream (usually a file)
> ScanTask - Created by a format, turns an input stream into record batches.
> RecordBatch - I hope I don't need to define this one :)
> 
> The change I'm recommending (and starting to implement in ARROW-7001)
> is to change the cardinality of Fragment ->* ScanTask to Fragment ->
> ScanTask (i.e. one scan task per fragment instead of many).
> 
> The IPC format and CSV format already do this (one scan task per
> fragment).  The only exception is Parquet which maps "scan task" to
> "row group" (keeping in mind row groups may correspond to multiple
> batches).  However, that feels like it is a detail that can be
> encapsulated in ParquetScanTask (I can implement this in
> https://issues.apache.org/jira/browse/ARROW-11843).  In other words...
> 
> The scanner is responsible for managing inter-fragment parallelism
> (how many files to read at once, pipelining file reads, etc.)
> The scan task is responsible for managing intra-fragment parallelism
> (how many row groups to read at once, whether to scan columns in
> parallel, etc)
> 
> Then, scan task can be made fully internal (ala ARROW-11782) and the
> primary external interface would be a record batch iterator.
> 
> This doesn't just simplify the external interface by removing a type,
> it actually changes the workflow requirements as well (admittedly,
> some of this is an inevitable benefit of ARROW-7001 and not directly
> related to removing scan task).  Currently, if you want maximum
> performance from a dataset scan, you need to run the scan tasks in
> parallel.  For example...
> 
> for scan_task in scanner.scan():
>   for record_batch in scan_task:
>     # Do something, but do it very fast or do it on another thread
> because every ms
>     # you spend here is a ms you could be doing I/O
> 
> With the simplification it should simply be...
> 
> for record_batch in scanner.scan():
>   # While you are processing this record batch the scanner is going to 
> continue
>   # running on a different thread.  It will be queing up a backlog of
> batches for you
>   # to process.  As long as you don't take "too long" you should be
> able to keep up.
>   # In other words, as long as your processing time here + the time it took to
>   # decode and prepare the batch is less than the time it takes to
> read the batch
>   # you will never have a break in I/O.
> 
> -Weston
> 

Reply via email to