I just wrote up a ticket about a general-purpose multi-consumer
scheduler API: https://issues.apache.org/jira/browse/ARROW-8667. Do
you think this could be the beginning of a resolution?

We may also want to design in some affordances so that no consumer is
ever 100% blocked, even if that causes temporary thread
oversubscription. For example, if we have a 16-thread underlying
thread pool and all of its threads are being used by one consumer when
a new consumer shows up, we spawn a 17th thread to accommodate the new
consumer temporarily; once some of the other tasks finish, we drop
back down to 16 threads, but share them fairly.
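Roughly the behavior I have in mind, as a sketch only (the class and
all names below are illustrative, not a proposed API; a real
implementation would also retire threads above the soft limit once
load drops, which is omitted here):

#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Sketch: a toy scheduler that shares a soft-limited pool fairly
// across consumers, but temporarily oversubscribes so that a newly
// arrived consumer is never 100% blocked.
class FairScheduler {
 public:
  explicit FairScheduler(size_t soft_limit) : soft_limit_(soft_limit) {}

  ~FairScheduler() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto& t : threads_) t.join();
  }

  void Submit(int consumer, std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queues_[consumer].push_back(std::move(task));
      // Grow the pool when no worker is idle: freely below the soft
      // limit, and past it (the "17th thread" case) only when this
      // consumer has nothing running at all.
      if (idle_ == 0 && (threads_.size() < soft_limit_ ||
                         running_.count(consumer) == 0)) {
        threads_.emplace_back([this] { Work(); });
      }
    }
    cv_.notify_one();
  }

 private:
  void Work() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (true) {
      ++idle_;
      cv_.wait(lock, [this] { return done_ || !queues_.empty(); });
      --idle_;
      if (queues_.empty()) return;  // only reachable once done_ is set
      // Round-robin across consumers so one consumer cannot starve
      // the others.
      auto it = queues_.upper_bound(last_consumer_);
      if (it == queues_.end()) it = queues_.begin();
      const int consumer = it->first;
      last_consumer_ = consumer;
      std::function<void()> task = std::move(it->second.front());
      it->second.pop_front();
      if (it->second.empty()) queues_.erase(it);
      ++running_[consumer];
      lock.unlock();
      task();
      lock.lock();
      if (--running_[consumer] == 0) running_.erase(consumer);
    }
  }

  size_t soft_limit_;
  size_t idle_ = 0;
  int last_consumer_ = -1;
  bool done_ = false;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::map<int, std::deque<std::function<void()>>> queues_;  // pending
  std::map<int, int> running_;  // per-consumer tasks currently executing
  std::vector<std::thread> threads_;
};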
I haven't read the whole e-mail discussion but I will read more of it
and make some other comments.

On Thu, Apr 30, 2020 at 3:17 PM David Li <li.david...@gmail.com> wrote:
>
> Francois,
>
> Thanks for the pointers. I'll see if I can put together a
> proof-of-concept; would that help the discussion? I agree it would
> be good to make it format-agnostic. I'm also curious what thoughts
> you'd have on how to manage cross-file parallelism (coalescing only
> helps within a file). If we just naively start scanning fragments in
> parallel, we'd still want some way to help ensure the actual reads
> get issued roughly in file order (to avoid the problem discussed
> above, where reads for file B prevent reads for file A from getting
> scheduled, even though B follows A from the consumer's standpoint).
>
> Antoine,
>
> We would be interested in that as well. One thing we do want to
> investigate is a better ReadAsync() implementation for S3File, as
> preliminary benchmarking on our side has shown it's quite
> inefficient (the default implementation makes lots of memcpy()s).
>
> Thanks,
> David
>
> On 4/30/20, Antoine Pitrou <anto...@python.org> wrote:
> >
> > If we want to discuss IO APIs we should do that comprehensively.
> > There are various ways of expressing what we want to do (explicit
> > readahead, fadvise-like APIs, async APIs, etc.).
> >
> > Regards
> >
> > Antoine.
> >
> >
> > On 30/04/2020 at 15:08, Francois Saint-Jacques wrote:
> >> One more point:
> >>
> >> It would seem beneficial if we could express this in a
> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> >> buffering/coalescing would be needed. In the case of Parquet,
> >> we'd get the _exact_ ranges computed from the metadata. This
> >> method would also possibly benefit other filesystems, since on
> >> Linux it can call `readahead` and/or `madvise`.
> >>
> >> François
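A quick side comment on the hook above: for local files it could be
close to a no-op to implement. A minimal sketch, assuming a POSIX file
descriptor (the ReadRange shape comes from François's message; the
class and everything else here are illustrative):

#include <fcntl.h>

#include <cstdint>
#include <vector>

// Illustrative stand-ins, not Arrow's actual classes.
struct ReadRange {
  int64_t offset;
  int64_t length;
};

class LocalRandomAccessFile {
 public:
  explicit LocalRandomAccessFile(int fd) : fd_(fd) {}

  // Purely advisory: tell the kernel which byte ranges we will read
  // soon (e.g. the exact ranges computed from the Parquet metadata),
  // so it can start readahead with no user-level buffering or
  // coalescing.
  void ReadAhead(const std::vector<ReadRange>& ranges) {
    for (const auto& r : ranges) {
      posix_fadvise(fd_, static_cast<off_t>(r.offset),
                    static_cast<off_t>(r.length), POSIX_FADV_WILLNEED);
    }
  }

 private:
  int fd_;
};

An object-store implementation would instead kick off the GETs
asynchronously, but the caller-facing contract could stay the same.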
> >>
> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques <fsaintjacq...@gmail.com> wrote:
> >>>
> >>> Hello David,
> >>>
> >>> I think that what you ask is achievable with the dataset API
> >>> without much effort. You'd have to insert the pre-buffering at
> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan
> >>> method is essentially a generator that looks like
> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
> >>> fragments in order. The application consuming the ScanTasks
> >>> could control the number of scheduled tasks by looking at the IO
> >>> pool load.
> >>>
> >>> OTOH, it would be good if we could make this format-agnostic,
> >>> e.g. offer it via a ScanOptions toggle such as "readahead_files",
> >>> which would be applicable to all formats: CSV, IPC, ...
> >>>
> >>> François
> >>> [1] https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
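Such a "readahead_files" toggle could be little more than a wrapper
around the in-order fragment iterator. A minimal sketch, assuming a
per-file scan callback (all names below are made up; this is not the
actual Datasets API):

#include <deque>
#include <functional>
#include <future>
#include <vector>

// Sketch: keep up to `readahead` extra files' scans in flight while
// the consumer drains results strictly in file order.
template <typename Batch>
class OrderedFileReadahead {
 public:
  using ScanFileFn = std::function<std::vector<Batch>(size_t file_index)>;

  OrderedFileReadahead(size_t num_files, size_t readahead, ScanFileFn scan)
      : num_files_(num_files), readahead_(readahead), scan_(std::move(scan)) {}

  bool HasNext() const {
    return next_to_launch_ < num_files_ || !in_flight_.empty();
  }

  // Returns the next file's batches in order, launching scans for the
  // following files in the background before blocking on this one.
  std::vector<Batch> NextFile() {
    while (next_to_launch_ < num_files_ &&
           in_flight_.size() < readahead_ + 1) {
      in_flight_.push_back(
          std::async(std::launch::async, scan_, next_to_launch_++));
    }
    std::future<std::vector<Batch>> fut = std::move(in_flight_.front());
    in_flight_.pop_front();
    return fut.get();  // blocks only if this file itself isn't ready yet
  }

 private:
  size_t num_files_;
  size_t readahead_;
  size_t next_to_launch_ = 0;
  ScanFileFn scan_;
  std::deque<std::future<std::vector<Batch>>> in_flight_;
};

The consumer still sees files strictly in order; the toggle only
controls how many scans run behind the cursor, which also bounds
memory use.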
> >>>
> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li.david...@gmail.com> wrote:
> >>>>
> >>>> Sure, and we are still interested in collaborating. The main
> >>>> use case we have is scanning datasets in order of the partition
> >>>> key; it seems ordering is the only missing piece, going by
> >>>> Antoine's comments. However, from briefly playing around with
> >>>> the Python API, an application could manually order the
> >>>> fragments if so desired, so that still works for us, even if
> >>>> ordering isn't otherwise a guarantee.
> >>>>
> >>>> Performance-wise, we would want intra-file concurrency
> >>>> (coalescing) and inter-file concurrency (buffering files in
> >>>> order, as described in my previous messages). Even if Datasets
> >>>> doesn't directly handle this, it'd be ideal if an application
> >>>> could achieve it if it were willing to manage the details. I
> >>>> also vaguely remember seeing some interest in things like being
> >>>> able to distribute a computation over a dataset via Dask or
> >>>> some other distributed computation system, which would also be
> >>>> interesting to us, though not a concrete requirement.
> >>>>
> >>>> I'd like to reference the original proposal document, which has
> >>>> more detail on our workloads and use cases:
> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> >>>> As described there, we have a library that implements both a
> >>>> datasets-like API (hand it a remote directory, get back an
> >>>> Arrow Table) and several optimizations to make that library
> >>>> perform acceptably. Our motivation here is to have a path to
> >>>> using and contributing to Arrow Datasets, which we see as a
> >>>> cross-language, cross-filesystem library, without regressing in
> >>>> performance. (We are limited to Python and S3.)
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On 4/29/20, Wes McKinney <wesmck...@gmail.com> wrote:
> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li.david...@gmail.com> wrote:
> >>>>>>
> >>>>>> Ah, sorry, I am being somewhat unclear here. Yes, you aren't
> >>>>>> guaranteed to download all the files in order, but with more
> >>>>>> control, you can make ordered completion more likely. You can
> >>>>>> also prevent the case where, due to scheduling, file N+1
> >>>>>> doesn't even start downloading until after file N+2, which
> >>>>>> can happen if you just submit all reads to a thread pool, as
> >>>>>> demonstrated in the linked trace.
> >>>>>>
> >>>>>> And again, with this level of control, you can also decide to
> >>>>>> reduce or increase parallelism based on network conditions,
> >>>>>> memory usage, other readers, etc. So it is both about
> >>>>>> improving/smoothing out performance and about limiting
> >>>>>> resource consumption.
> >>>>>>
> >>>>>> Finally, I do not mean to propose that we necessarily build
> >>>>>> all of this into Arrow, just that we would like to make it
> >>>>>> possible to build this with Arrow, and that Datasets may find
> >>>>>> this interesting for its own optimization purposes, if
> >>>>>> concurrent reads are a goal.
> >>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>
> >>>>>> I did not realize this, but that means it's not really
> >>>>>> suitable for our use case, unfortunately.
> >>>>>
> >>>>> It would be helpful to understand things a bit better so that
> >>>>> we do not miss out on an opportunity to collaborate. I don't
> >>>>> think the current shape of some of the public Datasets APIs
> >>>>> reflects a dogmatic view of how everything should always work,
> >>>>> and it's possible that some relatively minor changes could
> >>>>> allow you to use it. So let's try not to close any doors right
> >>>>> now.
> >>>>>
> >>>>>> Thanks,
> >>>>>> David
> >>>>>>
> >>>>>> On 4/29/20, Antoine Pitrou <anto...@python.org> wrote:
> >>>>>>>
> >>>>>>> On 29/04/2020 at 23:30, David Li wrote:
> >>>>>>>> Sure -
> >>>>>>>>
> >>>>>>>> The use case is to read a large partitioned dataset,
> >>>>>>>> consisting of tens or hundreds of Parquet files. A reader
> >>>>>>>> expects to scan through the data in order of the partition
> >>>>>>>> key. However, to improve performance, we'd like to begin
> >>>>>>>> loading files N+1, N+2, ..., N+k while the consumer is
> >>>>>>>> still reading file N, so that it doesn't have to wait every
> >>>>>>>> time it opens a new file, and to help hide any latency or
> >>>>>>>> slowness that might be happening on the backend. We also
> >>>>>>>> don't want to be in a situation where file N+2 is ready but
> >>>>>>>> file N+1 isn't, because that doesn't help us (we still have
> >>>>>>>> to wait for N+1 to load).
> >>>>>>>
> >>>>>>> But depending on network conditions, you may very well get
> >>>>>>> file N+2 before N+1, even if you start loading it after...
> >>>>>>>
> >>>>>>>> This is why I mention the project is quite similar to the
> >>>>>>>> Datasets project - Datasets likely covers all the
> >>>>>>>> functionality we would eventually need.
> >>>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>>
> >>>>>>> Antoine.
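One last concrete note on the intra-file side: the "coalescing" David
mentions boils down to merging byte ranges that sit close together
(e.g. adjacent Parquet column chunks) into fewer, larger reads, which
matters a lot on high-latency filesystems like S3. A minimal sketch,
with an illustrative hole-size threshold:

#include <algorithm>
#include <cstdint>
#include <vector>

// Illustrative stand-in for an IO read range.
struct ReadRange {
  int64_t offset;
  int64_t length;
};

// Merge ranges separated by at most `hole_size` bytes, so a single
// GET (or pread) serves several column chunks at the cost of reading
// and discarding the small holes in between.
std::vector<ReadRange> CoalesceRanges(std::vector<ReadRange> ranges,
                                      int64_t hole_size) {
  if (ranges.empty()) return ranges;
  std::sort(ranges.begin(), ranges.end(),
            [](const ReadRange& a, const ReadRange& b) {
              return a.offset < b.offset;
            });
  std::vector<ReadRange> out{ranges.front()};
  for (size_t i = 1; i < ranges.size(); ++i) {
    ReadRange& last = out.back();
    const int64_t last_end = last.offset + last.length;
    const int64_t cur_end = ranges[i].offset + ranges[i].length;
    if (ranges[i].offset - last_end <= hole_size) {
      last.length = std::max(last_end, cur_end) - last.offset;
    } else {
      out.push_back(ranges[i]);
    }
  }
  return out;
}

Reading and throwing away a small hole is usually cheaper than paying
another round trip for the range behind it; the right threshold
depends on the filesystem.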