alamb commented on PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#issuecomment-4013275682

   Sorry, I did not make much progress on this today, as I have been tied up 
wrapping up ClickBench Q29 so I can focus on helping this project through.
   
   I am now basically done with Q29 
   - https://github.com/apache/datafusion/pull/20749
   
   So will now switch focus to this one. Unfortunately my battery is almost 
dead and this weekend is packed with family stuff. I will try to work on it but 
may not have a chance until Monday
   
   @adriangb
   
   > One key requirement in my mind is being able to decouple IO concurrency 
from CPU work. It's well understood that when talking to object stores the way 
to get good throughput is to do a lot of concurrent work ...
   
   100% agree -- the ideal outcome is something that can be tuned via APIs / 
settings for different scenarios (e.g. object store vs SSD).
   
   In fact I think a well tuned system should be able to keep the CPU busy 
almost *ALL* of the time (what this PR does makes it much better, but there is 
more to do):
   
   <img width="981" height="361" 
alt="559238168-6b0c7748-fd50-4a86-8d3b-62c79d59da6d" 
src="https://github.com/user-attachments/assets/7ca0b985-a3f5-4df8-812c-338c592ddfa0" />
   
   > The idea is to let the format define what its morsels are. For Parquet it 
may be one or more row groups. For CSV it could be byte offsets in the file.
   > Then we have a pipeline of PartitionedFile -> Vec<FileMorsel> and 
FileMorsel -> Vec<RecordBatch> giving us multiple places to control concurrency 
and pre-fetch depth.
   
   I like this idea
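   
   A rough sketch of what "the format defines its morsels" could look like. 
Everything here is hypothetical (the trait, its methods, and `ParquetLike` are 
not real DataFusion APIs); it only illustrates the `PartitionedFile -> 
Vec<FileMorsel>` step where each format picks its own unit of IO:
   
   ```rust
   // Hypothetical trait: each file format defines its own morsel type and
   // how to split one file into morsels (units of IO).
   trait FileFormatMorsels {
       type Morsel;
       /// PartitionedFile -> Vec<FileMorsel>
       fn split(&self, file_path: &str, file_size: u64) -> Vec<Self::Morsel>;
   }
   
   struct ParquetLike;
   
   impl FileFormatMorsels for ParquetLike {
       // For Parquet a morsel might be one row group; here it is just a
       // (offset, length) byte range.
       type Morsel = (u64, u64);
       fn split(&self, _file_path: &str, file_size: u64) -> Vec<Self::Morsel> {
           // Pretend every 1 MiB is one "row group".
           const RG: u64 = 1 << 20;
           (0..file_size)
               .step_by(RG as usize)
               .map(|o| (o, RG.min(file_size - o)))
               .collect()
       }
   }
   
   fn main() {
       let morsels = ParquetLike.split("part-0.parquet", 3 << 20);
       assert_eq!(morsels.len(), 3);
       assert_eq!(morsels[0], (0, 1 << 20));
   }
   ```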
   
   >  That doesn't map as cleanly to your breakdown above (in particular 
everything is mixed IO / CPU, it's not clear where to isolate IO-only 
operations that can be ramped up with heavy concurrency).
   
   I was imagining something like
   
   ```text
   PartitionedFile -> Stream<FileMorsel> -> Stream<ParquetReadPlan> -> 
Stream<RecordBatch>
   ```
   
   Where the `Stream<FileMorsel>` is the place where we can control 
IO/buffering -- with something like `futures::stream::buffered`.
   
   `Stream<ParquetReadPlan>` gives us a similar place to break up the work from 
a single IO (aka a morsel) into multiple CPU jobs.
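   
   A minimal sketch of the buffering idea, using only the `futures` crate. 
`FileMorsel` and `fetch_morsel` are stand-ins (not real DataFusion types); the 
point is that `buffered(4)` keeps up to 4 fetches in flight while preserving 
order, which is exactly the knob that decouples IO concurrency from CPU 
parallelism:
   
   ```rust
   use futures::executor::block_on;
   use futures::stream::{self, StreamExt};
   
   // Hypothetical morsel: a byte range within one file.
   struct FileMorsel {
       offset: u64,
       len: u64,
   }
   
   // Stand-in for an object-store range GET.
   async fn fetch_morsel(m: FileMorsel) -> Vec<u8> {
       vec![(m.offset % 251) as u8; m.len as usize]
   }
   
   fn main() {
       let morsels = (0..8u64).map(|i| FileMorsel { offset: i * 1024, len: 1024 });
       let batches: Vec<Vec<u8>> = block_on(
           // Up to 4 range requests in flight at once, independent of how
           // many cores are doing CPU work downstream.
           stream::iter(morsels)
               .map(fetch_morsel)
               .buffered(4)
               .collect(),
       );
       assert_eq!(batches.len(), 8);
   }
   ```
   
   In a real pipeline the prefetch depth (here `4`) would come from a 
config setting rather than a literal.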
   
   I haven't quite sketched out what those buffered Streams would look like, 
but I think you could then model the work stealing as connecting the related 
Streams across partitions.
   
   
   > NUMA aspect of this: Apple just the other day announced chiplet based M5 
processors; is compute locality going to be an even more important factor going 
forward? How can we achieve that with these sorts of designs? My best idea is 
to have some sort of "home base" pipeline: each CPU thread (must be pinned) 
checks its "home" pipeline (which has a prefetch buffer and thus should often 
have work) and only if empty falls back to stealing work.
   
   I think the key is to use work stealing -- the CPU-bound tasks normally 
create the job description (the `ParquetReadPlan`) and then also execute it on 
the same tasks (cores). Only if another core runs out of useful work in its 
own (in-cache) pipeline does it try to load data from the others.
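   
   That "drain your own queue first, steal only when dry" policy can be 
sketched with `crossbeam-deque` (this is an assumption about one possible 
implementation, not what this PR does; the `u64` "jobs" stand in for 
`ParquetReadPlan`s):
   
   ```rust
   use crossbeam_deque::{Steal, Worker};
   
   fn main() {
       // One local queue ("home pipeline") per core.
       let local: Worker<u64> = Worker::new_fifo();
       let remote: Worker<u64> = Worker::new_fifo();
       let remote_stealer = remote.stealer();
   
       remote.push(42); // work queued on another core's pipeline
   
       // A core drains its own (in-cache) queue first...
       let job = local.pop().or_else(|| {
           // ...and only steals from another core when it runs dry.
           match remote_stealer.steal() {
               Steal::Success(j) => Some(j),
               _ => None,
           }
       });
       assert_eq!(job, Some(42));
   }
   ```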
   
   > Sorting. We are working on optimizations to emit data from scans either in 
a mostly sorted or fully sorted manner. I guess in the latter case we 
essentially cannot do work stealing, it would break the ordering, but we should 
still be able to share the same codepaths (a pipeline per CPU thread/partition 
instead of a shared pipeline with work stealing). 
   
   Agreed 
   > In the former case of inexact / best effort ordering we could probably 
just order file and morsel opens to be in order, even if the output order is 
not strictly guaranteed.
   
   Agreed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

