I agree we should present a simplified interface, and then also make ScanTask internal, but I think that is orthogonal to whether a fragment produces one or multiple scan tasks.
At first, my worry with having (Parquet)ScanTask handle concurrency itself was that it does need to coordinate with the overall scanner, right? If you have two files with 100 row groups each, that's much different than 100 files with two row groups each. With a scan task per row group, a single readahead setting naturally handles both cases, but with a single scan task per file, you have to juggle the exact amount of readahead at both the inter- and intra-file level. That said, there is an open issue for making readahead operate on the amount of memory used instead of the number of files/tasks, which would presumably handle that just as well. And right now, one (Parquet)ScanTask per row group does lead to some implementation nuisance elsewhere (since all scan tasks for a file have to share the same Parquet reader and pre-buffering task).

Also, I realize my example is poor, because you do actually want to separate intra- and inter-fragment concurrency: you want to at least be buffering the next files (without decoding them) while decoding the current file. And the proposed model would make it easier to support a consumer that can process batches out of order while limiting memory usage (just limit the inter-scan-task readahead). So on balance I'm in favor of this.

I'll also note that there could be other Fragments which naturally have intra-fragment parallelism, if the concern is mostly that ParquetScanTask is a bit of an outlier. For instance, a hypothetical FlightFragment wrapping a FlightInfo struct could generate multiple scan tasks, one per FlightEndpoint in the FlightInfo.
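Roughly what I'm picturing (a Python-flavored sketch only; FlightFragment and FlightScanTask are made-up names, not anything that exists in pyarrow today):

import pyarrow.flight as flight

class FlightFragment:
    """Hypothetical fragment wrapping a flight.FlightInfo."""

    def __init__(self, info):
        self.info = info

    def scan(self):
        # One scan task per FlightEndpoint, analogous to the current
        # Parquet behavior of one scan task per row group.
        for endpoint in self.info.endpoints:
            yield FlightScanTask(endpoint)

class FlightScanTask:
    """Hypothetical scan task that streams batches from one endpoint."""

    def __init__(self, endpoint):
        self.endpoint = endpoint

    def execute(self):
        # Sketch assumes each endpoint advertises at least one location.
        client = flight.FlightClient(self.endpoint.locations[0])
        reader = client.do_get(self.endpoint.ticket)
        while True:
            try:
                chunk = reader.read_chunk()
            except StopIteration:
                break
            yield chunk.data  # a RecordBatch

Under the one-scan-task-per-fragment model you're proposing, the same thing would just move inside a single scan task that walks the endpoints itself (possibly fetching a few of them concurrently), and the scanner would only ever see one task per fragment.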
Best,
David

On Thu, Mar 25, 2021, at 19:48, Weston Pace wrote:
> This is a bit of a follow-up on https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a consequence of my work on https://issues.apache.org/jira/browse/ARROW-7001 (nested scan parallelism).
>
> I think the current dataset interface should be simplified. Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch with the components being...
>
> Dataset - Binds together a format & fragment discovery
> Fragment - Something that maps to an input stream (usually a file)
> ScanTask - Created by a format, turns an input stream into record batches.
> RecordBatch - I hope I don't need to define this one :)
>
> The change I'm recommending (and starting to implement in ARROW-7001) is to change the cardinality of Fragment ->* ScanTask to Fragment -> ScanTask (i.e. one scan task per fragment instead of many).
>
> The IPC format and CSV format already do this (one scan task per fragment). The only exception is Parquet, which maps "scan task" to "row group" (keeping in mind row groups may correspond to multiple batches). However, that feels like a detail that can be encapsulated in ParquetScanTask (I can implement this in https://issues.apache.org/jira/browse/ARROW-11843). In other words...
>
> The scanner is responsible for managing inter-fragment parallelism (how many files to read at once, pipelining file reads, etc.)
> The scan task is responsible for managing intra-fragment parallelism (how many row groups to read at once, whether to scan columns in parallel, etc.)
>
> Then, scan task can be made fully internal (a la ARROW-11782) and the primary external interface would be a record batch iterator.
>
> This doesn't just simplify the external interface by removing a type, it actually changes the workflow requirements as well (admittedly, some of this is an inevitable benefit of ARROW-7001 and not directly related to removing scan task). Currently, if you want maximum performance from a dataset scan, you need to run the scan tasks in parallel. For example...
>
> for scan_task in scanner.scan():
>     for record_batch in scan_task:
>         # Do something, but do it very fast or do it on another thread,
>         # because every ms you spend here is a ms you could be doing I/O
>
> With the simplification it should simply be...
>
> for record_batch in scanner.scan():
>     # While you are processing this record batch the scanner is going to
>     # continue running on a different thread. It will be queuing up a
>     # backlog of batches for you to process. As long as you don't take
>     # "too long" you should be able to keep up. In other words, as long
>     # as your processing time here + the time it took to decode and
>     # prepare the batch is less than the time it takes to read the batch,
>     # you will never have a break in I/O.
>
> -Weston
