Hi all, Thanks to Antoine for implementing the core read coalescing logic.
We've taken a look at what else needs to be done to get this working, and it sounds like the following changes would be worthwhile, independent of the rest of the optimizations we discussed: - Add benchmarks of the current Parquet reader with the current S3File (and other file implementations) so we can track improvements/regressions - Use the coalescing inside the Parquet reader (even without a column filter hint - this would subsume PARQUET-1698) - In coalescing, split large read ranges into smaller ones (this would further improve on PARQUET-1698 by taking advantage of parallel reads) - Accept a column filter hint in the Parquet reader and use that to compute read ranges Does this sound reasonable? Thanks, David On 2/6/20, David Li <li.david...@gmail.com> wrote: > Catching up on questions here... > >> Typically you can solve this by having enough IO concurrency at once :-) >> I'm not sure having sophisticated global coordination (based on which >> algorithms) would bring anything. Would you care to elaborate? > > We aren't proposing *sophisticated* global coordination, rather, just > using a global pool with a global limit, so that a user doesn't > unintentionally start hundreds of requests in parallel, and so that > you can adjust the resource consumption/performance tradeoff. > > Essentially, what our library does is maintain two pools (for I/O): > - One pool produces I/O requests, by going through the list of files, > fetching the Parquet footers, and queuing up I/O requests on the main > pool. (This uses a pool so we can fetch and parse metadata from > multiple Parquet files at once.) > - One pool serves I/O requests, by fetching chunks and placing them in > buffers inside the file object implementation. > > The global concurrency manager additionally limits the second pool by > not servicing I/O requests for a file until all of the I/O requests > for previous files have at least started. (By just having lots of > concurrency, you might end up starving yourself by reading data you > don't want quite yet.) > > Additionally, the global pool could still be a win for non-Parquet > files - an implementation can at least submit, say, an entire CSV file > as a "chunk" and have it read in the background. > >> Actually, on a more high-level basis, is the goal to prefetch for >> sequential consumption of row groups? > > At least for us, our query pattern is to sequentially consume row > groups from a large dataset, where we select a subset of columns and a > subset of the partition key range (usually time range). Prefetching > speeds this up substantially, or in general, pipelining discovery of > files, I/O, and deserialization. > >> There are no situations where you would want to consume a scattered >> subset of row groups (e.g. predicate pushdown)? > > With coalescing, this "automatically" gets optimized. If you happen to > need column chunks from separate row groups that are adjacent or close > on-disk, coalescing will still fetch them in a single IO call. > > We found that having large row groups was more beneficial than small > row groups, since when you combine small row groups with column > selection, you end up with a lot of small non-adjacent column chunks - > which coalescing can't help with. The exact tradeoff depends on the > dataset and workload, of course. > >> This seems like too much to try to build into RandomAccessFile. I would >> suggest a class that wraps a random access file and manages cached >> segments >> and their lifetimes through explicit APIs. > > A wrapper class seems ideal, especially as the logic is agnostic to > the storage backend (except for some parameters which can either be > hand-tuned or estimated on the fly). It also keeps the scope of the > changes down. > >> Where to put the "async multiple range request" API is a separate >> question, >> though. Probably makes sense to start writing some working code and sort >> it >> out there. > > We haven't looked in this direction much. Our designs are based around > thread pools partly because we wanted to avoid modifying the Parquet > and Arrow internals, instead choosing to modify the I/O layer to "keep > Parquet fed" as quickly as possible. > > Overall, I recall there's an issue open for async APIs in > Arrow...perhaps we want to move that to a separate discussion, or on > the contrary, explore some experimental APIs here to inform the > overall design. > > Thanks, > David > > On 2/6/20, Wes McKinney <wesmck...@gmail.com> wrote: >> On Thu, Feb 6, 2020 at 1:30 PM Antoine Pitrou <anto...@python.org> wrote: >>> >>> >>> Le 06/02/2020 à 20:20, Wes McKinney a écrit : >>> >> Actually, on a more high-level basis, is the goal to prefetch for >>> >> sequential consumption of row groups? >>> >> >>> > >>> > Essentially yes. One "easy" optimization is to prefetch the entire >>> > serialized row group. This is an evolution of that idea where we want >>> > to >>> > prefetch only the needed parts of a row group in a minimum number of >>> > IO >>> > calls (consider reading the first 10 columns from a file with 1000 >>> > columns >>> > -- so we want to do one IO call instead of 10 like we do now). >>> >>> There are no situations where you would want to consume a scattered >>> subset of row groups (e.g. predicate pushdown)? >> >> There are. If it can be demonstrated that there are performance gains >> resulting from IO optimizations involving multiple row groups then I >> see no reason not to implement them. >> >>> Regards >>> >>> Antoine. >> >