alamb commented on code in PR #15379:
URL: https://github.com/apache/datafusion/pull/15379#discussion_r2010801289


##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -354,6 +361,115 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single 
thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    pub files: Vec<PartitionedFile>,
+    /// Optional statistics for all files in the group
+    pub statistics: Option<Statistics>,
+}
+
+impl FileGroup {
+    /// Creates a new FileGroup from a vector of PartitionedFile objects
+    pub fn new(files: Vec<PartitionedFile>) -> Self {
+        Self {
+            files,
+            statistics: None,
+        }
+    }
+
+    /// Returns the number of files in this group
+    pub fn len(&self) -> usize {
+        self.files.len()
+    }
+
+    /// Set the statistics for this group
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
+        self.files.iter()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.files.is_empty()
+    }
+
+    /// Removes the last element from the files vector and returns it, or None 
if empty
+    pub fn pop(&mut self) -> Option<PartitionedFile> {
+        self.files.pop()
+    }
+
+    /// Adds a file to the group
+    pub fn push(&mut self, file: PartitionedFile) {
+        self.files.push(file);
+    }
+
+    /// Partition the list of files into `n` groups
+    pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> {

Review Comment:
   Rather than taking `&mut self` here, I think this would be easier to use if 
it took `mut self` 
   
   As written it will leave `self` empty which is not super obvious. If it took 
`mut self` then it would be clear that the FileGroup is consumed



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -354,6 +361,115 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single 
thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    pub files: Vec<PartitionedFile>,
+    /// Optional statistics for all files in the group
+    pub statistics: Option<Statistics>,
+}
+
+impl FileGroup {
+    /// Creates a new FileGroup from a vector of PartitionedFile objects
+    pub fn new(files: Vec<PartitionedFile>) -> Self {
+        Self {
+            files,
+            statistics: None,
+        }
+    }
+
+    /// Returns the number of files in this group
+    pub fn len(&self) -> usize {
+        self.files.len()
+    }
+
+    /// Set the statistics for this group
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
+        self.files.iter()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.files.is_empty()
+    }
+
+    /// Removes the last element from the files vector and returns it, or None 
if empty
+    pub fn pop(&mut self) -> Option<PartitionedFile> {
+        self.files.pop()
+    }
+
+    /// Adds a file to the group
+    pub fn push(&mut self, file: PartitionedFile) {
+        self.files.push(file);
+    }
+
+    /// Partition the list of files into `n` groups
+    pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> {

Review Comment:
   Rather than taking `&mut self` here, I think this would be easier to use if 
it took `mut self` 
   
   As written it will leave `self` empty which is not super obvious. If it took 
`mut self` then it would be clear that the FileGroup is consumed



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -354,6 +361,115 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single 
thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    pub files: Vec<PartitionedFile>,
+    /// Optional statistics for all files in the group

Review Comment:
   I think we could make it clearer here that the statistics are for the 
combined data in the files. Something like
   
   ```suggestion
       /// Optional statistics for the data across all files in the group
   ```



##########
datafusion/catalog-listing/src/helpers.rs:
##########
@@ -121,40 +120,6 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: 
&Expr) -> bool {
 /// The maximum number of concurrent listing requests
 const CONCURRENCY_LIMIT: usize = 100;
 
-/// Partition the list of files into `n` groups
-pub fn split_files(

Review Comment:
   this is an API change -- can you perhaps leave the function here and mark it 
`#deprecated` per 
https://datafusion.apache.org/contributor-guide/api-health.html#deprecation-guidelines



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -354,6 +361,115 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single 
thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    pub files: Vec<PartitionedFile>,
+    /// Optional statistics for all files in the group
+    pub statistics: Option<Statistics>,
+}
+
+impl FileGroup {
+    /// Creates a new FileGroup from a vector of PartitionedFile objects
+    pub fn new(files: Vec<PartitionedFile>) -> Self {
+        Self {
+            files,
+            statistics: None,
+        }
+    }
+
+    /// Returns the number of files in this group
+    pub fn len(&self) -> usize {
+        self.files.len()
+    }
+
+    /// Set the statistics for this group
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
+        self.files.iter()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.files.is_empty()
+    }
+
+    /// Removes the last element from the files vector and returns it, or None 
if empty
+    pub fn pop(&mut self) -> Option<PartitionedFile> {
+        self.files.pop()
+    }
+
+    /// Adds a file to the group
+    pub fn push(&mut self, file: PartitionedFile) {
+        self.files.push(file);
+    }
+
+    /// Partition the list of files into `n` groups
+    pub fn split_files(&mut self, n: usize) -> Vec<FileGroup> {
+        if self.is_empty() {
+            return vec![];
+        }
+
+        // ObjectStore::list does not guarantee any consistent order and for 
some
+        // implementations such as LocalFileSystem, it may be inconsistent. 
Thus
+        // Sort files by path to ensure consistent plans when run more than 
once.
+        self.files.sort_by(|a, b| a.path().cmp(b.path()));
+
+        // effectively this is div with rounding up instead of truncating
+        let chunk_size = self.len().div_ceil(n);
+        let mut chunks = Vec::with_capacity(n);
+        let mut current_chunk = Vec::with_capacity(chunk_size);
+        for file in self.files.drain(..) {
+            current_chunk.push(file);
+            if current_chunk.len() == chunk_size {
+                let full_chunk = FileGroup::new(mem::replace(
+                    &mut current_chunk,
+                    Vec::with_capacity(chunk_size),
+                ));
+                chunks.push(full_chunk);
+            }
+        }
+
+        if !current_chunk.is_empty() {
+            chunks.push(FileGroup::new(current_chunk))
+        }
+
+        chunks
+    }
+}
+
+impl Index<usize> for FileGroup {
+    type Output = PartitionedFile;
+
+    fn index(&self, index: usize) -> &Self::Output {
+        &self.files[index]
+    }
+}
+
+impl IndexMut<usize> for FileGroup {
+    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
+        &mut self.files[index]
+    }
+}
+
+impl FromIterator<PartitionedFile> for FileGroup {
+    fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
+        let files = iter.into_iter().collect();
+        FileGroup::new(files)
+    }
+}

Review Comment:
   I also recommend a From impl for the vec
   
   ```suggestion
   }
   
   impl From<Vec<PartitionedFile>> for FileGroup {
   ...
   }
   ```



##########
datafusion/datasource/src/file_groups.rs:
##########
@@ -354,6 +361,115 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single 
thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    pub files: Vec<PartitionedFile>,
+    /// Optional statistics for all files in the group
+    pub statistics: Option<Statistics>,

Review Comment:
   I recommend:
   1. Remove the `pub` from the fields (so we can potentially change the 
representation later)
   2. Add a `into_inner()` method that returns the inner Vec



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to