@david - I think all the readers (CSV, IPC, & parquet) will eventually
have support for some intra-file parallelism.  You're also right I
think that there is some global consideration of max concurrent
operations.  For example, if you readahead 10 files and 10 blocks in
each file but only have 16 cores then how will you divy up the work?
In some graphs it doesn't matter (e.g. building a table in memory).
In other graphs it might.  A query graph with reduce nodes (or maybe
the right term is "blocking nodes") might be collecting the first
batch from all files.  In that case a breadth-first prioritization
might be better than depth-first. Off the cuff my answer would be that
backpressure applies the appropriate prioritization.  Although I have
a hunch that long term the scanning node will need to be more formally
integrated with the query engine (i.e. read file and read dataset are
two different nodes).

@Wes Yes, I have been aiming to keep all tasks for a block on a single
core (as part of the same thread task even).  I haven't gotten to the
work stealing part but that can be added later.


On Fri, Mar 26, 2021 at 7:22 AM Wes McKinney <[email protected]> wrote:
>
> I agree with making the decomposition of a fragment into tasks an
> internal detail of the scan implementation. It seems that we want to
> be moving toward a world of consuming a stream of
> Future<shared_ptr<RecordBatch>> and not pushing the complexity of
> concurrency management (necessarily) onto the consumer. The nature of
> multithreading/scheduling would be pushed higher in the stack -- for
> example, you might decide that a fragment and all its child parallel /
> nested tasks could go into the task queue of a single CPU core, where
> idle CPUs are able to steal work from that queue if they want.
>
> On Fri, Mar 26, 2021 at 11:32 AM David Li <[email protected]> wrote:
> >
> > I agree we should present a simplified interface, and then also make 
> > ScanTask internal, but I think that is orthogonal to whether a fragment 
> > produces one or multiple scan tasks.
> >
> > At first, my worry with having (Parquet)ScanTask handle concurrency itself 
> > was that it does need to coordinate with the overall scanner, right? If you 
> > have two files with 100 row groups each, that's much different than 100 
> > files with two row groups each. With a scan task per row group, a single 
> > rea naturally handles both cases, but with a single scan task per file, you 
> > have to juggle the exact amount of readahead on an inter- and intra-file 
> > level.
> >
> > That said, there is an issue for making readahead operate by amount of 
> > memory used instead of number of files/tasks which would presumably handle 
> > that just as well. And right now, one (Parquet)ScanTask-per-row group does 
> > lead to some implementation nuisance elsewhere (since all scan tasks for a 
> > file have to share the same Parquet reader and pre-buffering task).
> >
> > Also I realize my example is poor, because you do actually want to separate 
> > intra- and inter-fragment concurrency - you want to at least be buffering 
> > the next files (without decoding them) while decoding the current file. And 
> > the proposed model would make it easier to support a consumer that can 
> > process batches out of order while limiting memory usage (just limit the 
> > inter-scan-task readahead).
> >
> > So on balance I'm in favor of this.
> >
> > I'll also note that there could be other Fragments which may naturally have 
> > intra-fragment parallelism, if the concern is mostly that ParquetScanTask 
> > is a bit of an outlier. For instance, a hypothetical FlightFragment 
> > wrapping a FlightInfo struct could generate multiple scan tasks, one per 
> > FlightEndpoint in the FlightInfo.
> >
> > Best,
> > David
> >
> > On Thu, Mar 25, 2021, at 19:48, Weston Pace wrote:
> > > This is a bit of a follow-up on
> > > https://issues.apache.org/jira/browse/ARROW-11782 and also a bit of a
> > > consequence of my work on
> > > https://issues.apache.org/jira/browse/ARROW-7001 (nested scan
> > > parallelism).
> > >
> > > I think the current dataset interface should be simplified.
> > > Currently, we have Dataset ->* Fragment ->* ScanTask ->* RecordBatch
> > > with the components being...
> > >
> > > Dataset - Binds together a format & fragment discovery
> > > Fragment - Something that maps to an input stream (usually a file)
> > > ScanTask - Created by a format, turns an input stream into record batches.
> > > RecordBatch - I hope I don't need to define this one :)
> > >
> > > The change I'm recommending (and starting to implement in ARROW-7001)
> > > is to change the cardinality of Fragment ->* ScanTask to Fragment ->
> > > ScanTask (i.e. one scan task per fragment instead of many).
> > >
> > > The IPC format and CSV format already do this (one scan task per
> > > fragment).  The only exception is Parquet which maps "scan task" to
> > > "row group" (keeping in mind row groups may correspond to multiple
> > > batches).  However, that feels like it is a detail that can be
> > > encapsulated in ParquetScanTask (I can implement this in
> > > https://issues.apache.org/jira/browse/ARROW-11843).  In other words...
> > >
> > > The scanner is responsible for managing inter-fragment parallelism
> > > (how many files to read at once, pipelining file reads, etc.)
> > > The scan task is responsible for managing intra-fragment parallelism
> > > (how many row groups to read at once, whether to scan columns in
> > > parallel, etc)
> > >
> > > Then, scan task can be made fully internal (ala ARROW-11782) and the
> > > primary external interface would be a record batch iterator.
> > >
> > > This doesn't just simplify the external interface by removing a type,
> > > it actually changes the workflow requirements as well (admittedly,
> > > some of this is an inevitable benefit of ARROW-7001 and not directly
> > > related to removing scan task).  Currently, if you want maximum
> > > performance from a dataset scan, you need to run the scan tasks in
> > > parallel.  For example...
> > >
> > > for scan_task in scanner.scan():
> > >   for record_batch in scan_task:
> > >     # Do something, but do it very fast or do it on another thread
> > > because every ms
> > >     # you spend here is a ms you could be doing I/O
> > >
> > > With the simplification it should simply be...
> > >
> > > for record_batch in scanner.scan():
> > >   # While you are processing this record batch the scanner is going to 
> > > continue
> > >   # running on a different thread.  It will be queing up a backlog of
> > > batches for you
> > >   # to process.  As long as you don't take "too long" you should be
> > > able to keep up.
> > >   # In other words, as long as your processing time here + the time it 
> > > took to
> > >   # decode and prepare the batch is less than the time it takes to
> > > read the batch
> > >   # you will never have a break in I/O.
> > >
> > > -Weston
> > >

Reply via email to