This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4709fc65f7 Add `FileScanConfig::new()` API (#10623)
4709fc65f7 is described below
commit 4709fc65f7debc143696fa3a23ab6569ec8a383c
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat May 25 07:00:13 2024 -0400
Add `FileScanConfig::new()` API (#10623)
* Add FileScanConfig::new() API, update code to use new API
* Remove add_* api
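    For example, where code previously filled in a `FileScanConfig` struct
    literal field by field, it can now use the builder style (a sketch adapted
    from the doc example added in this commit; assumes `schema` is a
    `SchemaRef` already in scope):

        let config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_projection(Some(vec![2, 3])) // project columns 2 and 3
            .with_limit(Some(1000))            // read only the first 1000 records
            .with_file(PartitionedFile::new("file1.parquet", 1234));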
---
datafusion-examples/examples/csv_opener.rs | 16 +--
datafusion-examples/examples/json_opener.rs | 16 +--
datafusion/core/src/datasource/file_format/mod.rs | 15 +--
datafusion/core/src/datasource/listing/table.rs | 17 ++-
.../core/src/datasource/physical_plan/avro.rs | 55 ++++-----
.../core/src/datasource/physical_plan/csv.rs | 12 +-
.../datasource/physical_plan/file_scan_config.rs | 124 +++++++++++++++++++--
.../src/datasource/physical_plan/file_stream.rs | 16 +--
.../core/src/datasource/physical_plan/json.rs | 52 ++-------
.../src/datasource/physical_plan/parquet/mod.rs | 56 +++-------
.../combine_partial_final_agg.rs | 17 +--
.../src/physical_optimizer/enforce_distribution.rs | 74 ++++--------
.../src/physical_optimizer/projection_pushdown.rs | 34 +++---
.../replace_with_order_preserving_variants.rs | 22 ++--
.../core/src/physical_optimizer/test_utils.rs | 27 +----
datafusion/core/src/test/mod.rs | 42 ++-----
datafusion/core/src/test_util/parquet.rs | 27 ++---
datafusion/core/tests/parquet/custom_reader.rs | 17 +--
datafusion/core/tests/parquet/page_pruning.rs | 14 +--
datafusion/core/tests/parquet/schema_adapter.rs | 14 +--
datafusion/core/tests/parquet/schema_coercion.rs | 31 ++----
datafusion/execution/src/object_store.rs | 9 +-
datafusion/substrait/src/physical_plan/consumer.rs | 17 +--
.../tests/cases/roundtrip_physical_plan.rs | 35 +++---
24 files changed, 316 insertions(+), 443 deletions(-)
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 96753c8c52..d02aa9b308 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -17,7 +17,6 @@
use std::{sync::Arc, vec};
-use datafusion::common::Statistics;
use datafusion::{
assert_batches_eq,
datasource::{
@@ -58,16 +57,11 @@ async fn main() -> Result<()> {
let path = std::path::Path::new(&path).canonicalize()?;
- let scan_config = FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]],
- statistics: Statistics::new_unknown(&schema),
- projection: Some(vec![12, 0]),
- limit: Some(5),
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ let scan_config =
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+ .with_projection(Some(vec![12, 0]))
+ .with_limit(Some(5))
+ .with_file(PartitionedFile::new(path.display().to_string(), 10));
let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index ee33f969ca..e32fb9b096 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -29,7 +29,6 @@ use datafusion::{
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
-use datafusion_common::Statistics;
use futures::StreamExt;
use object_store::ObjectStore;
@@ -61,16 +60,11 @@ async fn main() -> Result<()> {
Arc::new(object_store),
);
- let scan_config = FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]],
- statistics: Statistics::new_unknown(&schema),
- projection: Some(vec![1, 0]),
- limit: Some(5),
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ let scan_config =
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
+ .with_projection(Some(vec![1, 0]))
+ .with_limit(Some(5))
+ .with_file(PartitionedFile::new(path.to_string(), 10));
let result =
FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 243a91b743..7cc3421ebb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -154,16 +154,11 @@ pub(crate) mod test_util {
let exec = format
.create_physical_plan(
state,
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema,
- file_groups,
- statistics,
- projection,
- limit,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file_groups(file_groups)
+ .with_statistics(statistics)
+ .with_projection(projection)
+ .with_limit(limit),
None,
)
.await?;
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index cf70894806..746e4b8e33 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -805,16 +805,13 @@ impl TableProvider for ListingTable {
.format
.create_physical_plan(
state,
- FileScanConfig {
- object_store_url,
- file_schema: Arc::clone(&self.file_schema),
- file_groups: partitioned_file_lists,
- statistics,
- projection: projection.cloned(),
- limit,
- output_ordering,
- table_partition_cols,
- },
+ FileScanConfig::new(object_store_url, Arc::clone(&self.file_schema))
+ .with_file_groups(partitioned_file_lists)
+ .with_statistics(statistics)
+ .with_projection(projection.cloned())
+ .with_limit(limit)
+ .with_output_ordering(output_ordering)
+ .with_table_partition_cols(table_partition_cols),
filters.as_ref(),
)
.await
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index a8a29e9bba..4bb8f28860 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -271,16 +271,11 @@ mod tests {
.infer_schema(&state, &store, &[meta.clone()])
.await?;
- let avro_exec = AvroExec::new(FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![vec![meta.into()]],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: Some(vec![0, 1, 2]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- });
+ let avro_exec = AvroExec::new(
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file(meta.into())
+ .with_projection(Some(vec![0, 1, 2])),
+ );
assert_eq!(
avro_exec
.properties()
@@ -348,16 +343,11 @@ mod tests {
// Include the missing column in the projection
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);
- let avro_exec = AvroExec::new(FileScanConfig {
- object_store_url,
- file_groups: vec![vec![meta.into()]],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- });
+ let avro_exec = AvroExec::new(
+ FileScanConfig::new(object_store_url, file_schema)
+ .with_file(meta.into())
+ .with_projection(projection),
+ );
assert_eq!(
avro_exec
.properties()
@@ -422,18 +412,19 @@ mod tests {
let mut partitioned_file = PartitionedFile::from(meta);
partitioned_file.partition_values =
vec![ScalarValue::from("2021-10-26")];
- let avro_exec = AvroExec::new(FileScanConfig {
- // select specific columns of the files as well as the partitioning
- // column which is supposed to be the last column in the table schema.
- projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
- object_store_url,
- file_groups: vec![vec![partitioned_file]],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- limit: None,
- table_partition_cols: vec![Field::new("date", DataType::Utf8,
false)],
- output_ordering: vec![],
- });
+ let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
+ let avro_exec = AvroExec::new(
+ FileScanConfig::new(object_store_url, file_schema)
+ // select specific columns of the files as well as the partitioning
+ // column which is supposed to be the last column in the table schema.
+ .with_projection(projection)
+ .with_file(partitioned_file)
+ .with_table_partition_cols(vec![Field::new(
+ "date",
+ DataType::Utf8,
+ false,
+ )]),
+ );
assert_eq!(
avro_exec
.properties()
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index a266b9b014..679f6c0109 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -561,7 +561,7 @@ mod tests {
tmp_dir.path(),
)?;
- let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![0, 2, 4]);
let csv = CsvExec::new(
@@ -627,7 +627,7 @@ mod tests {
tmp_dir.path(),
)?;
- let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ let mut config = partitioned_csv_config(file_schema, file_groups);
config.projection = Some(vec![4, 0, 2]);
let csv = CsvExec::new(
@@ -693,7 +693,7 @@ mod tests {
tmp_dir.path(),
)?;
- let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);
let csv = CsvExec::new(
@@ -756,7 +756,7 @@ mod tests {
tmp_dir.path(),
)?;
- let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ let mut config = partitioned_csv_config(file_schema, file_groups);
config.limit = Some(5);
let csv = CsvExec::new(
@@ -809,7 +809,7 @@ mod tests {
tmp_dir.path(),
)?;
- let mut config = partitioned_csv_config(file_schema, file_groups)?;
+ let mut config = partitioned_csv_config(file_schema, file_groups);
// Add partition columns
config.table_partition_cols = vec![Field::new("date", DataType::Utf8,
false)];
@@ -914,7 +914,7 @@ mod tests {
)
.unwrap();
- let config = partitioned_csv_config(file_schema, file_groups).unwrap();
+ let config = partitioned_csv_config(file_schema, file_groups);
let csv = CsvExec::new(
config,
true,
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 4de7eb136f..f5d3c7a641 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -64,12 +64,41 @@ pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
/// The base configurations to provide when creating a physical plan for
/// any given file format.
+///
+/// # Example
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_schema::Schema;
+/// use datafusion::datasource::listing::PartitionedFile;
+/// # use datafusion::datasource::physical_plan::FileScanConfig;
+/// # use datafusion_execution::object_store::ObjectStoreUrl;
+/// # let file_schema = Arc::new(Schema::empty());
+/// // create FileScan config for reading data from file://
+/// let object_store_url = ObjectStoreUrl::local_filesystem();
+/// let config = FileScanConfig::new(object_store_url, file_schema)
+/// .with_limit(Some(1000)) // read only the first 1000 records
+/// .with_projection(Some(vec![2, 3])) // project columns 2 and 3
+/// // Read /tmp/file1.parquet with known size of 1234 bytes in a single group
+/// .with_file(PartitionedFile::new("file1.parquet", 1234))
+/// // Read /tmp/file2.parquet (56 bytes) and /tmp/file3.parquet (78 bytes)
+/// // in a single file group
+/// .with_file_group(vec![
+/// PartitionedFile::new("file2.parquet", 56),
+/// PartitionedFile::new("file3.parquet", 78),
+/// ]);
+/// ```
#[derive(Clone)]
pub struct FileScanConfig {
/// Object store URL, used to get an [`ObjectStore`] instance from
/// [`RuntimeEnv::object_store`]
///
+ /// This `ObjectStoreUrl` should be the prefix of the absolute url for files,
+ /// such as `file://` or `s3://my_bucket`. It should not include the path to the
+ /// file itself. The relevant URL prefix must be registered via
+ /// [`RuntimeEnv::register_object_store`]
+ ///
/// [`ObjectStore`]: object_store::ObjectStore
+ /// [`RuntimeEnv::register_object_store`]: datafusion_execution::runtime_env::RuntimeEnv::register_object_store
/// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
pub object_store_url: ObjectStoreUrl,
/// Schema before `projection` is applied. It contains all the columns that may
@@ -87,6 +116,7 @@ pub struct FileScanConfig {
/// sequentially, one after the next.
pub file_groups: Vec<Vec<PartitionedFile>>,
/// Estimated overall statistics of the files, taking `filters` into account.
+ /// Defaults to [`Statistics::new_unknown`].
pub statistics: Statistics,
/// Columns on which to project the data. Indexes that are higher than the
/// number of columns of `file_schema` refer to `table_partition_cols`.
@@ -101,6 +131,86 @@ pub struct FileScanConfig {
}
impl FileScanConfig {
+ /// Create a new `FileScanConfig` with default settings for scanning files.
+ ///
+ /// See example on [`FileScanConfig`]
+ ///
+ /// No file groups are added by default. See [`Self::with_file`], [`Self::with_file_group`] and
+ /// [`Self::with_file_groups`].
+ ///
+ /// # Parameters:
+ /// * `object_store_url`: See [`Self::object_store_url`]
+ /// * `file_schema`: See [`Self::file_schema`]
+ pub fn new(object_store_url: ObjectStoreUrl, file_schema: SchemaRef) -> Self {
+ let statistics = Statistics::new_unknown(&file_schema);
+ Self {
+ object_store_url,
+ file_schema,
+ file_groups: vec![],
+ statistics,
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: vec![],
+ }
+ }
+
+ /// Set the statistics of the files
+ pub fn with_statistics(mut self, statistics: Statistics) -> Self {
+ self.statistics = statistics;
+ self
+ }
+
+ /// Set the projection of the files
+ pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Set the limit of the files
+ pub fn with_limit(mut self, limit: Option<usize>) -> Self {
+ self.limit = limit;
+ self
+ }
+
+ /// Add a file as a single group
+ ///
+ /// See [Self::file_groups] for more information.
+ pub fn with_file(self, file: PartitionedFile) -> Self {
+ self.with_file_group(vec![file])
+ }
+
+ /// Add the file groups
+ ///
+ /// See [Self::file_groups] for more information.
+ pub fn with_file_groups(
+ mut self,
+ mut file_groups: Vec<Vec<PartitionedFile>>,
+ ) -> Self {
+ self.file_groups.append(&mut file_groups);
+ self
+ }
+
+ /// Add a new file group
+ ///
+ /// See [Self::file_groups] for more information
+ pub fn with_file_group(mut self, file_group: Vec<PartitionedFile>) -> Self {
+ self.file_groups.push(file_group);
+ self
+ }
+
+ /// Set the partitioning columns of the files
+ pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
+ self.table_partition_cols = table_partition_cols;
+ self
+ }
+
+ /// Set the output ordering of the files
+ pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
+ self.output_ordering = output_ordering;
+ self
+ }
+
/// Project the schema and the statistics on the given column indices
pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
@@ -1117,16 +1227,10 @@ mod tests {
statistics: Statistics,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
- FileScanConfig {
- file_schema,
- file_groups: vec![vec![]],
- limit: None,
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- projection,
- statistics,
- table_partition_cols,
- output_ordering: vec![],
- }
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
file_schema)
+ .with_projection(projection)
+ .with_statistics(statistics)
+ .with_table_partition_cols(table_partition_cols)
}
/// Convert partition columns from Vec<(String, DataType)> to Vec<Field>
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 9732d08c7a..6f354b31ae 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -524,7 +524,7 @@ mod tests {
use crate::test::{make_partition, object_store::register_test_store};
use arrow_schema::Schema;
- use datafusion_common::{internal_err, Statistics};
+ use datafusion_common::internal_err;
/// Test `FileOpener` which will simulate errors during file opening or scanning
#[derive(Default)]
@@ -643,16 +643,12 @@ mod tests {
let on_error = self.on_error;
- let config = FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- statistics: Statistics::new_unknown(&file_schema),
+ let config = FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
file_schema,
- file_groups: vec![file_group],
- projection: None,
- limit: self.limit,
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ )
+ .with_file_group(file_group)
+ .with_limit(self.limit);
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
.unwrap()
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 4728069f19..5e8ba52659 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -521,16 +521,9 @@ mod tests {
prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
let exec = NdJsonExec::new(
- FileScanConfig {
- object_store_url,
- file_groups,
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: None,
- limit: Some(3),
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(object_store_url, file_schema)
+ .with_file_groups(file_groups)
+ .with_limit(Some(3)),
file_compression_type.to_owned(),
);
@@ -599,16 +592,9 @@ mod tests {
let missing_field_idx = file_schema.fields.len() - 1;
let exec = NdJsonExec::new(
- FileScanConfig {
- object_store_url,
- file_groups,
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: None,
- limit: Some(3),
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(object_store_url, file_schema)
+ .with_file_groups(file_groups)
+ .with_limit(Some(3)),
file_compression_type.to_owned(),
);
@@ -646,16 +632,9 @@ mod tests {
prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
let exec = NdJsonExec::new(
- FileScanConfig {
- object_store_url,
- file_groups,
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: Some(vec![0, 2]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(object_store_url, file_schema)
+ .with_file_groups(file_groups)
+ .with_projection(Some(vec![0, 2])),
file_compression_type.to_owned(),
);
let inferred_schema = exec.schema();
@@ -698,16 +677,9 @@ mod tests {
prepare_store(&state, file_compression_type.to_owned(), tmp_dir.path()).await;
let exec = NdJsonExec::new(
- FileScanConfig {
- object_store_url,
- file_groups,
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: Some(vec![3, 0, 2]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(object_store_url, file_schema)
+ .with_file_groups(file_groups)
+ .with_projection(Some(vec![3, 0, 2])),
file_compression_type.to_owned(),
);
let inferred_schema = exec.schema();
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 410413ebd7..17cb6a66c7 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -925,23 +925,16 @@ mod tests {
// files with multiple pages
let multi_page = page_index_predicate;
let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
- let file_groups = meta.into_iter().map(Into::into).collect();
+ let file_group = meta.into_iter().map(Into::into).collect();
// set up predicate (this is normally done by a layer higher up)
let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
// prepare the scan
let mut parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![file_groups],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file_group(file_group)
+ .with_projection(projection),
predicate,
None,
Default::default(),
@@ -1590,16 +1583,8 @@ mod tests {
file_schema: SchemaRef,
) -> Result<()> {
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups,
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file_groups(file_groups),
None,
None,
Default::default(),
@@ -1700,15 +1685,11 @@ mod tests {
]);
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url,
- file_groups: vec![vec![partitioned_file]],
- file_schema: schema.clone(),
- statistics: Statistics::new_unknown(&schema),
+ FileScanConfig::new(object_store_url, schema.clone())
+ .with_file(partitioned_file)
// file has 10 cols so index 12 should be month and 13 should be day
- projection: Some(vec![0, 1, 2, 12, 13]),
- limit: None,
- table_partition_cols: vec![
+ .with_projection(Some(vec![0, 1, 2, 12, 13]))
+ .with_table_partition_cols(vec![
Field::new("year", DataType::Utf8, false),
Field::new("month", DataType::UInt8, false),
Field::new(
@@ -1719,9 +1700,7 @@ mod tests {
),
false,
),
- ],
- output_ordering: vec![],
- },
+ ]),
None,
None,
Default::default(),
@@ -1779,17 +1758,10 @@ mod tests {
extensions: None,
};
+ let file_schema = Arc::new(Schema::empty());
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![vec![partitioned_file]],
- file_schema: Arc::new(Schema::empty()),
- statistics: Statistics::new_unknown(&Schema::empty()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file(partitioned_file),
None,
None,
Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index e41e4dd316..b93f4012b0 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -203,7 +203,7 @@ mod tests {
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_plan::expressions::lit;
use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::{displayable, Partitioning, Statistics};
+ use crate::physical_plan::{displayable, Partitioning};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::expressions::{col, Count, Sum};
@@ -246,16 +246,11 @@ mod tests {
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema.clone(),
+ )
+ .with_file(PartitionedFile::new("x".to_string(), 100)),
None,
None,
Default::default(),
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index cd84e911d3..033cec5301 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1432,16 +1432,9 @@ pub(crate) mod tests {
output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(&schema()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering,
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_output_ordering(output_ordering),
None,
None,
Default::default(),
@@ -1457,19 +1450,12 @@ pub(crate) mod tests {
output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema(),
- file_groups: vec![
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
+ .with_file_groups(vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 100)],
- ],
- statistics: Statistics::new_unknown(&schema()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering,
- },
+ ])
+ .with_output_ordering(output_ordering),
None,
None,
Default::default(),
@@ -1482,16 +1468,9 @@ pub(crate) mod tests {
fn csv_exec_with_sort(output_ordering: Vec<Vec<PhysicalSortExpr>>) -> Arc<CsvExec> {
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(&schema()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering,
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_output_ordering(output_ordering),
false,
b',',
b'"',
@@ -1509,19 +1488,12 @@ pub(crate) mod tests {
output_ordering: Vec<Vec<PhysicalSortExpr>>,
) -> Arc<CsvExec> {
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema(),
- file_groups: vec![
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema())
+ .with_file_groups(vec![
vec![PartitionedFile::new("x".to_string(), 100)],
vec![PartitionedFile::new("y".to_string(), 100)],
- ],
- statistics: Statistics::new_unknown(&schema()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering,
- },
+ ])
+ .with_output_ordering(output_ordering),
false,
b',',
b'"',
@@ -3790,19 +3762,11 @@ pub(crate) mod tests {
let plan = aggregate_exec_with_alias(
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema(),
- file_groups: vec![vec![PartitionedFile::new(
- "x".to_string(),
- 100,
- )]],
- statistics: Statistics::new_unknown(&schema()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema(),
+ )
+ .with_file(PartitionedFile::new("x".to_string(), 100)),
false,
b',',
b'"',
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index fe1290e407..a15b9d4fbc 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1297,7 +1297,7 @@ mod tests {
use crate::physical_plan::joins::StreamJoinPartitionMode;
use arrow_schema::{DataType, Field, Schema, SortOptions};
- use datafusion_common::{JoinType, ScalarValue, Statistics};
+ use datafusion_common::{JoinType, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{
@@ -1676,16 +1676,12 @@ mod tests {
Field::new("e", DataType::Int32, true),
]));
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(&schema),
- projection: Some(vec![0, 1, 2, 3, 4]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![vec![]],
- },
+ FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema.clone(),
+ )
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_projection(Some(vec![0, 1, 2, 3, 4])),
false,
0,
0,
@@ -1702,16 +1698,12 @@ mod tests {
Field::new("d", DataType::Int32, true),
]));
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(&schema),
- projection: Some(vec![3, 2, 1]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![vec![]],
- },
+ FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema.clone(),
+ )
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_projection(Some(vec![3, 2, 1])),
false,
0,
0,
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index f69c0df32e..e3ef3b95aa 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -291,7 +291,7 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::tree_node::{TransformedResult, TreeNode};
- use datafusion_common::{Result, Statistics};
+ use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{self, col, Column};
@@ -1491,19 +1491,13 @@ mod tests {
let projection: Vec<usize> = vec![0, 2, 3];
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new(
- "file_path".to_string(),
- 100,
- )]],
- statistics: Statistics::new_unknown(schema),
- projection: Some(projection),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![sort_exprs],
- },
+ FileScanConfig::new(
+ ObjectStoreUrl::parse("test:///").unwrap(),
+ schema.clone(),
+ )
+ .with_file(PartitionedFile::new("file_path".to_string(), 100))
+ .with_projection(Some(projection))
+ .with_output_ordering(vec![sort_exprs]),
true,
0,
b'"',
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 7bc1eeb7c4..4d926847e4 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -41,7 +41,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow_schema::{Schema, SchemaRef, SortOptions};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinType, Statistics};
+use datafusion_common::JoinType;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunctionDefinition};
use datafusion_physical_expr::expressions::col;
@@ -275,16 +275,8 @@ pub fn sort_preserving_merge_exec(
/// Create a non sorted parquet exec
pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
+ .with_file(PartitionedFile::new("x".to_string(), 100)),
None,
None,
Default::default(),
@@ -299,16 +291,9 @@ pub fn parquet_exec_sorted(
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![sort_exprs],
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_output_ordering(vec![sort_exprs]),
None,
None,
Default::default(),
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 1152c70d43..b03aaabcad 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -91,7 +91,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
FileCompressionType::UNCOMPRESSED,
work_dir,
)?;
- let config = partitioned_csv_config(schema, file_groups)?;
+ let config = partitioned_csv_config(schema, file_groups);
Ok(Arc::new(CsvExec::new(
config,
true,
@@ -196,17 +196,9 @@ pub fn partitioned_file_groups(
pub fn partitioned_csv_config(
schema: SchemaRef,
file_groups: Vec<Vec<PartitionedFile>>,
-) -> Result<FileScanConfig> {
- Ok(FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema: schema.clone(),
- file_groups,
- statistics: Statistics::new_unknown(&schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- })
+) -> FileScanConfig {
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+ .with_file_groups(file_groups)
}
pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
@@ -283,16 +275,9 @@ pub fn csv_exec_sorted(
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![sort_exprs],
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
+ .with_file(PartitionedFile::new("x".to_string(), 100))
+ .with_output_ordering(vec![sort_exprs]),
false,
0,
0,
@@ -345,16 +330,9 @@ pub fn csv_exec_ordered(
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(CsvExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("file_path".to_string(), 100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![sort_exprs],
- },
+ FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(),
schema.clone())
+ .with_file(PartitionedFile::new("file_path".to_string(), 100))
+ .with_output_ordering(vec![sort_exprs]),
true,
0,
b'"',
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 1d5668c7ec..df1d2c6f09 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -37,8 +37,6 @@ use crate::physical_plan::metrics::MetricsSet;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::{Expr, SessionConfig, SessionContext};
-use datafusion_common::Statistics;
-
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
@@ -144,22 +142,15 @@ impl TestParquetFile {
ctx: &SessionContext,
maybe_filter: Option<Expr>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let scan_config = FileScanConfig {
- object_store_url: self.object_store_url.clone(),
- file_schema: self.schema.clone(),
- file_groups: vec![vec![PartitionedFile {
- object_meta: self.object_meta.clone(),
- partition_values: vec![],
- range: None,
- statistics: None,
- extensions: None,
- }]],
- statistics: Statistics::new_unknown(&self.schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ let scan_config =
+ FileScanConfig::new(self.object_store_url.clone(), self.schema.clone())
+ .with_file(PartitionedFile {
+ object_meta: self.object_meta.clone(),
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ });
let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index e4f4d229c4..4f50c55c62 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -30,8 +30,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics,
ParquetFileReaderFactory,
};
+use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datafusion::physical_plan::{collect, Statistics};
use datafusion::prelude::SessionContext;
use datafusion_common::Result;
@@ -63,7 +63,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
let file_schema = batch.schema().clone();
let (in_memory_object_store, parquet_files_meta) =
store_parquet_in_memory(vec![batch]).await;
- let file_groups = parquet_files_meta
+ let file_group = parquet_files_meta
.into_iter()
.map(|meta| PartitionedFile {
object_meta: meta,
@@ -76,17 +76,12 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
// prepare the scan
let parquet_exec = ParquetExec::new(
- FileScanConfig {
+ FileScanConfig::new(
// just any url that doesn't point to in memory object store
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![file_groups],
- statistics: Statistics::new_unknown(&file_schema),
+ ObjectStoreUrl::local_filesystem(),
file_schema,
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ )
+ .with_file_group(file_group),
None,
None,
Default::default(),
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 8f42f21834..2e9cda40c3 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -27,7 +27,7 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
-use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
+use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{col, lit, Expr};
use datafusion_physical_expr::create_physical_expr;
@@ -71,17 +71,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
let predicate = create_physical_expr(&filter, &df_schema, &execution_props).unwrap();
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url,
- file_groups: vec![vec![partitioned_file]],
- file_schema: schema.clone(),
- statistics: Statistics::new_unknown(&schema),
- // file has 10 cols so index 12 should be month
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(object_store_url, schema).with_file(partitioned_file),
Some(predicate),
None,
Default::default(),
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
index 10c4e8a4c0..ead2884e43 100644
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ b/datafusion/core/tests/parquet/schema_adapter.rs
@@ -30,7 +30,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory,
SchemaMapper,
};
-use datafusion::physical_plan::{collect, Statistics};
+use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;
use datafusion::datasource::listing::PartitionedFile;
@@ -83,16 +83,8 @@ async fn can_override_schema_adapter() {
// prepare the scan
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![vec![partitioned_file]],
- statistics: Statistics::new_unknown(&schema),
- file_schema: schema,
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+ .with_file(partitioned_file),
None,
None,
Default::default(),
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index 88f795d2a4..ac51b4f712 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -26,7 +26,7 @@ use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::collect;
use datafusion::prelude::SessionContext;
-use datafusion_common::{Result, Statistics};
+use datafusion_common::Result;
use datafusion_execution::object_store::ObjectStoreUrl;
use object_store::path::Path;
@@ -51,7 +51,7 @@ async fn multi_parquet_coercion() {
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2), ("c3",
c3)]).unwrap();
let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
- let file_groups = meta.into_iter().map(Into::into).collect();
+ let file_group = meta.into_iter().map(Into::into).collect();
// cast c1 to utf8, c2 to int32, c3 to float64
let file_schema = Arc::new(Schema::new(vec![
@@ -60,16 +60,8 @@ async fn multi_parquet_coercion() {
Field::new("c3", DataType::Float64, true),
]));
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![file_groups],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file_group(file_group),
None,
None,
Default::default(),
@@ -115,7 +107,7 @@ async fn multi_parquet_coercion_projection() {
RecordBatch::try_from_iter(vec![("c2", c2), ("c1", c1s), ("c3",
c3)]).unwrap();
let (meta, _files) = store_parquet(vec![batch1, batch2]).await.unwrap();
- let file_groups = meta.into_iter().map(Into::into).collect();
+ let file_group = meta.into_iter().map(Into::into).collect();
// cast c1 to utf8, c2 to int32, c3 to float64
let file_schema = Arc::new(Schema::new(vec![
@@ -124,16 +116,9 @@ async fn multi_parquet_coercion_projection() {
Field::new("c3", DataType::Float64, true),
]));
let parquet_exec = ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_groups: vec![file_groups],
- statistics: Statistics::new_unknown(&file_schema),
- file_schema,
- projection: Some(vec![1, 0, 2]),
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), file_schema)
+ .with_file_group(file_group)
+ .with_projection(Some(vec![1, 0, 2])),
None,
None,
Default::default(),
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index 126f83f7e2..7697c01d63 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -51,7 +51,14 @@ impl ObjectStoreUrl {
Ok(Self { url: parsed })
}
- /// An [`ObjectStoreUrl`] for the local filesystem
+ /// An [`ObjectStoreUrl`] for the local filesystem (`file://`)
+ ///
+ /// # Example
+ /// ```
+ /// # use datafusion_execution::object_store::ObjectStoreUrl;
+ /// let local_fs = ObjectStoreUrl::parse("file://").unwrap();
+ /// assert_eq!(local_fs, ObjectStoreUrl::local_filesystem())
+ /// ```
pub fn local_filesystem() -> Self {
Self::parse("file://").unwrap()
}
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 50b08e7793..68f8b02b0f 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -24,7 +24,7 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::{DataFusionError, Result};
-use datafusion::physical_plan::{ExecutionPlan, Statistics};
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use async_recursion::async_recursion;
@@ -104,16 +104,11 @@ pub async fn from_substrait_rel(
file_groups[part_index].push(partitioned_file)
}
- let mut base_config = FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema: Arc::new(Schema::empty()),
- file_groups,
- statistics: Statistics::new_unknown(&Schema::empty()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ let mut base_config = FileScanConfig::new(
+ ObjectStoreUrl::local_filesystem(),
+ Arc::new(Schema::empty()),
+ )
+ .with_file_groups(file_groups);
if let Some(MaskExpression { select, .. }) = &read.projection {
if let Some(projection) = &select.as_ref() {
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index 70887e3934..aca0443194 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -23,7 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};
use datafusion::error::Result;
-use datafusion::physical_plan::{displayable, ExecutionPlan, Statistics};
+use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_substrait::physical_plan::{consumer, producer};
@@ -31,25 +31,20 @@ use substrait::proto::extensions;
#[tokio::test]
async fn parquet_exec() -> Result<()> {
- let scan_config = FileScanConfig {
- object_store_url: ObjectStoreUrl::local_filesystem(),
- file_schema: Arc::new(Schema::empty()),
- file_groups: vec![
- vec![PartitionedFile::new(
- "file://foo/part-0.parquet".to_string(),
- 123,
- )],
- vec![PartitionedFile::new(
- "file://foo/part-1.parquet".to_string(),
- 123,
- )],
- ],
- statistics: Statistics::new_unknown(&Schema::empty()),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- };
+ let scan_config = FileScanConfig::new(
+ ObjectStoreUrl::local_filesystem(),
+ Arc::new(Schema::empty()),
+ )
+ .with_file_groups(vec![
+ vec![PartitionedFile::new(
+ "file://foo/part-0.parquet".to_string(),
+ 123,
+ )],
+ vec![PartitionedFile::new(
+ "file://foo/part-1.parquet".to_string(),
+ 123,
+ )],
+ ]);
let parquet_exec: Arc<dyn ExecutionPlan> = Arc::new(ParquetExec::new(
scan_config,
None,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]