alamb commented on PR #20481: URL: https://github.com/apache/datafusion/pull/20481#issuecomment-4013275682
Sorry, I did not make much progress on this today, as I have been tied up finishing ClickBench Q29 so I can focus on helping this project through. I am now basically done with Q29 (https://github.com/apache/datafusion/pull/20749), so I will now switch focus to this one. Unfortunately my battery is almost dead and this weekend is packed with family stuff. I will try to work on it, but may not have a chance until Monday.

@adriangb

> One key requirement in my mind is being able to decouple IO concurrency from CPU work. It's well understood that when talking to object stores the way to get good throughput is to do a lot of concurrent work ...

100% agree -- the ideal outcome is something that can be tuned via APIs / settings for different scenarios (e.g. object store vs. SSD). In fact I think a well-tuned system should be able to keep the CPU busy almost *ALL* of the time (what this PR does makes it much better, but there is more):

<img width="981" height="361" alt="559238168-6b0c7748-fd50-4a86-8d3b-62c79d59da6d" src="https://github.com/user-attachments/assets/7ca0b985-a3f5-4df8-812c-338c592ddfa0" />

> The idea is to let the format define what its morsels are. For Parquet it may be one or more row groups. For CSV it could be byte offsets in the file.
> Then we have a pipeline of `PartitionedFile -> Vec<FileMorsel>` and `FileMorsel -> Vec<RecordBatch>`, giving us multiple places to control concurrency and pre-fetch depth.

I like this idea.

> That doesn't map as cleanly to your breakdown above (in particular everything is mixed IO / CPU, it's not clear where to isolate IO-only operations that can be ramped up with heavy concurrency).
I was imagining something like

```text
PartitionedFile --> Stream<FileMorsel> -> Stream<ParquetReadPlan> -> Stream<RecordBatch>
```

Where the `Stream<FileMorsel>` is the place where we can control IO / buffering -- with something like `futures::stream::buffered`. `Stream<ParquetReadPlan>` gives us a similar place to break up work from a single I/O (aka a morsel) into multiple CPU jobs.

I haven't quite sketched out what those buffered Streams would look like, but I think you could then model the work stealing as connecting the related Streams across partitions.

> NUMA aspect of this: Apple just the other day announced chiplet-based M5 processors; is compute locality going to be an even more important factor going forward? How can we achieve that with these sorts of designs? My best idea is to have some sort of "home base" pipeline: each CPU thread (must be pinned) checks its "home" pipeline (which has a prefetch buffer and thus should often have work) and only if empty falls back to stealing work.

I think the key is to use work stealing -- so the CPU-bound tasks normally create the job description (the `ParquetReadPlan`) and then also execute them on the same tasks (cores). Only if another core runs out of useful stuff to do with its own (in-cache) pipeline does it try to load data from the others.

> Sorting. We are working on optimizations to emit data from scans either in a mostly sorted or fully sorted manner. I guess in the latter case we essentially cannot do work stealing, it would break the ordering, but we should still be able to share the same codepaths (a pipeline per CPU thread/partition instead of a shared pipeline with work stealing).

Agreed

> In the former case of inexact / best-effort ordering we could probably just order file and morsel opens to be in order, even if the output order is not strictly guaranteed.

Agreed
