adriangb commented on PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#issuecomment-4010334496

   > I spent some more time this evening reading this PR -- I think if we did 
some rearranging of the code it would likely be easier to read / understand 
(and possibly test)
   > 
   > Specifically I am thinking of trying to pull it out into a more explicit 
pipeline like this
   > 
   > ```
   > ┌────────────────────┐      ┌────────────────────┐    
┌───────────────────────┐   ┌────────────────────┐ 
   > │     Open File      │      │        Plan        │    │      Fetch Data    
   │   │                    │ 
   > │  (fetch / decode   │─────▶│(Apply statistics + │───▶│(fetch Row Group 
data) │──▶│    Decode Data     │ 
   > │  ParquetMetadata)  │      │      pruning)      │    │                    
   │   │                    │ 
   > └────────────────────┘      └────────────────────┘    
└───────────────────────┘   └────────────────────┘ 
   >                                                                            
                              
   >         IO+CPU                       CPU                         IO        
                 CPU          
   > ```
   > 
   > I will try and spend some of my time on the airplane tomorrow working to 
see if I can rearrange the opener code towards this direction.
   > 
   > My idea is to split the implementation into a few steps:
   > 
   >     1. Introduce the abstraction of ParquetMorsels (but don't yet 
implement work stealing across partitions)
   > 
   >     2. (maybe) restructure the existing FileOpener so it produces a Stream 
of ParquetMorsels
   > 
   > 
   > I think then we would be in a better position to add in the (key) idea of 
"steal morsels from other partitions" if the next file/row group wasn't yet 
ready
   > 
   > But I realize this is all just a theory until I actually produce code
   
   I agree, I've been thinking about something similar. One key requirement in 
my mind is being able to decouple IO concurrency from CPU work. It's well 
understood that when talking to object stores, the way to get good throughput is 
to do a lot of concurrent work, so big picture we want to break the work of 
reading a Parquet file into ~4-8 MB chunks of IO and issue as many of them 
concurrently as possible. SSDs behave differently; the point is that we need to 
be able to tune these things, ideally automatically, but at the very least via 
some sort of `morsel_processing_concurrency = 32` setting.
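   To make the chunking concrete, here is a minimal std-only sketch of the IO 
splitting step. `CHUNK_SIZE` and the plain `Range<u64>` representation are 
stand-ins for illustration, not existing DataFusion or `object_store` APIs:

   ```rust
   use std::ops::Range;

   /// ~8 MB per request: large enough to amortize per-request overhead
   /// against an object store, small enough to fan out widely.
   const CHUNK_SIZE: u64 = 8 * 1024 * 1024;

   /// Split `[0, file_size)` into ranges of at most `CHUNK_SIZE` bytes,
   /// each of which can become an independent concurrent GET.
   fn io_chunks(file_size: u64) -> Vec<Range<u64>> {
       let mut chunks = Vec::new();
       let mut start = 0;
       while start < file_size {
           let end = (start + CHUNK_SIZE).min(file_size);
           chunks.push(start..end);
           start = end;
       }
       chunks
   }
   ```

   Each range could then be driven through something like 
`futures::stream::iter(ranges).map(fetch).buffer_unordered(n)` so that the 
concurrency knob applies to the IO-only stage.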
   
   I've been experimenting with a similar design (I think the more the merrier 
here) and came up with this:
   
   ```rust
   /// Generic API for opening a file using an [`ObjectStore`] and resolving to a
   /// stream of [`RecordBatch`]
   ///
   /// [`ObjectStore`]: object_store::ObjectStore
   pub trait FileOpener: Unpin + Send + Sync {
       /// Asynchronously open the specified file and return a stream
       /// of [`RecordBatch`]
       fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;

       /// Open a file and return a list of [`FileMorsel`]s for parallel execution.
       ///
       /// This method performs metadata loading, schema resolution, and pruning,
       /// but does NOT read the actual data. Each returned morsel represents a
       /// unit of work (e.g., one or more row groups for Parquet) that can be
       /// independently executed.
       ///
       /// The default implementation wraps the existing [`Self::open`] result as a
       /// single [`StreamMorsel`], providing backwards compatibility for formats
       /// like CSV, JSON, Arrow, and Avro that don't have sub-file structure.
       fn open_morsels(
           &self,
           partitioned_file: PartitionedFile,
       ) -> Result<FileOpenMorselFuture> {
           let future = self.open(partitioned_file)?;
           Ok(Box::pin(async move {
               let stream = future.await?;
               let morsel: Box<dyn FileMorsel> = Box::new(StreamMorsel::new(stream));
               Ok(vec![morsel])
           }))
       }
   }
   ```
   
   The idea is to let the format define what its morsels are. For Parquet it 
may be one or more row groups. For CSV it could be byte offsets in the file.
   
   Then we have a pipeline of `PartitionedFile -> Vec<FileMorsel>` and 
`FileMorsel -> Vec<RecordBatch>` giving us multiple places to control 
concurrency and pre-fetch depth.
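   As a toy illustration of that two-stage split (not the real DataFusion 
types: `&[i64]` stands in for `PartitionedFile`, `Vec<i64>` for 
`RecordBatch`, and everything is synchronous rather than `Stream`-based):

   ```rust
   /// A unit of work that can be executed independently of its siblings.
   trait FileMorsel {
       fn execute(self: Box<Self>) -> Vec<i64>; // stand-in for record batches
   }

   /// A morsel covering one "row group" of a fake file.
   struct RowGroupMorsel {
       rows: Vec<i64>,
   }

   impl FileMorsel for RowGroupMorsel {
       fn execute(self: Box<Self>) -> Vec<i64> {
           self.rows
       }
   }

   /// Stage 1 (metadata + pruning, no data reads): here it just splits the
   /// input into fixed-size "row groups".
   fn open_morsels(file: &[i64], rows_per_group: usize) -> Vec<Box<dyn FileMorsel>> {
       file.chunks(rows_per_group)
           .map(|c| Box::new(RowGroupMorsel { rows: c.to_vec() }) as Box<dyn FileMorsel>)
           .collect()
   }

   /// Stage 2: each morsel executes independently; in the real design this
   /// is where concurrency and prefetch depth would be controlled.
   fn execute_all(morsels: Vec<Box<dyn FileMorsel>>) -> Vec<i64> {
       morsels.into_iter().flat_map(|m| m.execute()).collect()
   }
   ```

   The point of the split is that everything between the two stages is a plain 
`Vec<Box<dyn FileMorsel>>`, so the scheduler can interleave morsels from 
different files without knowing anything format-specific.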
   
   That doesn't map as cleanly onto your breakdown above: in particular, every 
stage mixes IO and CPU, so it's not clear where to isolate the IO-only 
operations that can be ramped up with heavy concurrency.
   
   The other big questions in my mind are:
   - NUMA aspect of this: Apple just the other day announced chiplet-based M5 
processors; is compute locality going to be an even more important factor going 
forward? How can we achieve it with these sorts of designs? My best idea is 
to have some sort of "home base" pipeline: each CPU thread (which must be pinned) 
checks its "home" pipeline (which has a prefetch buffer and thus should often 
have work) and only falls back to stealing work when it is empty.
   - Sorting. We are working on optimizations to emit data from scans in either 
a mostly sorted or a fully sorted manner. In the latter case we essentially 
cannot do work stealing, since it would break the ordering, but we should 
still be able to share the same code paths (a pipeline per CPU thread/partition 
instead of a shared pipeline with work stealing). In the former case of inexact 
/ best-effort ordering we could probably just issue file and morsel opens in 
order, even if the output order is not strictly guaranteed.
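   A std-only sketch of that "home base" idea (thread pinning, NUMA awareness, 
and parking/retry are all omitted; plain numbers stand in for morsels):

   ```rust
   use std::collections::VecDeque;
   use std::sync::{Arc, Mutex};
   use std::thread;

   type Queue = Arc<Mutex<VecDeque<u64>>>;

   fn worker(home: usize, queues: Vec<Queue>, total: Arc<Mutex<u64>>) {
       loop {
           // Prefer the pinned "home" queue (which a prefetcher would keep full)...
           let mut task = queues[home].lock().unwrap().pop_front();
           if task.is_none() {
               // ...and only steal from other partitions when it is empty.
               task = queues
                   .iter()
                   .enumerate()
                   .filter(|&(i, _)| i != home)
                   .find_map(|(_, q)| q.lock().unwrap().pop_back());
           }
           match task {
               Some(v) => *total.lock().unwrap() += v,
               // No work anywhere: exit. A real scheduler would park and
               // retry, since new morsels can still arrive.
               None => break,
           }
       }
   }

   /// Spawn one worker per queue and sum every "morsel" exactly once.
   fn run(queues: Vec<Queue>) -> u64 {
       let total = Arc::new(Mutex::new(0u64));
       let handles: Vec<_> = (0..queues.len())
           .map(|home| {
               let (qs, t) = (queues.clone(), Arc::clone(&total));
               thread::spawn(move || worker(home, qs, t))
           })
           .collect();
       for h in handles {
           h.join().unwrap();
       }
       let sum = *total.lock().unwrap();
       sum
   }
   ```

   Stealing from the back (`pop_back`) while the owner drains the front is the 
usual deque discipline: it keeps the thief away from the items the owner will 
touch next, which matters more once locality is in play.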


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

