This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4709fc65f7 Add `FileScanConfig::new()` API (#10623)
4709fc65f7 is described below

commit 4709fc65f7debc143696fa3a23ab6569ec8a383c
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat May 25 07:00:13 2024 -0400

    Add `FileScanConfig::new()` API (#10623)
    
    * Add FileScanConfig::new() API, update code to use new API
    
    * Remove add_* api
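    
    For illustration, a minimal sketch of the builder-style usage this
    commit enables (the file name, size, and `schema` value below are
    placeholders; `schema` is any `SchemaRef` describing the files to scan):
    
        use datafusion::datasource::listing::PartitionedFile;
        use datafusion::datasource::physical_plan::FileScanConfig;
        use datafusion_execution::object_store::ObjectStoreUrl;
    
        // Scan `data.parquet` (1234 bytes), projecting columns 0 and 2 and
        // reading at most 100 rows. Fields left unset keep their defaults:
        // unknown statistics, no output ordering, no table partition columns.
        let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_file(PartitionedFile::new("data.parquet", 1234))
            .with_projection(Some(vec![0, 2]))
            .with_limit(Some(100));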
---
 datafusion-examples/examples/csv_opener.rs         |  16 +--
 datafusion-examples/examples/json_opener.rs        |  16 +--
 datafusion/core/src/datasource/file_format/mod.rs  |  15 +--
 datafusion/core/src/datasource/listing/table.rs    |  17 ++-
 .../core/src/datasource/physical_plan/avro.rs      |  55 ++++-----
 .../core/src/datasource/physical_plan/csv.rs       |  12 +-
 .../datasource/physical_plan/file_scan_config.rs   | 124 +++++++++++++++++++--
 .../src/datasource/physical_plan/file_stream.rs    |  16 +--
 .../core/src/datasource/physical_plan/json.rs      |  52 ++-------
 .../src/datasource/physical_plan/parquet/mod.rs    |  56 +++-------
 .../combine_partial_final_agg.rs                   |  17 +--
 .../src/physical_optimizer/enforce_distribution.rs |  74 ++++--------
 .../src/physical_optimizer/projection_pushdown.rs  |  34 +++---
 .../replace_with_order_preserving_variants.rs      |  22 ++--
 .../core/src/physical_optimizer/test_utils.rs      |  27 +----
 datafusion/core/src/test/mod.rs                    |  42 ++-----
 datafusion/core/src/test_util/parquet.rs           |  27 ++---
 datafusion/core/tests/parquet/custom_reader.rs     |  17 +--
 datafusion/core/tests/parquet/page_pruning.rs      |  14 +--
 datafusion/core/tests/parquet/schema_adapter.rs    |  14 +--
 datafusion/core/tests/parquet/schema_coercion.rs   |  31 ++----
 datafusion/execution/src/object_store.rs           |   9 +-
 datafusion/substrait/src/physical_plan/consumer.rs |  17 +--
 .../tests/cases/roundtrip_physical_plan.rs         |  35 +++---
 24 files changed, 316 insertions(+), 443 deletions(-)

diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 96753c8c52..d02aa9b308 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -17,7 +17,6 @@
 
 use std::{sync::Arc, vec};
 
-use datafusion::common::Statistics;
 use datafusion::{
     assert_batches_eq,
     datasource::{
@@ -58,16 +57,11 @@ async fn main() -> Result<()> {
 
     let path = std::path::Path::new(&path).canonicalize()?;
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![12, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![12, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.display().to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index ee33f969ca..e32fb9b096 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -29,7 +29,6 @@ use datafusion::{
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
 };
-use datafusion_common::Statistics;
 
 use futures::StreamExt;
 use object_store::ObjectStore;
@@ -61,16 +60,11 @@ async fn main() -> Result<()> {
         Arc::new(object_store),
     );
 
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
-        statistics: Statistics::new_unknown(&schema),
-        projection: Some(vec![1, 0]),
-        limit: Some(5),
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config =
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+            .with_projection(Some(vec![1, 0]))
+            .with_limit(Some(5))
+            .with_file(PartitionedFile::new(path.to_string(), 10));
 
     let result =
         FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 243a91b743..7cc3421ebb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -154,16 +154,11 @@ pub(crate) mod test_util {
         let exec = format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::local_filesystem(),
-                    file_schema,
-                    file_groups,
-                    statistics,
-                    projection,
-                    limit,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                    .with_file_groups(file_groups)
+                    .with_statistics(statistics)
+                    .with_projection(projection)
+                    .with_limit(limit),
                 None,
             )
             .await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index cf70894806..746e4b8e33 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 state,
-                FileScanConfig {
-                    object_store_url,
-                    file_schema: Arc::clone(&self.file_schema),
-                    file_groups: partitioned_file_lists,
-                    statistics,
-                    projection: projection.cloned(),
-                    limit,
-                    output_ordering,
-                    table_partition_cols,
-                },
+                FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
+                    .with_file_groups(partitioned_file_lists)
+                    .with_statistics(statistics)
+                    .with_projection(projection.cloned())
+                    .with_limit(limit)
+                    .with_output_ordering(output_ordering)
+                    .with_table_partition_cols(table_partition_cols),
                 filters.as_ref(),
             )
             .await
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index a8a29e9bba..4bb8f28860 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -271,16 +271,11 @@ mod tests {
             .infer_schema(&state, &store, &[meta.clone()])
             .await?;
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection: Some(vec![0, 1, 2]),
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                .with_file(meta.into())
+                .with_projection(Some(vec![0, 1, 2])),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -348,16 +343,11 @@ mod tests {
         // Include the missing column in the projection
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            object_store_url,
-            file_groups: vec![vec![meta.into()]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        });
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file(meta.into())
+                .with_projection(projection),
+        );
         assert_eq!(
             avro_exec
                 .properties()
@@ -422,18 +412,19 @@ mod tests {
         let mut partitioned_file = PartitionedFile::from(meta);
         partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];
 
-        let avro_exec = AvroExec::new(FileScanConfig {
-            // select specific columns of the files as well as the partitioning
-            // column which is supposed to be the last column in the table schema.
-            projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
-            object_store_url,
-            file_groups: vec![vec![partitioned_file]],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            limit: None,
-            table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
-            output_ordering: vec![],
-        });
+        let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
+        let avro_exec = AvroExec::new(
+            FileScanConfig::new(object_store_url, file_schema)
+                // select specific columns of the files as well as the partitioning
+                // column which is supposed to be the last column in the table schema.
+                .with_projection(projection)
+                .with_file(partitioned_file)
+                .with_table_partition_cols(vec![Field::new(
+                    "date",
+                    DataType::Utf8,
+                    false,
+                )]),
+        );
         assert_eq!(
             avro_exec
                 .properties()
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index a266b9b014..679f6c0109 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -561,7 +561,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![0, 2, 4]);
 
         let csv = CsvExec::new(
@@ -627,7 +627,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.projection = Some(vec![4, 0, 2]);
 
         let csv = CsvExec::new(
@@ -693,7 +693,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -756,7 +756,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
         config.limit = Some(5);
 
         let csv = CsvExec::new(
@@ -809,7 +809,7 @@ mod tests {
             tmp_dir.path(),
         )?;
 
-        let mut config = partitioned_csv_config(file_schema, file_groups)?;
+        let mut config = partitioned_csv_config(file_schema, file_groups);
 
         // Add partition columns
         config.table_partition_cols = vec![Field::new("date", DataType::Utf8, false)];
@@ -914,7 +914,7 @@ mod tests {
         )
         .unwrap();
 
-        let config = partitioned_csv_config(file_schema, file_groups).unwrap();
+        let config = partitioned_csv_config(file_schema, file_groups);
         let csv = CsvExec::new(
             config,
             true,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 4de7eb136f..f5d3c7a641 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
 
 /// The base configurations to provide when creating a physical plan for
 /// any given file format.
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let config = FileScanConfig::new(object_store_url, file_schema)
+///   .with_limit(Some(1000))            // read only the first 1000 records
+///   .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+///    // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+///   .with_file(PartitionedFile::new("file1.parquet", 1234))
+///   // Read /tmp/file2.parquet 56 bytes and /tmp/file3.parquet 78 bytes
+///   // in a single file group
+///   .with_file_group(vec![
+///    PartitionedFile::new("file2.parquet", 56),
+///    PartitionedFile::new("file3.parquet", 78),
+///   ]);
+/// ```
 #[derive(Clone)]
 pub struct FileScanConfig {
     /// Object store URL, used to get an [`ObjectStore`] instance from
     /// [`RuntimeEnv::object_store`]
     ///
+    /// This `ObjectStoreUrl` should be the prefix of the absolute url for files
+    /// such as `file://` or `s3://my_bucket`. It should not include the path to the
+    /// file itself. The relevant URL prefix must be registered via
+    /// [`RuntimeEnv::register_object_store`]
+    ///
     /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
     /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
     pub object_store_url: ObjectStoreUrl,
     /// Schema before `projection` is applied. It contains all the columns that may
@@ -87,6 +116,7 @@ pub struct FileScanConfig {
     /// sequentially, one after the next.
     pub file_groups: Vec<Vec<PartitionedFile>>,
     /// Estimated overall statistics of the files, taking `filters` into account.
+    /// Defaults to [`Statistics::new_unknown`].
     pub statistics: Statistics,
     /// Columns on which to project the data. Indexes that are higher than the
     /// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +131,86 @@ pub struct FileScanConfig {
 }
 
 impl FileScanConfig {
+    /// Create a new `FileScanConfig` with default settings for scanning files.
+    ///
+    /// See example on [`FileScanConfig`]
+    ///
+    /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
+    /// [`Self::with_file_groups`].
+    ///
+    /// # Parameters:
+    /// * `object_store_url`: See [`Self::object_store_url`]
+    /// * `file_schema`: See [`Self::file_schema`]
+    pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+        let statistics = Statistics::new_unknown(&file_schema);
+        Self {
+            object_store_url,
+            file_schema,
+            file_groups: vec![],
+            statistics,
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: vec![],
+        }
+    }
+
+    /// Set the statistics of the files
+    pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+        self.statistics = statistics;
+        self
+    }
+
+    /// Set the projection of the files
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set the limit of the files
+    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Add a file as a single group
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file(self, file: PartitionedFile) -> Self {
+        self.with_file_group(vec![file])
+    }
+
+    /// Add the file groups
+    ///
+    /// See [Self::file_groups] for more information.
+    pub fn with_file_groups(
+        mut self,
+        mut file_groups: Vec<Vec<PartitionedFile>>,
+    ) -> Self {
+        self.file_groups.append(&mut file_groups);
+        self
+    }
+
+    /// Add a new file group
+    ///
+    /// See [Self::file_groups] for more information
+    pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
+        self.file_groups.push(file_group);
+        self
+    }
+
+    /// Set the partitioning columns of the files
+    pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Set the output ordering of the files
+    pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+
     /// Project the schema and the statistics on the given column indices
     pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
         if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -1117,16 +1227,10 @@ mod tests {
         statistics: Statistics,
         table_partition_cols: Vec<Field>,
     ) -> FileScanConfig {
-        FileScanConfig {
-            file_schema,
-            file_groups: vec![vec![]],
-            limit: None,
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            projection,
-            statistics,
-            table_partition_cols,
-            output_ordering: vec![],
-        }
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), file_schema)
+            .with_projection(projection)
+            .with_statistics(statistics)
+            .with_table_partition_cols(table_partition_cols)
     }
 
     /// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 9732d08c7a..6f354b31ae 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -524,7 +524,7 @@ mod tests {
     use crate::test::{make_partition, object_store::register_test_store};
 
     use arrow_schema::Schema;
-    use datafusion_common::{internal_err, Statistics};
+    use datafusion_common::internal_err;
 
     /// Test `FileOpener` which will simulate errors during file opening or scanning
     #[derive(Default)]
@@ -643,16 +643,12 @@ mod tests {
 
             let on_error = self.on_error;
 
-            let config = FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                statistics: Statistics::new_unknown(&file_schema),
+            let config = FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
                 file_schema,
-                file_groups: vec![file_group],
-                projection: None,
-                limit: self.limit,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            };
+            )
+            .with_file_group(file_group)
+            .with_limit(self.limit);
             let metrics_set = ExecutionPlanMetricsSet::new();
             let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
                 .unwrap()
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 4728069f19..5e8ba52659 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -521,16 +521,9 @@ mod tests {
             prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: None,
-                limit: Some(3),
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file_groups(file_groups)
+                .with_limit(Some(3)),
             file_compression_type.to_owned(),
         );
 
@@ -599,16 +592,9 @@ mod tests {
         let missing_field_idx = file_schema.fields.len() - 1;
 
         let exec = NdJsonExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: None,
-                limit: Some(3),
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file_groups(file_groups)
+                .with_limit(Some(3)),
             file_compression_type.to_owned(),
         );
 
@@ -646,16 +632,9 @@ mod tests {
             prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: Some(vec![0, 2]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file_groups(file_groups)
+                .with_projection(Some(vec![0, 2])),
             file_compression_type.to_owned(),
         );
         let inferred_schema = exec.schema();
@@ -698,16 +677,9 @@ mod tests {
             prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
 
         let exec = NdJsonExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups,
-                statistics: Statistics::new_unknown(&file_schema),
-                file_schema,
-                projection: Some(vec![3, 0, 2]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(object_store_url, file_schema)
+                .with_file_groups(file_groups)
+                .with_projection(Some(vec![3, 0, 2])),
             file_compression_type.to_owned(),
         );
         let inferred_schema = exec.schema();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 410413ebd7..17cb6a66c7 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -925,23 +925,16 @@ mod tests {
             // files with multiple pages
             let multi_page = page_index_predicate;
             let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
-            let file_groups = meta.into_iter().map(Into::into).collect();
+            let file_group = meta.into_iter().map(Into::into).collect();
 
             // set up predicate (this is normally done by a layer higher up)
             let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
 
             // prepare the scan
             let mut parquet_exec = ParquetExec::new(
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::local_filesystem(),
-                    file_groups: vec![file_groups],
-                    statistics: Statistics::new_unknown(&file_schema),
-                    file_schema,
-                    projection,
-                    limit: None,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                    .with_file_group(file_group)
+                    .with_projection(projection),
                 predicate,
                 None,
                 Default::default(),
@@ -1590,16 +1583,8 @@ mod tests {
             file_schema: SchemaRef,
         ) -> Result<()> {
             let parquet_exec = ParquetExec::new(
-                FileScanConfig {
-                    object_store_url: ObjectStoreUrl::local_filesystem(),
-                    file_groups,
-                    statistics: Statistics::new_unknown(&file_schema),
-                    file_schema,
-                    projection: None,
-                    limit: None,
-                    table_partition_cols: vec![],
-                    output_ordering: vec![],
-                },
+                FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                    .with_file_groups(file_groups),
                 None,
                 None,
                 Default::default(),
@@ -1700,15 +1685,11 @@ mod tests {
         ]);
 
         let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url,
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: schema.clone(),
-                statistics: Statistics::new_unknown(&schema),
+            FileScanConfig::new(object_store_url, schema.clone())
+                .with_file(partitioned_file)
                 // file has 10 cols so index 12 should be month and 13 should be day
-                projection: Some(vec![0, 1, 2, 12, 13]),
-                limit: None,
-                table_partition_cols: vec![
+                .with_projection(Some(vec![0, 1, 2, 12, 13]))
+                .with_table_partition_cols(vec![
                     Field::new("year", DataType::Utf8, false),
                     Field::new("month", DataType::UInt8, false),
                     Field::new(
@@ -1719,9 +1700,7 @@ mod tests {
                         ),
                         false,
                     ),
-                ],
-                output_ordering: vec![],
-            },
+                ]),
             None,
             None,
             Default::default(),
@@ -1779,17 +1758,10 @@ mod tests {
             extensions: None,
         };
 
+        let file_schema = Arc::new(Schema::empty());
         let parquet_exec = ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::local_filesystem(),
-                file_groups: vec![vec![partitioned_file]],
-                file_schema: Arc::new(Schema::empty()),
-                statistics: Statistics::new_unknown(&Schema::empty()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+                .with_file(partitioned_file),
             None,
             None,
             Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index e41e4dd316..b93f4012b0 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -203,7 +203,7 @@ mod tests {
     use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
     use crate::physical_plan::expressions::lit;
     use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::{displayable, Partitioning, Statistics};
+    use crate::physical_plan::{displayable, Partitioning};
 
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_physical_expr::expressions::{col, Count, Sum};
@@ -246,16 +246,11 @@ mod tests {
 
     fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(schema),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100)),
             None,
             None,
             Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index cd84e911d3..033cec5301 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1432,16 +1432,9 @@ pub(crate) mod tests {
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
+            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+                .with_file(PartitionedFile::new("x".to_string(), 100))
+                .with_output_ordering(output_ordering),
             None,
             None,
             Default::default(),
@@ -1457,19 +1450,12 @@ pub(crate) mod tests {
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<ParquetExec> {
         Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![
+            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+                .with_file_groups(vec![
                     vec![PartitionedFile::new("x".to_string(), 100)],
                     vec![PartitionedFile::new("y".to_string(), 100)],
-                ],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
+                ])
+                .with_output_ordering(output_ordering),
             None,
             None,
             Default::default(),
@@ -1482,16 +1468,9 @@ pub(crate) mod tests {
 
     fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
+            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+                .with_file(PartitionedFile::new("x".to_string(), 100))
+                .with_output_ordering(output_ordering),
             false,
             b',',
             b'"',
@@ -1509,19 +1488,12 @@ pub(crate) mod tests {
         output_ordering: Vec<Vec<PhysicalSortExpr>>,
     ) -> Arc<CsvExec> {
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema(),
-                file_groups: vec![
+            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+                .with_file_groups(vec![
                     vec![PartitionedFile::new("x".to_string(), 100)],
                     vec![PartitionedFile::new("y".to_string(), 100)],
-                ],
-                statistics: Statistics::new_unknown(&schema()),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering,
-            },
+                ])
+                .with_output_ordering(output_ordering),
             false,
             b',',
             b'"',
@@ -3790,19 +3762,11 @@ pub(crate) mod tests {
 
             let plan = aggregate_exec_with_alias(
                 Arc::new(CsvExec::new(
-                    FileScanConfig {
-                        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                        file_schema: schema(),
-                        file_groups: vec![vec![PartitionedFile::new(
-                            "x".to_string(),
-                            100,
-                        )]],
-                        statistics: Statistics::new_unknown(&schema()),
-                        projection: None,
-                        limit: None,
-                        table_partition_cols: vec![],
-                        output_ordering: vec![],
-                    },
+                    FileScanConfig::new(
+                        ObjectStoreUrl::parse("test:///").unwrap(),
+                        schema(),
+                    )
+                    .with_file(PartitionedFile::new("x".to_string(), 100)),
                     false,
                     b',',
                     b'"',
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index fe1290e407..a15b9d4fbc 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1297,7 +1297,7 @@ mod tests {
     use crate::physical_plan::joins::StreamJoinPartitionMode;
 
     use arrow_schema::{DataType, Field, Schema, SortOptions};
-    use datafusion_common::{JoinType, ScalarValue, Statistics};
+    use datafusion_common::{JoinType, ScalarValue};
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::{SendableRecordBatchStream, TaskContext};
     use datafusion_expr::{
@@ -1676,16 +1676,12 @@ mod tests {
             Field::new("e", DataType::Int32, true),
         ]));
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema),
-                projection: Some(vec![0, 1, 2, 3, 4]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![vec![]],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_projection(Some(vec![0, 1, 2, 3, 4])),
             false,
             0,
             0,
@@ -1702,16 +1698,12 @@ mod tests {
             Field::new("d", DataType::Int32, true),
         ]));
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-                statistics: Statistics::new_unknown(&schema),
-                projection: Some(vec![3, 2, 1]),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![vec![]],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_projection(Some(vec![3, 2, 1])),
             false,
             0,
             0,
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index f69c0df32e..e3ef3b95aa 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -291,7 +291,7 @@ mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
-    use datafusion_common::{Result, Statistics};
+    use datafusion_common::Result;
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{JoinType, Operator};
     use datafusion_physical_expr::expressions::{self, col, Column};
@@ -1491,19 +1491,13 @@ mod tests {
         let projection: Vec<usize> = vec![0, 2, 3];
 
         Arc::new(CsvExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new(
-                    "file_path".to_string(),
-                    100,
-                )]],
-                statistics: Statistics::new_unknown(schema),
-                projection: Some(projection),
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![sort_exprs],
-            },
+            FileScanConfig::new(
+                ObjectStoreUrl::parse("test:///").unwrap(),
+                schema.clone(),
+            )
+            .with_file(PartitionedFile::new("file_path".to_string(), 100))
+            .with_projection(Some(projection))
+            .with_output_ordering(vec![sort_exprs]),
             true,
             0,
             b'"',
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 7bc1eeb7c4..4d926847e4 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -41,7 +41,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
 
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinType, Statistics};
+use datafusion_common::JoinType;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunctionDefinition};
 use datafusion_physical_expr::expressions::col;
@@ -275,16 +275,8 @@ pub fn sort_preserving_merge_exec(
 /// Create a non sorted parquet exec
 pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100)),
         None,
         None,
         Default::default(),
@@ -299,16 +291,9 @@ pub fn parquet_exec_sorted(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         None,
         None,
         Default::default(),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 1152c70d43..b03aaabcad 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -91,7 +91,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
         FileCompressionType::UNCOMPRESSED,
         work_dir,
     )?;
-    let config = partitioned_csv_config(schema, file_groups)?;
+    let config = partitioned_csv_config(schema, file_groups);
     Ok(Arc::new(CsvExec::new(
         config,
         true,
@@ -196,17 +196,9 @@ pub fn partitioned_file_groups(
 pub fn partitioned_csv_config(
     schema: SchemaRef,
     file_groups: Vec<Vec<PartitionedFile>>,
-) -> Result<FileScanConfig> {
-    Ok(FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: schema.clone(),
-        file_groups,
-        statistics: Statistics::new_unknown(&schema),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    })
+) -> FileScanConfig {
+    FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+        .with_file_groups(file_groups)
 }
 
 pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
@@ -283,16 +275,9 @@ pub fn csv_exec_sorted(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("x".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         false,
         0,
         0,
@@ -345,16 +330,9 @@ pub fn csv_exec_ordered(
     let sort_exprs = sort_exprs.into_iter().collect();
 
     Arc::new(CsvExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            file_schema: schema.clone(),
-            file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]],
-            statistics: Statistics::new_unknown(schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![sort_exprs],
-        },
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema.clone())
+            .with_file(PartitionedFile::new("file_path".to_string(), 100))
+            .with_output_ordering(vec![sort_exprs]),
         true,
         0,
         b'"',
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 1d5668c7ec..df1d2c6f09 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,8 +37,6 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
-use datafusion_common::Statistics;
-
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use parquet::arrow::ArrowWriter;
@@ -144,22 +142,15 @@ impl TestParquetFile {
         ctx: &SessionContext,
         maybe_filter: Option<Expr>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let scan_config = FileScanConfig {
-            object_store_url: self.object_store_url.clone(),
-            file_schema: self.schema.clone(),
-            file_groups: vec![vec![PartitionedFile {
-                object_meta: self.object_meta.clone(),
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-            }]],
-            statistics: Statistics::new_unknown(&self.schema),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        };
+        let scan_config =
+            FileScanConfig::new(self.object_store_url.clone(), self.schema.clone())
+                .with_file(PartitionedFile {
+                    object_meta: self.object_meta.clone(),
+                    partition_values: vec![],
+                    range: None,
+                    statistics: None,
+                    extensions: None,
+                });
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;
 
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index e4f4d229c4..4f50c55c62 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -30,8 +30,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
     FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory,
 };
+use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datafusion::physical_plan::{collect, Statistics};
 use datafusion::prelude::SessionContext;
 use datafusion_common::Result;
 
@@ -63,7 +63,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
     let file_schema = batch.schema().clone();
     let (in_memory_object_store, parquet_files_meta) =
         store_parquet_in_memory(vec![batch]).await;
-    let file_groups = parquet_files_meta
+    let file_group = parquet_files_meta
         .into_iter()
         .map(|meta| PartitionedFile {
             object_meta: meta,
@@ -76,17 +76,12 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
 
     // prepare the scan
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
+        FileScanConfig::new(
             // just any url that doesn't point to in memory object store
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![file_groups],
-            statistics: Statistics::new_unknown(&file_schema),
+            ObjectStoreUrl::local_filesystem(),
             file_schema,
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        )
+        .with_file_group(file_group),
         None,
         None,
         Default::default(),
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8f42f21834..2e9cda40c3 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -27,7 +27,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
+use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::{col, lit, Expr};
 use datafusion_physical_expr::create_physical_expr;
@@ -71,17 +71,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
     let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap();
 
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
-            object_store_url,
-            file_groups: vec![vec![partitioned_file]],
-            file_schema: schema.clone(),
-            statistics: Statistics::new_unknown(&schema),
-            // file has 10 cols so index 12 should be month
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(object_store_url, schema).with_file(partitioned_file),
         Some(predicate),
         None,
         Default::default(),
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 10c4e8a4c0..ead2884e43 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -30,7 +30,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
     FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
 };
-use datafusion::physical_plan::{collect, Statistics};
+use datafusion::physical_plan::collect;
 use datafusion::prelude::SessionContext;
 
 use datafusion::datasource::listing::PartitionedFile;
@@ -83,16 +83,8 @@ async fn can_override_schema_adapter() {
 
     // prepare the scan
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![vec![partitioned_file]],
-            statistics: Statistics::new_unknown(&schema),
-            file_schema: schema,
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+            .with_file(partitioned_file),
         None,
         None,
         Default::default(),
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index 88f795d2a4..ac51b4f712 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -26,7 +26,7 @@ use datafusion::assert_batches_sorted_eq;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::physical_plan::collect;
 use datafusion::prelude::SessionContext;
-use datafusion_common::{Result, Statistics};
+use datafusion_common::Result;
 use datafusion_execution::object_store::ObjectStoreUrl;
 
 use object_store::path::Path;
@@ -51,7 +51,7 @@ async fn multi_parquet_coercion() {
     let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3", c3)]).unwrap();
 
     let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
-    let file_groups = meta.into_iter().map(Into::into).collect();
+    let file_group = meta.into_iter().map(Into::into).collect();
 
     // cast c1 to utf8, c2 to int32, c3 to float64
     let file_schema = Arc::new(Schema::new(vec![
@@ -60,16 +60,8 @@ async fn multi_parquet_coercion() {
         Field::new("c3", DataType::Float64, true),
     ]));
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![file_groups],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+            .with_file_group(file_group),
         None,
         None,
         Default::default(),
@@ -115,7 +107,7 @@ async fn multi_parquet_coercion_projection() {
         RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3", c3)]).unwrap();
 
     let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
-    let file_groups = meta.into_iter().map(Into::into).collect();
+    let file_group = meta.into_iter().map(Into::into).collect();
 
     // cast c1 to utf8, c2 to int32, c3 to float64
     let file_schema = Arc::new(Schema::new(vec![
@@ -124,16 +116,9 @@ async fn multi_parquet_coercion_projection() {
         Field::new("c3", DataType::Float64, true),
     ]));
     let parquet_exec = ParquetExec::new(
-        FileScanConfig {
-            object_store_url: ObjectStoreUrl::local_filesystem(),
-            file_groups: vec![file_groups],
-            statistics: Statistics::new_unknown(&file_schema),
-            file_schema,
-            projection: Some(vec![1, 0, 2]),
-            limit: None,
-            table_partition_cols: vec![],
-            output_ordering: vec![],
-        },
+        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+            .with_file_group(file_group)
+            .with_projection(Some(vec![1, 0, 2])),
         None,
         None,
         Default::default(),
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index 126f83f7e2..7697c01d63 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -51,7 +51,14 @@ impl ObjectStoreUrl {
         Ok(Self { url: parsed })
     }
 
-    /// An [`ObjectStoreUrl`] for the local filesystem
+    /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
+    ///
+    /// # Example
+    /// ```
+    /// # use datafusion_execution::object_store::ObjectStoreUrl;
+    /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
+    /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
+    /// ```
     pub fn local_filesystem() -> Self {
         Self::parse("file://").unwrap()
     }
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 50b08e7793..68f8b02b0f 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -24,7 +24,7 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::{ExecutionPlan, Statistics};
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 
 use async_recursion::async_recursion;
@@ -104,16 +104,11 @@ pub async fn from_substrait_rel(
                         file_groups[part_index].push(partitioned_file)
                     }
 
-                    let mut base_config = FileScanConfig {
-                        object_store_url: ObjectStoreUrl::local_filesystem(),
-                        file_schema: Arc::new(Schema::empty()),
-                        file_groups,
-                        statistics: Statistics::new_unknown(&Schema::empty()),
-                        projection: None,
-                        limit: None,
-                        table_partition_cols: vec![],
-                        output_ordering: vec![],
-                    };
+                    let mut base_config = FileScanConfig::new(
+                        ObjectStoreUrl::local_filesystem(),
+                        Arc::new(Schema::empty()),
+                    )
+                    .with_file_groups(file_groups);
 
                     if let Some(MaskExpression { select, .. }) = &read.projection {
                         if let Some(projection) = &select.as_ref() {
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 70887e3934..aca0443194 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -23,7 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
 use datafusion::error::Result;
-use datafusion::physical_plan::{displayable, ExecutionPlan, Statistics};
+use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::prelude::SessionContext;
 use datafusion_substrait::physical_plan::{consumer, producer};
 
@@ -31,25 +31,20 @@ use substrait::proto::extensions;
 
 #[tokio::test]
 async fn parquet_exec() -> Result<()> {
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
-        file_schema: Arc::new(Schema::empty()),
-        file_groups: vec![
-            vec![PartitionedFile::new(
-                "file://foo/part-0.parquet".to_string(),
-                123,
-            )],
-            vec![PartitionedFile::new(
-                "file://foo/part-1.parquet".to_string(),
-                123,
-            )],
-        ],
-        statistics: Statistics::new_unknown(&Schema::empty()),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        output_ordering: vec![],
-    };
+    let scan_config = FileScanConfig::new(
+        ObjectStoreUrl::local_filesystem(),
+        Arc::new(Schema::empty()),
+    )
+    .with_file_groups(vec![
+        vec![PartitionedFile::new(
+            "file://foo/part-0.parquet".to_string(),
+            123,
+        )],
+        vec![PartitionedFile::new(
+            "file://foo/part-1.parquet".to_string(),
+            123,
+        )],
+    ]);
     let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
         scan_config,
         None,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
