I just wrote up a ticket about a general-purpose multi-consumer
scheduler API. Do you think this could be the beginning of a
resolution?

https://issues.apache.org/jira/browse/ARROW-8667

We may also want to design in some affordances so that no consumer is
ever 100% blocked, even if that causes temporary thread
oversubscription (e.g. if we have a 16-thread underlying thread pool,
and they're all being used by one consumer, then a new consumer shows
up, we spawn a 17th thread to accommodate the new consumer
temporarily until some of the other tasks finish, at which point we
drop down to 16 threads again, but share them fairly).
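
To make the idea concrete, here is a toy Python model of that
admission policy. It only does the bookkeeping with plain counters (no
real threads), and the names (`FairScheduler`, `submit`, `finish`) are
made up for illustration; this is not an existing Arrow API and not
necessarily what the ticket will end up specifying.

```python
from collections import defaultdict, deque


class FairScheduler:
    """Toy model of a shared pool: counts slots, does not run real threads."""

    def __init__(self, capacity):
        self.capacity = capacity            # nominal pool size, e.g. 16
        self.running = defaultdict(int)     # consumer -> tasks currently running
        self.pending = defaultdict(deque)   # consumer -> queued tasks

    def total_running(self):
        return sum(self.running.values())

    def submit(self, consumer, task):
        self.pending[consumer].append(task)
        self._dispatch()

    def finish(self, consumer):
        """Report that one of `consumer`'s running tasks has completed."""
        self.running[consumer] -= 1
        self._dispatch()

    def _dispatch(self):
        while True:
            waiting = [c for c, q in self.pending.items() if q]
            if not waiting:
                return
            # Fairness: serve the waiting consumer with the fewest running tasks.
            consumer = min(waiting, key=lambda c: self.running[c])
            starved = self.running[consumer] == 0
            # Oversubscribe only to unblock a consumer that has nothing running.
            if self.total_running() < self.capacity or starved:
                self.pending[consumer].popleft()
                self.running[consumer] += 1
            else:
                return


sched = FairScheduler(capacity=4)
for i in range(10):
    sched.submit("A", i)        # A takes all 4 slots, 6 tasks stay queued
for i in range(3):
    sched.submit("B", i)        # B is starved, so one task runs as a 5th slot
print(dict(sched.running))      # {'A': 4, 'B': 1}

sched.finish("A")               # back to 4 running, no new task admitted yet
sched.finish("A")               # a slot frees up and goes to B, not A
print(dict(sched.running))      # {'A': 2, 'B': 2}
```

In this sketch the extra slot is never handed out twice to the same
consumer, so the oversubscription is bounded by the number of
consumers, and freed slots go to whoever has the fewest running tasks
until the shares even out.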

I haven't read the whole e-mail discussion but I will read more of it
and make some other comments.

On Thu, Apr 30, 2020 at 3:17 PM David Li <li.david...@gmail.com> wrote:
>
> Francois,
>
> Thanks for the pointers. I'll see if I can put together a
> proof-of-concept; might that help the discussion? I agree it would be good
> to make it format-agnostic. I'm also curious what thoughts you'd have
> on how to manage cross-file parallelism (coalescing only helps within
> a file). If we just naively start scanning fragments in parallel, we'd
> still want some way to help ensure the actual reads get issued roughly
> in order of file (to avoid the problem discussed above, where reads
> for file B prevent reads for file A from getting scheduled, where B
> follows A from the consumer's standpoint).
>
> Antoine,
>
> We would be interested in that as well. One thing we do want to
> investigate is a better ReadAsync() implementation for S3File as
> preliminary benchmarking on our side has shown it's quite inefficient
> (the default implementation makes lots of memcpy()s).
>
> Thanks,
> David
>
> On 4/30/20, Antoine Pitrou <anto...@python.org> wrote:
> >
> > If we want to discuss IO APIs we should do that comprehensively.
> > There are various ways of expressing what we want to do (explicit
> > readahead, fadvise-like APIs, async APIs, etc.).
> >
> > Regards
> >
> > Antoine.
> >
> >
> > Le 30/04/2020 à 15:08, Francois Saint-Jacques a écrit :
> >> One more point,
> >>
> >> It would seem beneficial if we could express this in a
> >> `RandomAccessFile::ReadAhead(vector<ReadRange>)` method: no async
> >> buffering/coalescing would be needed. In the case of Parquet, we'd get
> >> the _exact_ ranges computed from the metadata. This method would also
> >> possibly benefit other filesystems since on Linux it can call
> >> `readahead` and/or `madvise`.
> >>
> >> François
> >>
> >>
> >> On Thu, Apr 30, 2020 at 8:56 AM Francois Saint-Jacques
> >> <fsaintjacq...@gmail.com> wrote:
> >>>
> >>> Hello David,
> >>>
> >>> I think that what you ask is achievable with the dataset API without
> >>> much effort. You'd have to insert the pre-buffering at
> >>> ParquetFileFormat::ScanFile [1]. The top-level Scanner::Scan method is
> >>> essentially a generator that looks like
> >>> flatmap(Iterator<Fragment<Iterator<ScanTask>>>). It consumes the
> >>> fragments in order. The application consuming the ScanTask could
> >>> control the number of scheduled tasks by looking at the IO pool load.
> >>>
> >>> OTOH, it would be good if we could make this format-agnostic, e.g.
> >>> offer it via a ScanOptions toggle such as "readahead_files" that
> >>> would be applicable to all formats: CSV, IPC, ...
> >>>
> >>> François
> >>> [1]
> >>> https://github.com/apache/arrow/blob/master/cpp/src/arrow/dataset/file_parquet.cc#L383-L401
> >>>
> >>> On Thu, Apr 30, 2020 at 8:20 AM David Li <li.david...@gmail.com> wrote:
> >>>>
> >>>> Sure, and we are still interested in collaborating. The main use case
> >>>> we have is scanning datasets in order of the partition key; it seems
> >>>> ordering is the only missing thing from Antoine's comments. However,
> >>>> from briefly playing around with the Python API, an application could
> >>>> manually order the fragments if so desired, so that still works for
> >>>> us, even if ordering isn't otherwise a guarantee.
> >>>>
> >>>> Performance-wise, we would want intra-file concurrency (coalescing)
> >>>> and inter-file concurrency (buffering files in order, as described in
> >>>> my previous messages). Even if Datasets doesn't directly handle this,
> >>>> it'd be ideal if an application could achieve this if it were willing
> >>>> to manage the details. I also vaguely remember seeing some interest in
> >>>> things like being able to distribute a computation over a dataset via
> >>>> Dask or some other distributed computation system, which would also be
> >>>> interesting to us, though not a concrete requirement.
> >>>>
> >>>> I'd like to reference the original proposal document, which has more
> >>>> detail on our workloads and use cases:
> >>>> https://docs.google.com/document/d/1tZsT3dC7UXbLTkqxgVeFGWm9piXScUDujsa0ncvK_Fs/edit
> >>>> As described there, we have a library that implements both a
> >>>> datasets-like API (hand it a remote directory, get back an Arrow
> >>>> Table) and several optimizations to make that library perform
> >>>> acceptably. Our motivation here is to be able to have a path to
> >>>> migrate to using and contributing to Arrow Datasets, which we see as a
> >>>> cross-language, cross-filesystem library, without regressing in
> >>>> performance. (We are limited to Python and S3.)
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On 4/29/20, Wes McKinney <wesmck...@gmail.com> wrote:
> >>>>> On Wed, Apr 29, 2020 at 6:54 PM David Li <li.david...@gmail.com> wrote:
> >>>>>>
> >>>>>> Ah, sorry, so I am being somewhat unclear here. Yes, you aren't
> >>>>>> guaranteed to download all the files in order, but with more control,
> >>>>>> you can make this more likely. You can also prevent the case where,
> >>>>>> due to scheduling, file N+1 doesn't even start downloading until after
> >>>>>> file N+2, which can happen if you just submit all reads to a thread
> >>>>>> pool, as demonstrated in the linked trace.
> >>>>>>
> >>>>>> And again, with this level of control, you can also decide to reduce
> >>>>>> or increase parallelism based on network conditions, memory usage,
> >>>>>> other readers, etc. So it is both about improving/smoothing out
> >>>>>> performance, and limiting resource consumption.
> >>>>>>
> >>>>>> Finally, I do not mean to propose that we necessarily build all of
> >>>>>> this into Arrow, just that we would like to make it possible to
> >>>>>> build this with Arrow, and that Datasets may find this interesting for
> >>>>>> its optimization purposes, if concurrent reads are a goal.
> >>>>>>
> >>>>>>>  Except that datasets are essentially unordered.
> >>>>>>
> >>>>>> I did not realize this, but that means it's not really suitable for
> >>>>>> our use case, unfortunately.
> >>>>>
> >>>>> It would be helpful to understand things a bit better so that we do
> >>>>> not miss out on an opportunity to collaborate. I don't know that the
> >>>>> current mode of some of the public Datasets APIs is a dogmatic
> >>>>> view about how everything should always work, and it's possible that
> >>>>> some relatively minor changes could allow you to use it. So let's try
> >>>>> not to close any doors right now.
> >>>>>
> >>>>>> Thanks,
> >>>>>> David
> >>>>>>
> >>>>>> On 4/29/20, Antoine Pitrou <anto...@python.org> wrote:
> >>>>>>>
> >>>>>>> Le 29/04/2020 à 23:30, David Li a écrit :
> >>>>>>>> Sure -
> >>>>>>>>
> >>>>>>>> The use case is to read a large partitioned dataset, consisting of
> >>>>>>>> tens or hundreds of Parquet files. A reader expects to scan through
> >>>>>>>> the data in order of the partition key. However, to improve
> >>>>>>>> performance, we'd like to begin loading files N+1, N+2, ..., N+k
> >>>>>>>> while the consumer is still reading file N, so that it doesn't have to
> >>>>>>>> wait every time it opens a new file, and to help hide any latency or
> >>>>>>>> slowness that might be happening on the backend. We also don't want to
> >>>>>>>> be in a situation where file N+2 is ready but file N+1 isn't, because
> >>>>>>>> that doesn't help us (we still have to wait for N+1 to load).
> >>>>>>>
> >>>>>>> But depending on network conditions, you may very well get file N+2
> >>>>>>> before N+1, even if you start loading it after...
> >>>>>>>
> >>>>>>>> This is why I mention the project is quite similar to the Datasets
> >>>>>>>> project - Datasets likely covers all the functionality we would
> >>>>>>>> eventually need.
> >>>>>>>
> >>>>>>> Except that datasets are essentially unordered.
> >>>>>>>
> >>>>>>> Regards
> >>>>>>>
> >>>>>>> Antoine.
> >>>>>>>
> >>>>>
> >
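
For reference, the in-order readahead described in the quoted thread
(keep files N+1 ... N+k loading while the consumer reads file N, and
always hand files back in order) could be sketched at the application
level roughly as below. This is plain Python with a hypothetical
`read_in_order` helper and a caller-supplied `load` function, not an
Arrow Datasets API. It relies on `ThreadPoolExecutor` starting queued
work in submission order, so reads for a later file are not started
ahead of reads for an earlier one.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def read_in_order(paths, load, k=2, max_workers=4):
    """Yield load(path) for each path, in order, prefetching up to k files ahead."""
    paths = iter(paths)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Start with the first k files in flight.
        window = deque(pool.submit(load, p) for p in islice(paths, k))
        while True:
            # Top up the window with at most one more file, in file order.
            for p in islice(paths, 1):
                window.append(pool.submit(load, p))
            if not window:
                return
            # Block on the oldest file only; later files keep loading meanwhile.
            yield window.popleft().result()


def fake_load(path):
    # Stand-in for an S3 download plus Parquet decode.
    return f"table({path})"


files = ["p=0.parquet", "p=1.parquet", "p=2.parquet", "p=3.parquet"]
for table in read_in_order(files, fake_load, k=2):
    print(table)
```

A later file can still finish downloading before an earlier one, as
Antoine points out, but the consumer never sees it early, and the
window size `k` bounds how much memory and how many requests are
outstanding at once.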
