alamb commented on code in PR #15379: URL: https://github.com/apache/datafusion/pull/15379#discussion_r2010801289
########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. +#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group + pub statistics: Option<Statistics>, +} + +impl FileGroup { + /// Creates a new FileGroup from a vector of PartitionedFile objects + pub fn new(files: Vec<PartitionedFile>) -> Self { + Self { + files, + statistics: None, + } + } + + /// Returns the number of files in this group + pub fn len(&self) -> usize { + self.files.len() + } + + /// Set the statistics for this group + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } + + pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> { + self.files.iter() + } + + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Removes the last element from the files vector and returns it, or None if empty + pub fn pop(&mut self) -> Option<PartitionedFile> { + self.files.pop() + } + + /// Adds a file to the group + pub fn push(&mut self, file: PartitionedFile) { + self.files.push(file); + } + + /// Partition the list of files into `n` groups + pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> { Review Comment: Rather than taking `&mut self` here, I think this would be easier to use if it took `mut self`. As written, it will leave `self` empty, which is not super obvious. If it took `mut self`, then it would be clear that the FileGroup is consumed. ########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. 
+#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group + pub statistics: Option<Statistics>, +} + +impl FileGroup { + /// Creates a new FileGroup from a vector of PartitionedFile objects + pub fn new(files: Vec<PartitionedFile>) -> Self { + Self { + files, + statistics: None, + } + } + + /// Returns the number of files in this group + pub fn len(&self) -> usize { + self.files.len() + } + + /// Set the statistics for this group + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } + + pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> { + self.files.iter() + } + + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Removes the last element from the files vector and returns it, or None if empty + pub fn pop(&mut self) -> Option<PartitionedFile> { + self.files.pop() + } + + /// Adds a file to the group + pub fn push(&mut self, file: PartitionedFile) { + self.files.push(file); + } + + /// Partition the list of files into `n` groups + pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> { Review Comment: Rather than taking `&mut self` here, I think this would be easier to use if it took `mut self`. As written, it will leave `self` empty, which is not super obvious. If it took `mut self`, then it would be clear that the FileGroup is consumed. ########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. 
+#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group Review Comment: I think we could make it clearer here that the statistics are for the combined data in the files. Something like ```suggestion /// Optional statistics for the data across all files in the group ``` ########## datafusion/catalog-listing/src/helpers.rs: ########## @@ -121,40 +120,6 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool { /// The maximum number of concurrent listing requests const CONCURRENCY_LIMIT: usize = 100; -/// Partition the list of files into `n` groups -pub fn split_files( Review Comment: This is an API change -- can you perhaps leave the function here and mark it `#[deprecated]` per https://datafusion.apache.org/contributor-guide/api-health.html#deprecation-guidelines ########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. 
+#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group + pub statistics: Option<Statistics>, +} + +impl FileGroup { + /// Creates a new FileGroup from a vector of PartitionedFile objects + pub fn new(files: Vec<PartitionedFile>) -> Self { + Self { + files, + statistics: None, + } + } + + /// Returns the number of files in this group + pub fn len(&self) -> usize { + self.files.len() + } + + /// Set the statistics for this group + pub fn with_statistics(mut self, statistics: Statistics) -> Self { + self.statistics = Some(statistics); + self + } + + pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> { + self.files.iter() + } + + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Removes the last element from the files vector and returns it, or None if empty + pub fn pop(&mut self) -> Option<PartitionedFile> { + self.files.pop() + } + + /// Adds a file to the group + pub fn push(&mut self, file: PartitionedFile) { + self.files.push(file); + } + + /// Partition the list of files into `n` groups + pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> { + if self.is_empty() { + return vec![]; + } + + // ObjectStore::list does not guarantee any consistent order and for some + // implementations such as LocalFileSystem, it may be inconsistent. Thus + // Sort files by path to ensure consistent plans when run more than once. + self.files.sort_by(|a, b| a.path().cmp(b.path())); + + // effectively this is div with rounding up instead of truncating + let chunk_size = self.len().div_ceil(n); + let mut chunks = Vec::with_capacity(n); + let mut current_chunk = Vec::with_capacity(chunk_size); + for file in self.files.drain(..) 
{ + current_chunk.push(file); + if current_chunk.len() == chunk_size { + let full_chunk = FileGroup::new(mem::replace( + &mut current_chunk, + Vec::with_capacity(chunk_size), + )); + chunks.push(full_chunk); + } + } + + if !current_chunk.is_empty() { + chunks.push(FileGroup::new(current_chunk)) + } + + chunks + } +} + +impl Index<usize> for FileGroup { + type Output = PartitionedFile; + + fn index(&self, index: usize) -> &Self::Output { + &self.files[index] + } +} + +impl IndexMut<usize> for FileGroup { + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + &mut self.files[index] + } +} + +impl FromIterator<PartitionedFile> for FileGroup { + fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self { + let files = iter.into_iter().collect(); + FileGroup::new(files) + } +} Review Comment: I also recommend a From impl for the vec ```suggestion } impl From<Vec<PartitionedFile>> for FileGroup { ... } ``` ########## datafusion/datasource/src/file_groups.rs: ########## @@ -354,6 +361,115 @@ impl FileGroupPartitioner { } } +/// Represents a group of partitioned files that'll be processed by a single thread. +/// Maintains optional statistics across all files in the group. +#[derive(Debug, Clone)] +pub struct FileGroup { + /// The files in this group + pub files: Vec<PartitionedFile>, + /// Optional statistics for all files in the group + pub statistics: Option<Statistics>, Review Comment: I recommend: 1. Remove the `pub` from the fields (so we can potentially change the representation later) 2. Add an `into_inner()` method that returns the inner Vec -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org