This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new de8bcc536b Refactor: add `FileGroup` structure for `Vec<PartitionedFile>` (#15379)
de8bcc536b is described below

commit de8bcc536bae5b9448e3e715834d518415152967
Author: xudong.w <[email protected]>
AuthorDate: Wed Mar 26 08:43:56 2025 +0800

    Refactor: add `FileGroup` structure for `Vec<PartitionedFile>` (#15379)
    
    * Refactor: add FileGroup structure
    
    * update
    
    * address comments
    
    * address comments from alamb
    
    * fix clippy
---
 .../examples/parquet_exec_visitor.rs               |   6 +-
 datafusion/catalog-listing/src/helpers.rs          |  18 +-
 .../core/src/datasource/file_format/arrow.rs       |   2 +-
 datafusion/core/src/datasource/file_format/mod.rs  |   3 +-
 .../core/src/datasource/file_format/parquet.rs     |  10 +-
 datafusion/core/src/datasource/listing/table.rs    |  11 +-
 .../src/datasource/physical_plan/arrow_file.rs     |   4 +-
 .../core/src/datasource/physical_plan/json.rs      |   7 +-
 .../core/src/datasource/physical_plan/mod.rs       |   1 +
 .../core/src/datasource/physical_plan/parquet.rs   |  11 +-
 datafusion/core/src/datasource/statistics.rs       |  10 +-
 datafusion/core/src/physical_planner.rs            |   3 +-
 datafusion/core/src/test/mod.rs                    |   8 +-
 .../physical_optimizer/enforce_distribution.rs     |   9 +-
 datafusion/datasource-csv/src/file_format.rs       |   2 +-
 datafusion/datasource-csv/src/mod.rs               |   7 +-
 datafusion/datasource-csv/src/source.rs            |   8 +-
 datafusion/datasource-json/src/file_format.rs      |   2 +-
 datafusion/datasource-json/src/source.rs           |   7 +-
 datafusion/datasource-parquet/src/file_format.rs   |   2 +-
 datafusion/datasource-parquet/src/mod.rs           |   6 +-
 datafusion/datasource/src/display.rs               |  66 ++--
 datafusion/datasource/src/file_groups.rs           | 342 +++++++++++++++------
 datafusion/datasource/src/file_scan_config.rs      |  35 ++-
 datafusion/datasource/src/file_sink_config.rs      |   8 +-
 datafusion/datasource/src/file_stream.rs           |   4 +-
 datafusion/proto/src/physical_plan/from_proto.rs   |  27 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |   4 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  22 +-
 datafusion/substrait/src/physical_plan/consumer.rs |   4 +-
 datafusion/substrait/src/physical_plan/producer.rs |   2 +-
 .../tests/cases/roundtrip_physical_plan.rs         |  10 +-
 32 files changed, 421 insertions(+), 240 deletions(-)

diff --git a/datafusion-examples/examples/parquet_exec_visitor.rs 
b/datafusion-examples/examples/parquet_exec_visitor.rs
index 96bd6c369d..8133457dd9 100644
--- a/datafusion-examples/examples/parquet_exec_visitor.rs
+++ b/datafusion-examples/examples/parquet_exec_visitor.rs
@@ -18,8 +18,8 @@
 use std::sync::Arc;
 
 use datafusion::datasource::file_format::parquet::ParquetFormat;
-use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::listing::ListingOptions;
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, 
ParquetSource};
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionContext;
@@ -85,7 +85,7 @@ async fn main() {
 /// and `file_groups` from the FileScanConfig.
 #[derive(Debug)]
 struct ParquetExecVisitor {
-    file_groups: Option<Vec<Vec<PartitionedFile>>>,
+    file_groups: Option<Vec<FileGroup>>,
     bytes_scanned: Option<MetricValue>,
 }
 
diff --git a/datafusion/catalog-listing/src/helpers.rs 
b/datafusion/catalog-listing/src/helpers.rs
index 7742f5f9a1..8efb74d4ea 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -122,6 +122,7 @@ pub fn expr_applicable_for_cols(col_names: &[&str], expr: 
&Expr) -> bool {
 const CONCURRENCY_LIMIT: usize = 100;
 
 /// Partition the list of files into `n` groups
+#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
 pub fn split_files(
     mut partitioned_files: Vec<PartitionedFile>,
     n: usize,
@@ -535,13 +536,13 @@ pub fn describe_partition(partition: &Partition) -> 
(&str, usize, Vec<&str>) {
 mod tests {
     use async_trait::async_trait;
     use datafusion_common::config::TableOptions;
+    use datafusion_datasource::file_groups::FileGroup;
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::runtime_env::RuntimeEnv;
     use futures::FutureExt;
     use object_store::memory::InMemory;
     use std::any::Any;
     use std::ops::Not;
-    // use futures::StreamExt;
 
     use super::*;
     use datafusion_expr::{
@@ -553,24 +554,24 @@ mod tests {
     #[test]
     fn test_split_files() {
         let new_partitioned_file = |path: &str| 
PartitionedFile::new(path.to_owned(), 10);
-        let files = vec![
+        let files = FileGroup::new(vec![
             new_partitioned_file("a"),
             new_partitioned_file("b"),
             new_partitioned_file("c"),
             new_partitioned_file("d"),
             new_partitioned_file("e"),
-        ];
+        ]);
 
-        let chunks = split_files(files.clone(), 1);
+        let chunks = files.clone().split_files(1);
         assert_eq!(1, chunks.len());
         assert_eq!(5, chunks[0].len());
 
-        let chunks = split_files(files.clone(), 2);
+        let chunks = files.clone().split_files(2);
         assert_eq!(2, chunks.len());
         assert_eq!(3, chunks[0].len());
         assert_eq!(2, chunks[1].len());
 
-        let chunks = split_files(files.clone(), 5);
+        let chunks = files.clone().split_files(5);
         assert_eq!(5, chunks.len());
         assert_eq!(1, chunks[0].len());
         assert_eq!(1, chunks[1].len());
@@ -578,7 +579,7 @@ mod tests {
         assert_eq!(1, chunks[3].len());
         assert_eq!(1, chunks[4].len());
 
-        let chunks = split_files(files, 123);
+        let chunks = files.clone().split_files(123);
         assert_eq!(5, chunks.len());
         assert_eq!(1, chunks[0].len());
         assert_eq!(1, chunks[1].len());
@@ -586,7 +587,8 @@ mod tests {
         assert_eq!(1, chunks[3].len());
         assert_eq!(1, chunks[4].len());
 
-        let chunks = split_files(vec![], 2);
+        let empty_group = FileGroup::default();
+        let chunks = empty_group.split_files(2);
         assert_eq!(0, chunks.len());
     }
 
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs 
b/datafusion/core/src/datasource/file_format/arrow.rs
index 3172e56925..d0b9df403b 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -297,7 +297,7 @@ impl DisplayAs for ArrowFileSink {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 write!(f, "ArrowFileSink(file_groups=",)?;
-                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                 write!(f, ")")
             }
             DisplayFormatType::TreeRender => {
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index df74e5d060..4914a91722 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -73,7 +73,8 @@ pub(crate) mod test_util {
             statistics: None,
             extensions: None,
             metadata_size_hint: None,
-        }]];
+        }]
+        .into()];
 
         let exec = format
             .create_physical_plan(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 3b71593b33..f314ae54a1 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -147,6 +147,7 @@ mod tests {
     };
     use arrow::datatypes::{DataType, Field};
     use async_trait::async_trait;
+    use datafusion_datasource::file_groups::FileGroup;
     use futures::stream::BoxStream;
     use futures::{Stream, StreamExt};
     use log::error;
@@ -1375,7 +1376,7 @@ mod tests {
         let file_sink_config = FileSinkConfig {
             original_url: String::default(),
             object_store_url: object_store_url.clone(),
-            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            file_group: 
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
             table_paths: vec![ListingTableUrl::parse(table_path)?],
             output_schema: schema.clone(),
             table_partition_cols: vec![],
@@ -1461,7 +1462,7 @@ mod tests {
         let file_sink_config = FileSinkConfig {
             original_url: String::default(),
             object_store_url: object_store_url.clone(),
-            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            file_group: 
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
@@ -1545,7 +1546,10 @@ mod tests {
             let file_sink_config = FileSinkConfig {
                 original_url: String::default(),
                 object_store_url: object_store_url.clone(),
-                file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+                file_group: FileGroup::new(vec![PartitionedFile::new(
+                    "/tmp".to_string(),
+                    1,
+                )]),
                 table_paths: vec![ListingTableUrl::parse("file:///")?],
                 output_schema: schema.clone(),
                 table_partition_cols: vec![],
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 21b35bac21..0d0dea910d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -20,7 +20,7 @@
 use std::collections::HashMap;
 use std::{any::Any, str::FromStr, sync::Arc};
 
-use super::helpers::{expr_applicable_for_cols, pruned_partition_list, 
split_files};
+use super::helpers::{expr_applicable_for_cols, pruned_partition_list};
 use super::{ListingTableUrl, PartitionedFile};
 
 use crate::datasource::{
@@ -55,6 +55,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
+use datafusion_datasource::file_groups::FileGroup;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
@@ -1031,7 +1032,7 @@ impl TableProvider for ListingTable {
         )
         .await?;
 
-        let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
+        let file_group = 
file_list_stream.try_collect::<Vec<_>>().await?.into();
         let keep_partition_by_columns =
             state.config_options().execution.keep_partition_by_columns;
 
@@ -1040,7 +1041,7 @@ impl TableProvider for ListingTable {
             original_url: String::default(),
             object_store_url: self.table_paths()[0].object_store(),
             table_paths: self.table_paths().clone(),
-            file_groups,
+            file_group,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
             insert_op,
@@ -1088,7 +1089,7 @@ impl ListingTable {
         ctx: &'a dyn Session,
         filters: &'a [Expr],
         limit: Option<usize>,
-    ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
+    ) -> Result<(Vec<FileGroup>, Statistics)> {
         let store = if let Some(url) = self.table_paths.first() {
             ctx.runtime_env().object_store(url)?
         } else {
@@ -1136,7 +1137,7 @@ impl ListingTable {
         .await?;
 
         Ok((
-            split_files(files, self.options.target_partitions),
+            files.split_files(self.options.target_partitions),
             statistics,
         ))
     }
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs 
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 59860e3594..5dcf4df73f 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -20,7 +20,6 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
 use crate::error::Result;
 
@@ -42,6 +41,7 @@ use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
 };
 
+use datafusion_datasource::file_groups::FileGroup;
 use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -124,7 +124,7 @@ impl ArrowExec {
         )
     }
 
-    fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> 
Self {
+    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index 9bab75fc88..041aacf687 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -40,7 +40,6 @@ mod tests {
     use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use datafusion_datasource::file_format::FileFormat;
-    use datafusion_datasource::PartitionedFile;
     use datafusion_datasource_json::JsonFormat;
     use datafusion_execution::config::SessionConfig;
     use datafusion_execution::object_store::ObjectStoreUrl;
@@ -49,6 +48,7 @@ mod tests {
     use arrow::array::Array;
     use arrow::datatypes::SchemaRef;
     use arrow::datatypes::{Field, SchemaBuilder};
+    use datafusion_datasource::file_groups::FileGroup;
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use object_store::ObjectStore;
@@ -62,7 +62,7 @@ mod tests {
         state: &SessionState,
         file_compression_type: FileCompressionType,
         work_dir: &Path,
-    ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
+    ) -> (ObjectStoreUrl, Vec<FileGroup>, SchemaRef) {
         let store_url = ObjectStoreUrl::local_filesystem();
         let store = state.runtime_env().object_store(&store_url).unwrap();
 
@@ -79,6 +79,7 @@ mod tests {
         let meta = file_groups
             .first()
             .unwrap()
+            .files()
             .first()
             .unwrap()
             .clone()
@@ -113,6 +114,7 @@ mod tests {
         let path = file_groups
             .first()
             .unwrap()
+            .files()
             .first()
             .unwrap()
             .object_meta
@@ -560,6 +562,7 @@ mod tests {
         let path = file_groups
             .first()
             .unwrap()
+            .files()
             .first()
             .unwrap()
             .object_meta
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index cae04e5ee6..9d8148ca75 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -53,6 +53,7 @@ pub use csv::{CsvExec, CsvExecBuilder};
 
 pub use csv::{CsvOpener, CsvSource};
 pub use datafusion_datasource::file::FileSource;
+pub use datafusion_datasource::file_groups::FileGroup;
 pub use datafusion_datasource::file_groups::FileGroupPartitioner;
 pub use datafusion_datasource::file_meta::FileMeta;
 pub use datafusion_datasource::file_scan_config::{
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index b5534d6b3d..9d21e56847 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -67,6 +67,7 @@ mod tests {
     use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
     use chrono::{TimeZone, Utc};
+    use datafusion_datasource::file_groups::FileGroup;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
@@ -1123,7 +1124,7 @@ mod tests {
 
         async fn assert_parquet_read(
             state: &SessionState,
-            file_groups: Vec<Vec<PartitionedFile>>,
+            file_groups: Vec<FileGroup>,
             expected_row_num: Option<usize>,
             file_schema: SchemaRef,
         ) -> Result<()> {
@@ -1166,12 +1167,12 @@ mod tests {
             .infer_schema(&state, &store, std::slice::from_ref(&meta))
             .await?;
 
-        let group_empty = vec![vec![file_range(&meta, 0, 2)]];
-        let group_contain = vec![vec![file_range(&meta, 2, i64::MAX)]];
-        let group_all = vec![vec![
+        let group_empty = vec![FileGroup::new(vec![file_range(&meta, 0, 2)])];
+        let group_contain = vec![FileGroup::new(vec![file_range(&meta, 2, 
i64::MAX)])];
+        let group_all = vec![FileGroup::new(vec![
             file_range(&meta, 0, 2),
             file_range(&meta, 2, i64::MAX),
-        ]];
+        ])];
 
         assert_parquet_read(&state, group_empty, None, 
file_schema.clone()).await?;
         assert_parquet_read(&state, group_contain, Some(8), 
file_schema.clone()).await?;
diff --git a/datafusion/core/src/datasource/statistics.rs 
b/datafusion/core/src/datasource/statistics.rs
index acc16e0687..cf283ecee0 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -20,12 +20,12 @@ use std::sync::Arc;
 
 use futures::{Stream, StreamExt};
 
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
-
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
 use crate::physical_plan::{ColumnStatistics, Statistics};
+use datafusion_common::stats::Precision;
+use datafusion_common::ScalarValue;
+use datafusion_datasource::file_groups::FileGroup;
 
 use super::listing::PartitionedFile;
 
@@ -39,8 +39,8 @@ pub async fn get_statistics_with_limit(
     file_schema: SchemaRef,
     limit: Option<usize>,
     collect_stats: bool,
-) -> Result<(Vec<PartitionedFile>, Statistics)> {
-    let mut result_files = vec![];
+) -> Result<(FileGroup, Statistics)> {
+    let mut result_files = FileGroup::default();
     // These statistics can be calculated as long as at least one file provides
     // useful information. If none of the files provides any information, then
     // they will end up having `Precision::Absent` values. Throughout 
calculations,
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 600a8de1de..9a089796bc 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -90,6 +90,7 @@ use datafusion_physical_plan::unnest::ListUnnest;
 
 use crate::schema_equivalence::schema_satisfied_by;
 use async_trait::async_trait;
+use datafusion_datasource::file_groups::FileGroup;
 use futures::{StreamExt, TryStreamExt};
 use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
@@ -532,7 +533,7 @@ impl DefaultPhysicalPlanner {
                     original_url,
                     object_store_url,
                     table_paths: vec![parsed_url],
-                    file_groups: vec![],
+                    file_group: FileGroup::default(),
                     output_schema: Arc::new(schema),
                     table_partition_cols,
                     insert_op: InsertOp::Append,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index be707f7e19..2912eed6c9 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -28,7 +28,6 @@ use std::sync::Arc;
 use crate::datasource::file_format::csv::CsvFormat;
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::file_format::FileFormat;
-use crate::datasource::listing::PartitionedFile;
 
 use crate::datasource::physical_plan::CsvSource;
 use crate::datasource::{MemTable, TableProvider};
@@ -46,6 +45,7 @@ use datafusion_datasource::source::DataSourceExec;
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
+use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource_csv::partitioned_csv_config;
 #[cfg(feature = "compression")]
 use flate2::write::GzEncoder;
@@ -96,7 +96,7 @@ pub fn scan_partitioned_csv(
     Ok(config.build())
 }
 
-/// Returns file groups [`Vec<Vec<PartitionedFile>>`] for scanning `partitions` of `filename`
+/// Returns file groups [`Vec<FileGroup>`] for scanning `partitions` of `filename`
 pub fn partitioned_file_groups(
     path: &str,
     filename: &str,
@@ -104,7 +104,7 @@ pub fn partitioned_file_groups(
     file_format: Arc<dyn FileFormat>,
     file_compression_type: FileCompressionType,
     work_dir: &Path,
-) -> Result<Vec<Vec<PartitionedFile>>> {
+) -> Result<Vec<FileGroup>> {
     let path = format!("{path}/{filename}");
 
     let mut writers = vec![];
@@ -181,7 +181,7 @@ pub fn partitioned_file_groups(
 
     Ok(files
         .into_iter()
-        .map(|f| vec![local_unpartitioned_file(f).into()])
+        .map(|f| FileGroup::new(vec![local_unpartitioned_file(f).into()]))
         .collect::<Vec<_>>())
 }
 
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index b71724b8f7..5e3671ea83 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -35,6 +35,7 @@ use datafusion::datasource::source::DataSourceExec;
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::ScalarValue;
+use datafusion_datasource::file_groups::FileGroup;
 use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::PhysicalExpr;
@@ -189,8 +190,8 @@ fn parquet_exec_multiple_sorted(
         Arc::new(ParquetSource::default()),
     )
     .with_file_groups(vec![
-        vec![PartitionedFile::new("x".to_string(), 100)],
-        vec![PartitionedFile::new("y".to_string(), 100)],
+        FileGroup::new(vec![PartitionedFile::new("x".to_string(), 100)]),
+        FileGroup::new(vec![PartitionedFile::new("y".to_string(), 100)]),
     ])
     .with_output_ordering(output_ordering)
     .build()
@@ -223,8 +224,8 @@ fn csv_exec_multiple_sorted(output_ordering: 
Vec<LexOrdering>) -> Arc<DataSource
         Arc::new(CsvSource::new(false, b',', b'"')),
     )
     .with_file_groups(vec![
-        vec![PartitionedFile::new("x".to_string(), 100)],
-        vec![PartitionedFile::new("y".to_string(), 100)],
+        FileGroup::new(vec![PartitionedFile::new("x".to_string(), 100)]),
+        FileGroup::new(vec![PartitionedFile::new("y".to_string(), 100)]),
     ])
     .with_output_ordering(output_ordering)
     .build()
diff --git a/datafusion/datasource-csv/src/file_format.rs 
b/datafusion/datasource-csv/src/file_format.rs
index 522cb12db0..6cd6a83884 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -662,7 +662,7 @@ impl DisplayAs for CsvSink {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 write!(f, "CsvSink(file_groups=",)?;
-                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                 write!(f, ")")
             }
             DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-csv/src/mod.rs 
b/datafusion/datasource-csv/src/mod.rs
index 90c3689cd1..97f4214cf9 100644
--- a/datafusion/datasource-csv/src/mod.rs
+++ b/datafusion/datasource-csv/src/mod.rs
@@ -25,16 +25,15 @@ pub mod source;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
-use datafusion_datasource::{
-    file::FileSource, file_scan_config::FileScanConfig, PartitionedFile,
-};
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::{file::FileSource, 
file_scan_config::FileScanConfig};
 use datafusion_execution::object_store::ObjectStoreUrl;
 pub use file_format::*;
 
 /// Returns a [`FileScanConfig`] for given `file_groups`
 pub fn partitioned_csv_config(
     schema: SchemaRef,
-    file_groups: Vec<Vec<PartitionedFile>>,
+    file_groups: Vec<FileGroup>,
     file_source: Arc<dyn FileSource>,
 ) -> FileScanConfig {
     FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, 
file_source)
diff --git a/datafusion/datasource-csv/src/source.rs 
b/datafusion/datasource-csv/src/source.rs
index b9d974c884..175ee18197 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -28,7 +28,7 @@ use 
datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
-    calculate_range, FileRange, ListingTableUrl, PartitionedFile, 
RangeCalculation,
+    calculate_range, FileRange, ListingTableUrl, RangeCalculation,
 };
 
 use arrow::csv;
@@ -49,13 +49,13 @@ use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, 
PlanProperties,
 };
 
+use crate::file_format::CsvDecoder;
+use datafusion_datasource::file_groups::FileGroup;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
 
-use crate::file_format::CsvDecoder;
-
 /// Old Csv source, deprecated with DataSourceExec implementation and CsvSource
 ///
 /// See examples on `CsvSource`
@@ -314,7 +314,7 @@ impl CsvExec {
         )
     }
 
-    fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> 
Self {
+    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/datasource-json/src/file_format.rs 
b/datafusion/datasource-json/src/file_format.rs
index 9b6d5925fe..054c05b5b0 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -321,7 +321,7 @@ impl DisplayAs for JsonSink {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 write!(f, "JsonSink(file_groups=",)?;
-                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                 write!(f, ")")
             }
             DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-json/src/source.rs 
b/datafusion/datasource-json/src/source.rs
index 4605ad3d94..f1adccf9de 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -30,9 +30,7 @@ use datafusion_datasource::decoder::{deserialize_stream, 
DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
-use datafusion_datasource::{
-    calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
-};
+use datafusion_datasource::{calculate_range, ListingTableUrl, 
RangeCalculation};
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use arrow::json::ReaderBuilder;
@@ -48,6 +46,7 @@ use datafusion_physical_plan::execution_plan::{Boundedness, 
EmissionType};
 use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
 
+use datafusion_datasource::file_groups::FileGroup;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -146,7 +145,7 @@ impl NdJsonExec {
         )
     }
 
-    fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> 
Self {
+    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
         self.base_config.file_groups = file_groups.clone();
         let mut file_source = self.file_scan_config();
         file_source = file_source.with_file_groups(file_groups);
diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs
index 8a78407c64..b7a8712428 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -983,7 +983,7 @@ impl DisplayAs for ParquetSink {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
                 write!(f, "ParquetSink(file_groups=",)?;
-                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                 write!(f, ")")
             }
             DisplayFormatType::TreeRender => {
diff --git a/datafusion/datasource-parquet/src/mod.rs 
b/datafusion/datasource-parquet/src/mod.rs
index cecee00317..516b137921 100644
--- a/datafusion/datasource-parquet/src/mod.rs
+++ b/datafusion/datasource-parquet/src/mod.rs
@@ -44,7 +44,6 @@ use datafusion_common::{Constraints, Statistics};
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::source::DataSourceExec;
-use datafusion_datasource::PartitionedFile;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{
     EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr,
@@ -65,6 +64,7 @@ pub use row_group_filter::RowGroupAccessPlanFilter;
 use source::ParquetSource;
 pub use writer::plan_to_parquet;
 
+use datafusion_datasource::file_groups::FileGroup;
 use log::debug;
 
 #[derive(Debug, Clone)]
@@ -134,7 +134,7 @@ impl ParquetExecBuilder {
     }
 
     /// Update the list of files groups to read
-    pub fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) 
-> Self {
+    pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
         self.file_scan_config.file_groups = file_groups;
         self
     }
@@ -463,7 +463,7 @@ impl ParquetExec {
     /// that depend on the file groups.
     fn with_file_groups_and_update_partitioning(
         mut self,
-        file_groups: Vec<Vec<PartitionedFile>>,
+        file_groups: Vec<FileGroup>,
     ) -> Self {
         let mut config = self.file_scan_config();
         config.file_groups = file_groups;
diff --git a/datafusion/datasource/src/display.rs 
b/datafusion/datasource/src/display.rs
index 7ab8d407be..c9e9795359 100644
--- a/datafusion/datasource/src/display.rs
+++ b/datafusion/datasource/src/display.rs
@@ -17,10 +17,9 @@
 
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
 
+use crate::file_groups::FileGroup;
 use std::fmt::{Debug, Formatter, Result as FmtResult};
 
-use crate::PartitionedFile;
-
 /// A wrapper to customize partitioned file display
 ///
 /// Prints in the format:
@@ -28,7 +27,7 @@ use crate::PartitionedFile;
 /// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]}
 /// ```
 #[derive(Debug)]
-pub(crate) struct FileGroupsDisplay<'a>(pub(crate) &'a [Vec<PartitionedFile>]);
+pub(crate) struct FileGroupsDisplay<'a>(pub(crate) &'a [FileGroup]);
 
 impl DisplayAs for FileGroupsDisplay<'_> {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
@@ -60,7 +59,7 @@ impl DisplayAs for FileGroupsDisplay<'_> {
 /// [file1, file2,...]
 /// ```
 #[derive(Debug)]
-pub struct FileGroupDisplay<'a>(pub &'a [PartitionedFile]);
+pub struct FileGroupDisplay<'a>(pub &'a FileGroup);
 
 impl DisplayAs for FileGroupDisplay<'_> {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
@@ -69,7 +68,7 @@ impl DisplayAs for FileGroupDisplay<'_> {
             DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                 // To avoid showing too many files
                 let max_files = 5;
-                fmt_up_to_n_elements(self.0, max_files, f, |pf, f| {
+                fmt_up_to_n_elements(self.0.files(), max_files, f, |pf, f| {
                     write!(f, "{}", pf.object_meta.location.as_ref())?;
                     if let Some(range) = pf.range.as_ref() {
                         write!(f, ":{}..{}", range.start, range.end)?;
@@ -138,6 +137,7 @@ mod tests {
     use datafusion_physical_plan::{DefaultDisplay, VerboseDisplay};
     use object_store::{path::Path, ObjectMeta};
 
+    use crate::PartitionedFile;
     use chrono::Utc;
 
     #[test]
@@ -148,7 +148,10 @@ mod tests {
 
     #[test]
     fn file_groups_display_one() {
-        let files = [vec![partitioned_file("foo"), partitioned_file("bar")]];
+        let files = [FileGroup::new(vec![
+            partitioned_file("foo"),
+            partitioned_file("bar"),
+        ])];
 
         let expected = "{1 group: [[foo, bar]]}";
         assert_eq!(
@@ -160,9 +163,9 @@ mod tests {
     #[test]
     fn file_groups_display_many_default() {
         let files = [
-            vec![partitioned_file("foo"), partitioned_file("bar")],
-            vec![partitioned_file("baz")],
-            vec![],
+            FileGroup::new(vec![partitioned_file("foo"), partitioned_file("bar")]),
+            FileGroup::new(vec![partitioned_file("baz")]),
+            FileGroup::default(),
         ];
 
         let expected = "{3 groups: [[foo, bar], [baz], []]}";
@@ -175,9 +178,9 @@ mod tests {
     #[test]
     fn file_groups_display_many_verbose() {
         let files = [
-            vec![partitioned_file("foo"), partitioned_file("bar")],
-            vec![partitioned_file("baz")],
-            vec![],
+            FileGroup::new(vec![partitioned_file("foo"), partitioned_file("bar")]),
+            FileGroup::new(vec![partitioned_file("baz")]),
+            FileGroup::default(),
         ];
 
         let expected = "{3 groups: [[foo, bar], [baz], []]}";
@@ -190,13 +193,13 @@ mod tests {
     #[test]
     fn file_groups_display_too_many_default() {
         let files = [
-            vec![partitioned_file("foo"), partitioned_file("bar")],
-            vec![partitioned_file("baz")],
-            vec![partitioned_file("qux")],
-            vec![partitioned_file("quux")],
-            vec![partitioned_file("quuux")],
-            vec![partitioned_file("quuuux")],
-            vec![],
+            FileGroup::new(vec![partitioned_file("foo"), partitioned_file("bar")]),
+            FileGroup::new(vec![partitioned_file("baz")]),
+            FileGroup::new(vec![partitioned_file("qux")]),
+            FileGroup::new(vec![partitioned_file("quux")]),
+            FileGroup::new(vec![partitioned_file("quuux")]),
+            FileGroup::new(vec![partitioned_file("quuuux")]),
+            FileGroup::default(),
         ];
 
         let expected = "{7 groups: [[foo, bar], [baz], [qux], [quux], [quuux], ...]}";
@@ -209,13 +212,13 @@ mod tests {
     #[test]
     fn file_groups_display_too_many_verbose() {
         let files = [
-            vec![partitioned_file("foo"), partitioned_file("bar")],
-            vec![partitioned_file("baz")],
-            vec![partitioned_file("qux")],
-            vec![partitioned_file("quux")],
-            vec![partitioned_file("quuux")],
-            vec![partitioned_file("quuuux")],
-            vec![],
+            FileGroup::new(vec![partitioned_file("foo"), partitioned_file("bar")]),
+            FileGroup::new(vec![partitioned_file("baz")]),
+            FileGroup::new(vec![partitioned_file("qux")]),
+            FileGroup::new(vec![partitioned_file("quux")]),
+            FileGroup::new(vec![partitioned_file("quuux")]),
+            FileGroup::new(vec![partitioned_file("quuuux")]),
+            FileGroup::default(),
         ];
 
         let expected =
@@ -228,7 +231,8 @@ mod tests {
 
     #[test]
     fn file_group_display_many_default() {
-        let files = vec![partitioned_file("foo"), partitioned_file("bar")];
+        let files =
+            FileGroup::new(vec![partitioned_file("foo"), partitioned_file("bar")]);
 
         let expected = "[foo, bar]";
         assert_eq!(
@@ -239,14 +243,14 @@ mod tests {
 
     #[test]
     fn file_group_display_too_many_default() {
-        let files = vec![
+        let files = FileGroup::new(vec![
             partitioned_file("foo"),
             partitioned_file("bar"),
             partitioned_file("baz"),
             partitioned_file("qux"),
             partitioned_file("quux"),
             partitioned_file("quuux"),
-        ];
+        ]);
 
         let expected = "[foo, bar, baz, qux, quux, ...]";
         assert_eq!(
@@ -257,14 +261,14 @@ mod tests {
 
     #[test]
     fn file_group_display_too_many_verbose() {
-        let files = vec![
+        let files = FileGroup::new(vec![
             partitioned_file("foo"),
             partitioned_file("bar"),
             partitioned_file("baz"),
             partitioned_file("qux"),
             partitioned_file("quux"),
             partitioned_file("quuux"),
-        ];
+        ]);
 
         let expected = "[foo, bar, baz, qux, quux, quuux]";
         assert_eq!(
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 2c2b791f23..93895790cb 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -18,10 +18,13 @@
 //! Logic for managing groups of [`PartitionedFile`]s in DataFusion
 
 use crate::{FileRange, PartitionedFile};
+use datafusion_common::Statistics;
 use itertools::Itertools;
 use std::cmp::min;
 use std::collections::BinaryHeap;
 use std::iter::repeat_with;
+use std::mem;
+use std::ops::{Index, IndexMut};
 
 /// Repartition input files into `target_partitions` partitions, if total file size exceed
 /// `repartition_file_min_size`
@@ -179,14 +182,17 @@ impl FileGroupPartitioner {
     /// If no repartitioning is needed or possible, return `None`.
     pub fn repartition_file_groups(
         &self,
-        file_groups: &[Vec<PartitionedFile>],
-    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        file_groups: &[FileGroup],
+    ) -> Option<Vec<FileGroup>> {
         if file_groups.is_empty() {
             return None;
         }
 
         // Perform redistribution only in case all files should be read from beginning to end
-        let has_ranges = file_groups.iter().flatten().any(|f| f.range.is_some());
+        let has_ranges = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .any(|f| f.range.is_some());
         if has_ranges {
             return None;
         }
@@ -203,11 +209,11 @@ impl FileGroupPartitioner {
     /// existing grouping / ordering
     fn repartition_evenly_by_size(
         &self,
-        file_groups: &[Vec<PartitionedFile>],
-    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        file_groups: &[FileGroup],
+    ) -> Option<Vec<FileGroup>> {
         let target_partitions = self.target_partitions;
         let repartition_file_min_size = self.repartition_file_min_size;
-        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+        let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
 
         let total_size = flattened_files
             .iter()
@@ -257,7 +263,7 @@ impl FileGroupPartitioner {
             .flatten()
             .chunk_by(|(partition_idx, _)| *partition_idx)
             .into_iter()
-            .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
+            .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
             .collect_vec();
 
         Some(repartitioned_files)
@@ -266,8 +272,8 @@ impl FileGroupPartitioner {
     /// Redistribute file groups across size preserving order
     fn repartition_preserving_order(
         &self,
-        file_groups: &[Vec<PartitionedFile>],
-    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        file_groups: &[FileGroup],
+    ) -> Option<Vec<FileGroup>> {
         // Can't repartition and preserve order if there are more groups
         // than partitions
         if file_groups.len() >= self.target_partitions {
@@ -303,11 +309,12 @@ impl FileGroupPartitioner {
             return None;
         }
 
+        // Add new empty groups to which we will redistribute ranges of existing files
         // Add new empty groups to which we will redistribute ranges of existing files
         let mut file_groups: Vec<_> = file_groups
             .iter()
             .cloned()
-            .chain(repeat_with(Vec::new).take(num_new_groups))
+            .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
             .collect();
 
         // Divide up empty groups
@@ -354,6 +361,130 @@ impl FileGroupPartitioner {
     }
 }
 
+/// Represents a group of partitioned files that'll be processed by a single thread.
+/// Maintains optional statistics across all files in the group.
+#[derive(Debug, Clone)]
+pub struct FileGroup {
+    /// The files in this group
+    files: Vec<PartitionedFile>,
+    /// Optional statistics for the data across all files in the group
+    statistics: Option<Statistics>,
+}
+
+impl FileGroup {
+    /// Creates a new FileGroup from a vector of PartitionedFile objects
+    pub fn new(files: Vec<PartitionedFile>) -> Self {
+        Self {
+            files,
+            statistics: None,
+        }
+    }
+
+    /// Returns the number of files in this group
+    pub fn len(&self) -> usize {
+        self.files.len()
+    }
+
+    /// Set the statistics for this group
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = Some(statistics);
+        self
+    }
+
+    /// Returns a slice of the files in this group
+    pub fn files(&self) -> &[PartitionedFile] {
+        &self.files
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
+        self.files.iter()
+    }
+
+    pub fn into_inner(self) -> Vec<PartitionedFile> {
+        self.files
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.files.is_empty()
+    }
+
+    /// Removes the last element from the files vector and returns it, or None if empty
+    pub fn pop(&mut self) -> Option<PartitionedFile> {
+        self.files.pop()
+    }
+
+    /// Adds a file to the group
+    pub fn push(&mut self, file: PartitionedFile) {
+        self.files.push(file);
+    }
+
+    /// Partition the list of files into `n` groups
+    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
+        if self.is_empty() {
+            return vec![];
+        }
+
+        // ObjectStore::list does not guarantee any consistent order and for some
+        // implementations such as LocalFileSystem, it may be inconsistent. Thus
+        // Sort files by path to ensure consistent plans when run more than once.
+        self.files.sort_by(|a, b| a.path().cmp(b.path()));
+
+        // effectively this is div with rounding up instead of truncating
+        let chunk_size = self.len().div_ceil(n);
+        let mut chunks = Vec::with_capacity(n);
+        let mut current_chunk = Vec::with_capacity(chunk_size);
+        for file in self.files.drain(..) {
+            current_chunk.push(file);
+            if current_chunk.len() == chunk_size {
+                let full_chunk = FileGroup::new(mem::replace(
+                    &mut current_chunk,
+                    Vec::with_capacity(chunk_size),
+                ));
+                chunks.push(full_chunk);
+            }
+        }
+
+        if !current_chunk.is_empty() {
+            chunks.push(FileGroup::new(current_chunk))
+        }
+
+        chunks
+    }
+}
+
+impl Index<usize> for FileGroup {
+    type Output = PartitionedFile;
+
+    fn index(&self, index: usize) -> &Self::Output {
+        &self.files[index]
+    }
+}
+
+impl IndexMut<usize> for FileGroup {
+    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
+        &mut self.files[index]
+    }
+}
+
+impl FromIterator<PartitionedFile> for FileGroup {
+    fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
+        let files = iter.into_iter().collect();
+        FileGroup::new(files)
+    }
+}
+
+impl From<Vec<PartitionedFile>> for FileGroup {
+    fn from(files: Vec<PartitionedFile>) -> Self {
+        FileGroup::new(files)
+    }
+}
+
+impl Default for FileGroup {
+    fn default() -> Self {
+        Self::new(Vec::new())
+    }
+}
+
 /// Tracks how a individual file will be repartitioned
 #[derive(Debug, Clone, PartialEq, Eq)]
 struct ToRepartition {
@@ -393,7 +524,7 @@ mod test {
     #[test]
     fn repartition_empty_file_only() {
         let partitioned_file_empty = pfile("empty", 0);
-        let file_group = vec![vec![partitioned_file_empty]];
+        let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
 
         let partitioned_files = FileGroupPartitioner::new()
             .with_target_partitions(4)
@@ -411,29 +542,33 @@ mod test {
         let pfile_empty = pfile("empty", 0);
 
         let empty_first = vec![
-            vec![pfile_empty.clone()],
-            vec![pfile_a.clone()],
-            vec![pfile_b.clone()],
+            FileGroup::new(vec![pfile_empty.clone()]),
+            FileGroup::new(vec![pfile_a.clone()]),
+            FileGroup::new(vec![pfile_b.clone()]),
         ];
         let empty_middle = vec![
-            vec![pfile_a.clone()],
-            vec![pfile_empty.clone()],
-            vec![pfile_b.clone()],
+            FileGroup::new(vec![pfile_a.clone()]),
+            FileGroup::new(vec![pfile_empty.clone()]),
+            FileGroup::new(vec![pfile_b.clone()]),
+        ];
+        let empty_last = vec![
+            FileGroup::new(vec![pfile_a]),
+            FileGroup::new(vec![pfile_b]),
+            FileGroup::new(vec![pfile_empty]),
         ];
-        let empty_last = vec![vec![pfile_a], vec![pfile_b], vec![pfile_empty]];
 
         // Repartition file groups into x partitions
         let expected_2 = vec![
-            vec![pfile("a", 10).with_range(0, 10)],
-            vec![pfile("b", 10).with_range(0, 10)],
+            FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
+            FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
         ];
         let expected_3 = vec![
-            vec![pfile("a", 10).with_range(0, 7)],
-            vec![
+            FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
+            FileGroup::new(vec![
                 pfile("a", 10).with_range(7, 10),
                 pfile("b", 10).with_range(0, 4),
-            ],
-            vec![pfile("b", 10).with_range(4, 10)],
+            ]),
+            FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
         ];
 
         let file_groups_tests = [empty_first, empty_middle, empty_last];
@@ -454,7 +589,7 @@ mod test {
     #[test]
     fn repartition_single_file() {
         // Single file, single partition into multiple partitions
-        let single_partition = vec![vec![pfile("a", 123)]];
+        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
 
         let actual = FileGroupPartitioner::new()
             .with_target_partitions(4)
@@ -462,10 +597,10 @@ mod test {
             .repartition_file_groups(&single_partition);
 
         let expected = Some(vec![
-            vec![pfile("a", 123).with_range(0, 31)],
-            vec![pfile("a", 123).with_range(31, 62)],
-            vec![pfile("a", 123).with_range(62, 93)],
-            vec![pfile("a", 123).with_range(93, 123)],
+            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
+            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
+            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
+            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -474,7 +609,7 @@ mod test {
     fn repartition_too_much_partitions() {
         // Single file, single partition into 96 partitions
         let partitioned_file = pfile("a", 8);
-        let single_partition = vec![vec![partitioned_file]];
+        let single_partition = vec![FileGroup::new(vec![partitioned_file])];
 
         let actual = FileGroupPartitioner::new()
             .with_target_partitions(96)
@@ -482,14 +617,14 @@ mod test {
             .repartition_file_groups(&single_partition);
 
         let expected = Some(vec![
-            vec![pfile("a", 8).with_range(0, 1)],
-            vec![pfile("a", 8).with_range(1, 2)],
-            vec![pfile("a", 8).with_range(2, 3)],
-            vec![pfile("a", 8).with_range(3, 4)],
-            vec![pfile("a", 8).with_range(4, 5)],
-            vec![pfile("a", 8).with_range(5, 6)],
-            vec![pfile("a", 8).with_range(6, 7)],
-            vec![pfile("a", 8).with_range(7, 8)],
+            FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
+            FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
         ]);
 
         assert_partitioned_files(expected, actual);
@@ -498,7 +633,10 @@ mod test {
     #[test]
     fn repartition_multiple_partitions() {
         // Multiple files in single partition after redistribution
-        let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];
+        let source_partitions = vec![
+            FileGroup::new(vec![pfile("a", 40)]),
+            FileGroup::new(vec![pfile("b", 60)]),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_target_partitions(3)
@@ -506,12 +644,12 @@ mod test {
             .repartition_file_groups(&source_partitions);
 
         let expected = Some(vec![
-            vec![pfile("a", 40).with_range(0, 34)],
-            vec![
+            FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
+            FileGroup::new(vec![
                 pfile("a", 40).with_range(34, 40),
                 pfile("b", 60).with_range(0, 28),
-            ],
-            vec![pfile("b", 60).with_range(28, 60)],
+            ]),
+            FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -519,7 +657,10 @@ mod test {
     #[test]
     fn repartition_same_num_partitions() {
         // "Rebalance" files across partitions
-        let source_partitions = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];
+        let source_partitions = vec![
+            FileGroup::new(vec![pfile("a", 40)]),
+            FileGroup::new(vec![pfile("b", 60)]),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_target_partitions(2)
@@ -527,11 +668,11 @@ mod test {
             .repartition_file_groups(&source_partitions);
 
         let expected = Some(vec![
-            vec![
+            FileGroup::new(vec![
                 pfile("a", 40).with_range(0, 40),
                 pfile("b", 60).with_range(0, 10),
-            ],
-            vec![pfile("b", 60).with_range(10, 60)],
+            ]),
+            FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -540,8 +681,8 @@ mod test {
     fn repartition_no_action_ranges() {
         // No action due to Some(range) in second file
         let source_partitions = vec![
-            vec![pfile("a", 123)],
-            vec![pfile("b", 144).with_range(1, 50)],
+            FileGroup::new(vec![pfile("a", 123)]),
+            FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
         ];
 
         let actual = FileGroupPartitioner::new()
@@ -555,7 +696,7 @@ mod test {
     #[test]
     fn repartition_no_action_min_size() {
         // No action due to target_partition_size
-        let single_partition = vec![vec![pfile("a", 123)]];
+        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
 
         let actual = FileGroupPartitioner::new()
             .with_target_partitions(65)
@@ -580,7 +721,10 @@ mod test {
     #[test]
     fn repartition_ordered_no_action_too_few_partitions() {
         // No action as there are no new groups to redistribute to
-        let input_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 200)]];
+        let input_partitions = vec![
+            FileGroup::new(vec![pfile("a", 100)]),
+            FileGroup::new(vec![pfile("b", 200)]),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -594,7 +738,7 @@ mod test {
     #[test]
     fn repartition_ordered_no_action_file_too_small() {
         // No action as there are no new groups to redistribute to
-        let single_partition = vec![vec![pfile("a", 100)]];
+        let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -609,7 +753,7 @@ mod test {
     #[test]
     fn repartition_ordered_one_large_file() {
         // "Rebalance" the single large file across partitions
-        let source_partitions = vec![vec![pfile("a", 100)]];
+        let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -618,9 +762,9 @@ mod test {
             .repartition_file_groups(&source_partitions);
 
         let expected = Some(vec![
-            vec![pfile("a", 100).with_range(0, 34)],
-            vec![pfile("a", 100).with_range(34, 68)],
-            vec![pfile("a", 100).with_range(68, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
+            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
+            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -629,7 +773,10 @@ mod test {
     fn repartition_ordered_one_large_one_small_file() {
         // "Rebalance" the single large file across empty partitions, but can't split
         // small file
-        let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 30)]];
+        let source_partitions = vec![
+            FileGroup::new(vec![pfile("a", 100)]),
+            FileGroup::new(vec![pfile("b", 30)]),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -639,13 +786,13 @@ mod test {
 
         let expected = Some(vec![
             // scan first third of "a"
-            vec![pfile("a", 100).with_range(0, 33)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
             // only b in this group (can't do this)
-            vec![pfile("b", 30).with_range(0, 30)],
+            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
             // second third of "a"
-            vec![pfile("a", 100).with_range(33, 66)],
+            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
             // final third of "a"
-            vec![pfile("a", 100).with_range(66, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -653,7 +800,10 @@ mod test {
     #[test]
     fn repartition_ordered_two_large_files() {
         // "Rebalance" two large files across empty partitions, but can't mix them
-        let source_partitions = vec![vec![pfile("a", 100)], vec![pfile("b", 100)]];
+        let source_partitions = vec![
+            FileGroup::new(vec![pfile("a", 100)]),
+            FileGroup::new(vec![pfile("b", 100)]),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -663,13 +813,13 @@ mod test {
 
         let expected = Some(vec![
             // scan first half of "a"
-            vec![pfile("a", 100).with_range(0, 50)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
             // scan first half of "b"
-            vec![pfile("b", 100).with_range(0, 50)],
+            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
             // second half of "a"
-            vec![pfile("a", 100).with_range(50, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
             // second half of "b"
-            vec![pfile("b", 100).with_range(50, 100)],
+            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -678,9 +828,9 @@ mod test {
     fn repartition_ordered_two_large_one_small_files() {
         // "Rebalance" two large files and one small file across empty partitions
         let source_partitions = vec![
-            vec![pfile("a", 100)],
-            vec![pfile("b", 100)],
-            vec![pfile("c", 30)],
+            FileGroup::new(vec![pfile("a", 100)]),
+            FileGroup::new(vec![pfile("b", 100)]),
+            FileGroup::new(vec![pfile("c", 30)]),
         ];
 
         let partitioner = FileGroupPartitioner::new()
@@ -694,13 +844,13 @@ mod test {
 
         let expected = Some(vec![
             // scan first half of "a"
-            vec![pfile("a", 100).with_range(0, 50)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
             // All of "b"
-            vec![pfile("b", 100).with_range(0, 100)],
+            FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
             // All of "c"
-            vec![pfile("c", 30).with_range(0, 30)],
+            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
             // second half of "a"
-            vec![pfile("a", 100).with_range(50, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
         ]);
         assert_partitioned_files(expected, actual);
 
@@ -711,15 +861,15 @@ mod test {
 
         let expected = Some(vec![
             // scan first half of "a"
-            vec![pfile("a", 100).with_range(0, 50)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
             // scan first half of "b"
-            vec![pfile("b", 100).with_range(0, 50)],
+            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
             // All of "c"
-            vec![pfile("c", 30).with_range(0, 30)],
+            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
             // second half of "a"
-            vec![pfile("a", 100).with_range(50, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
             // second half of "b"
-            vec![pfile("b", 100).with_range(50, 100)],
+            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -727,8 +877,12 @@ mod test {
     #[test]
     fn repartition_ordered_one_large_one_small_existing_empty() {
         // "Rebalance" files using existing empty partition
-        let source_partitions =
-            vec![vec![pfile("a", 100)], vec![], vec![pfile("b", 40)], vec![]];
+        let source_partitions = vec![
+            FileGroup::new(vec![pfile("a", 100)]),
+            FileGroup::default(),
+            FileGroup::new(vec![pfile("b", 40)]),
+            FileGroup::default(),
+        ];
 
         let actual = FileGroupPartitioner::new()
             .with_preserve_order_within_groups(true)
@@ -740,14 +894,14 @@ mod test {
         // target partitions), assign two to "a" and one to "b"
         let expected = Some(vec![
             // Scan of "a" across three groups
-            vec![pfile("a", 100).with_range(0, 33)],
-            vec![pfile("a", 100).with_range(33, 66)],
+            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
+            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
             // scan first half of "b"
-            vec![pfile("b", 40).with_range(0, 20)],
+            FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
             // final third of "a"
-            vec![pfile("a", 100).with_range(66, 100)],
+            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
             // second half of "b"
-            vec![pfile("b", 40).with_range(20, 40)],
+            FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -756,8 +910,8 @@ mod test {
         // groups with multiple files in a group can not be changed, but can divide others
         let source_partitions = vec![
             // two files in an existing partition
-            vec![pfile("a", 100), pfile("b", 100)],
-            vec![pfile("c", 40)],
+            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
+            FileGroup::new(vec![pfile("c", 40)]),
         ];
 
         let actual = FileGroupPartitioner::new()
@@ -772,11 +926,11 @@ mod test {
             // don't try and rearrange files in the existing partition
             // assuming that the caller had a good reason to put them that way.
             // (it is technically possible to split off ranges from the files if desired)
-            vec![pfile("a", 100), pfile("b", 100)],
+            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
             // first half of "c"
-            vec![pfile("c", 40).with_range(0, 20)],
+            FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
             // second half of "c"
-            vec![pfile("c", 40).with_range(20, 40)],
+            FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
         ]);
         assert_partitioned_files(expected, actual);
     }
@@ -784,8 +938,8 @@ mod test {
     /// Asserts that the two groups of [`PartitionedFile`] are the same
     /// (PartitionedFile doesn't implement PartialEq)
     fn assert_partitioned_files(
-        expected: Option<Vec<Vec<PartitionedFile>>>,
-        actual: Option<Vec<Vec<PartitionedFile>>>,
+        expected: Option<Vec<FileGroup>>,
+        actual: Option<Vec<FileGroup>>,
     ) {
         match (expected, actual) {
             (None, None) => {}
@@ -808,8 +962,8 @@ mod test {
     /// asserting they return the same value and returns that value
     fn repartition_test(
         partitioner: FileGroupPartitioner,
-        file_groups: Vec<Vec<PartitionedFile>>,
-    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        file_groups: Vec<FileGroup>,
+    ) -> Option<Vec<FileGroup>> {
         let repartitioned = partitioner.repartition_file_groups(&file_groups);
 
         let repartitioned_preserving_sort = partitioner
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index 82308bda70..770e26bbf4 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -48,6 +48,7 @@ use datafusion_physical_plan::{
 };
 use log::{debug, warn};
 
+use crate::file_groups::FileGroup;
 use crate::{
     display::FileGroupsDisplay,
     file::FileSource,
@@ -71,6 +72,7 @@ use crate::{
 /// # use object_store::ObjectStore;
 /// # use datafusion_common::Statistics;
 /// # use datafusion_datasource::file::FileSource;
+/// # use datafusion_datasource::file_groups::FileGroup;
 /// # use datafusion_datasource::PartitionedFile;
 /// # use datafusion_datasource::file_scan_config::FileScanConfig;
 /// # use datafusion_datasource::file_stream::FileOpener;
@@ -111,10 +113,10 @@ use crate::{
 ///   .with_file(PartitionedFile::new("file1.parquet", 1234))
 ///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
 ///   // in a  single row group
-///   .with_file_group(vec![
+///   .with_file_group(FileGroup::new(vec![
 ///    PartitionedFile::new("file2.parquet", 56),
 ///    PartitionedFile::new("file3.parquet", 78),
-///   ]);
+///   ]));
 /// // create an execution plan from the config
 /// let plan: Arc<dyn ExecutionPlan> = config.build();
 /// ```
@@ -145,7 +147,7 @@ pub struct FileScanConfig {
     /// DataFusion may attempt to read each partition of files
     /// concurrently, however files *within* a partition will be read
     /// sequentially, one after the next.
-    pub file_groups: Vec<Vec<PartitionedFile>>,
+    pub file_groups: Vec<FileGroup>,
     /// Table constraints
     pub constraints: Constraints,
     /// Estimated overall statistics of the files, taking `filters` into account.
@@ -226,7 +228,7 @@ impl DataSource for FileScanConfig {
             DisplayFormatType::TreeRender => {
                 writeln!(f, "format={}", self.file_source.file_type())?;
                 self.file_source.fmt_extra(t, f)?;
-                let num_files = 
self.file_groups.iter().map(Vec::len).sum::<usize>();
+                let num_files = self.file_groups.iter().map(|fg| 
fg.len()).sum::<usize>();
                 writeln!(f, "files={num_files}")?;
                 Ok(())
             }
@@ -450,16 +452,13 @@ impl FileScanConfig {
     ///
     /// See [Self::file_groups] for more information.
     pub fn with_file(self, file: PartitionedFile) -> Self {
-        self.with_file_group(vec![file])
+        self.with_file_group(FileGroup::new(vec![file]))
     }
 
     /// Add the file groups
     ///
     /// See [Self::file_groups] for more information.
-    pub fn with_file_groups(
-        mut self,
-        mut file_groups: Vec<Vec<PartitionedFile>>,
-    ) -> Self {
+    pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self 
{
         self.file_groups.append(&mut file_groups);
         self
     }
@@ -467,7 +466,7 @@ impl FileScanConfig {
     /// Add a new file group
     ///
     /// See [Self::file_groups] for more information
-    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self 
{
+    pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
         self.file_groups.push(file_group);
         self
     }
@@ -581,10 +580,13 @@ impl FileScanConfig {
     /// It will produce the smallest number of file groups possible.
     pub fn split_groups_by_statistics(
         table_schema: &SchemaRef,
-        file_groups: &[Vec<PartitionedFile>],
+        file_groups: &[FileGroup],
         sort_order: &LexOrdering,
-    ) -> Result<Vec<Vec<PartitionedFile>>> {
-        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+    ) -> Result<Vec<FileGroup>> {
+        let flattened_files = file_groups
+            .iter()
+            .flat_map(FileGroup::iter)
+            .collect::<Vec<_>>();
         // First Fit:
         // * Choose the first file group that a file can be placed into.
         // * If it fits into no existing file groups, create a new one.
@@ -1044,7 +1046,7 @@ fn get_projected_output_ordering(
                 &new_ordering,
                 projected_schema,
                 base_config.projection.as_deref(),
-                group,
+                group.iter(),
             ) {
                 Ok(statistics) => statistics,
                 Err(e) => {
@@ -1653,8 +1655,9 @@ mod tests {
                     .collect::<Result<Vec<_>>>()?,
             );
 
-            let partitioned_files =
-                case.files.into_iter().map(From::from).collect::<Vec<_>>();
+            let partitioned_files = FileGroup::new(
+                case.files.into_iter().map(From::from).collect::<Vec<_>>(),
+            );
             let result = FileScanConfig::split_groups_by_statistics(
                 &table_schema,
                 &[partitioned_files.clone()],
diff --git a/datafusion/datasource/src/file_sink_config.rs 
b/datafusion/datasource/src/file_sink_config.rs
index 279c9d2100..dbff28692c 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::file_groups::FileGroup;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
-use crate::{ListingTableUrl, PartitionedFile};
+use crate::ListingTableUrl;
 use arrow::datatypes::{DataType, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::Result;
@@ -90,8 +91,9 @@ pub struct FileSinkConfig {
     pub original_url: String,
     /// Object store URL, used to get an ObjectStore instance
     pub object_store_url: ObjectStoreUrl,
-    /// A vector of [`PartitionedFile`] structs, each representing a file partition
-    pub file_groups: Vec<PartitionedFile>,
+    /// A single [`FileGroup`] holding the [`PartitionedFile`] objects
+    /// associated with this sink.
+    pub file_group: FileGroup,
     /// Vector of partition paths
     pub table_paths: Vec<ListingTableUrl>,
     /// The schema of the output file
diff --git a/datafusion/datasource/src/file_stream.rs 
b/datafusion/datasource/src/file_stream.rs
index 7d17d230fc..904f152287 100644
--- a/datafusion/datasource/src/file_stream.rs
+++ b/datafusion/datasource/src/file_stream.rs
@@ -88,10 +88,10 @@ impl FileStream {
                 .collect::<Vec<_>>(),
         );
 
-        let files = config.file_groups[partition].clone();
+        let file_group = config.file_groups[partition].clone();
 
         Ok(Self {
-            file_iter: files.into(),
+            file_iter: file_group.into_inner().into_iter().collect(),
             projected_schema,
             remain: config.limit,
             file_opener,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index a417eccee1..833c1ad512 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -32,7 +32,9 @@ use datafusion::datasource::file_format::json::JsonSink;
 use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::listing::{FileRange, ListingTableUrl, 
PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig, 
FileSource};
+use datafusion::datasource::physical_plan::{
+    FileGroup, FileScanConfig, FileSinkConfig, FileSource,
+};
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::WindowFunctionDefinition;
 use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, 
ScalarFunctionExpr};
@@ -496,7 +498,7 @@ pub fn parse_protobuf_file_scan_config(
     let constraints = convert_required!(proto.constraints)?;
     let statistics = convert_required!(proto.statistics)?;
 
-    let file_groups: Vec<Vec<PartitionedFile>> = proto
+    let file_groups = proto
         .file_groups
         .iter()
         .map(|f| f.try_into())
@@ -585,14 +587,16 @@ impl TryFrom<&protobuf::FileRange> for FileRange {
     }
 }
 
-impl TryFrom<&protobuf::FileGroup> for Vec<PartitionedFile> {
+impl TryFrom<&protobuf::FileGroup> for FileGroup {
     type Error = DataFusionError;
 
     fn try_from(val: &protobuf::FileGroup) -> Result<Self, Self::Error> {
-        val.files
+        let files = val
+            .files
             .iter()
             .map(|f| f.try_into())
-            .collect::<Result<Vec<_>, _>>()
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(FileGroup::new(files))
     }
 }
 
@@ -634,11 +638,12 @@ impl TryFrom<&protobuf::FileSinkConfig> for 
FileSinkConfig {
     type Error = DataFusionError;
 
     fn try_from(conf: &protobuf::FileSinkConfig) -> Result<Self, Self::Error> {
-        let file_groups = conf
-            .file_groups
-            .iter()
-            .map(TryInto::try_into)
-            .collect::<Result<Vec<_>>>()?;
+        let file_group = FileGroup::new(
+            conf.file_groups
+                .iter()
+                .map(|f| f.try_into())
+                .collect::<Result<Vec<_>>>()?,
+        );
         let table_paths = conf
             .table_paths
             .iter()
@@ -660,7 +665,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
         Ok(Self {
             original_url: String::default(),
             object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
-            file_groups,
+            file_group,
             table_paths,
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index c2cf506eb9..2eed60590c 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -485,7 +485,7 @@ pub fn serialize_file_scan_config(
     let file_groups = conf
         .file_groups
         .iter()
-        .map(|p| p.as_slice().try_into())
+        .map(|p| p.files().try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
     let mut output_orderings = vec![];
@@ -585,7 +585,7 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
 
     fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
         let file_groups = conf
-            .file_groups
+            .file_group
             .iter()
             .map(TryInto::try_into)
             .collect::<Result<Vec<_>>>()?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index aeae39c4d0..57d10e38ee 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -46,7 +46,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink;
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
-    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
+    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, 
FileScanConfig,
     FileSinkConfig, FileSource, ParquetSource,
 };
 use datafusion::execution::FunctionRegistry;
@@ -743,10 +743,10 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> 
Result<()> {
 
     let scan_config =
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, 
file_source)
-            .with_file_groups(vec![vec![PartitionedFile::new(
+            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                 "/path/to/file.parquet".to_string(),
                 1024,
-            )]])
+            )])])
             .with_statistics(Statistics {
                 num_rows: Precision::Inexact(100),
                 total_byte_size: Precision::Inexact(1024),
@@ -770,7 +770,7 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() 
-> Result<()> {
     let scan_config =
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema, 
file_source)
             .with_projection(Some(vec![0, 1]))
-            .with_file_group(vec![file_group])
+            .with_file_group(FileGroup::new(vec![file_group]))
             .with_table_partition_cols(vec![Field::new(
                 "part".to_string(),
                 wrap_partition_type_in_dict(DataType::Int16),
@@ -797,10 +797,10 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> 
Result<()> {
 
     let scan_config =
         FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema, 
file_source)
-            .with_file_groups(vec![vec![PartitionedFile::new(
+            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
                 "/path/to/file.parquet".to_string(),
                 1024,
-            )]])
+            )])])
             .with_statistics(Statistics {
                 num_rows: Precision::Inexact(100),
                 total_byte_size: Precision::Inexact(1024),
@@ -1296,7 +1296,7 @@ fn roundtrip_json_sink() -> Result<()> {
     let file_sink_config = FileSinkConfig {
         original_url: String::default(),
         object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+        file_group: 
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1333,7 +1333,7 @@ fn roundtrip_csv_sink() -> Result<()> {
     let file_sink_config = FileSinkConfig {
         original_url: String::default(),
         object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+        file_group: 
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1389,7 +1389,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
     let file_sink_config = FileSinkConfig {
         original_url: String::default(),
         object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+        file_group: 
FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
@@ -1609,10 +1609,10 @@ async fn roundtrip_projection_source() -> Result<()> {
         schema.clone(),
         file_source,
     )
-    .with_file_groups(vec![vec![PartitionedFile::new(
+    .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
         "/path/to/file.parquet".to_string(),
         1024,
-    )]])
+    )])])
     .with_statistics(statistics)
     .with_projection(Some(vec![0, 1, 2]));
 
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs 
b/datafusion/substrait/src/physical_plan/consumer.rs
index 7bbdfc2a5d..cf1e5c2db9 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -22,7 +22,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::common::{not_impl_err, substrait_err};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, 
ParquetSource};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
@@ -134,7 +134,7 @@ pub async fn from_substrait_rel(
 
                         let part_index = file.partition_index as usize;
                         while part_index >= file_groups.len() {
-                            file_groups.push(vec![]);
+                            file_groups.push(FileGroup::default());
                         }
                         file_groups[part_index].push(partitioned_file)
                     }
diff --git a/datafusion/substrait/src/physical_plan/producer.rs 
b/datafusion/substrait/src/physical_plan/producer.rs
index f84db541b7..65eaf79852 100644
--- a/datafusion/substrait/src/physical_plan/producer.rs
+++ b/datafusion/substrait/src/physical_plan/producer.rs
@@ -63,7 +63,7 @@ pub fn to_substrait_rel(
                 let mut substrait_files = vec![];
                 for (partition_index, files) in 
file_config.file_groups.iter().enumerate()
                 {
-                    for file in files {
+                    for file in files.iter() {
                         substrait_files.push(FileOrFiles {
                             partition_index: 
partition_index.try_into().unwrap(),
                             start: 0,
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index f1284db2ad..d8b3867732 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -22,7 +22,7 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
+use datafusion::datasource::physical_plan::{FileGroup, FileScanConfig, 
ParquetSource};
 use datafusion::error::Result;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
@@ -40,14 +40,14 @@ async fn parquet_exec() -> Result<()> {
         source,
     )
     .with_file_groups(vec![
-        vec![PartitionedFile::new(
+        FileGroup::new(vec![PartitionedFile::new(
             "file://foo/part-0.parquet".to_string(),
             123,
-        )],
-        vec![PartitionedFile::new(
+        )]),
+        FileGroup::new(vec![PartitionedFile::new(
             "file://foo/part-1.parquet".to_string(),
             123,
-        )],
+        )]),
     ]);
     let parquet_exec: Arc<dyn ExecutionPlan> = scan_config.build();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to