This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a1e959d87a Remove ListingTable and FileScanConfig Unbounded (#8540) (#8573)
a1e959d87a is described below
commit a1e959d87a66da7060bd005b1993b824c0683a63
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Dec 18 10:55:49 2023 +0000
Remove ListingTable and FileScanConfig Unbounded (#8540) (#8573)
* Remove ListingTable and FileScanConfig Unbounded (#8540)
* Fix substrait
* Fix logical conflicts
* Add deleted tests as ignored
---------
Co-authored-by: Mustafa Akur <[email protected]>
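For code that used the removed `mark_infinite`/`with_infinite_source` APIs, a
minimal migration sketch, assuming the StreamConfig/StreamTable API that this
diff switches the tests to (the schema, table name, and file path below are
hypothetical, for illustration only):

    use std::sync::Arc;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::datasource::stream::{StreamConfig, StreamTable};
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    fn register_unbounded_csv(ctx: &SessionContext) -> Result<()> {
        // Hypothetical schema for an unbounded (e.g. FIFO-backed) file.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)]));
        // Previously: CsvReadOptions::new().mark_infinite(true).
        // Now: register a StreamTable over the file instead.
        let config = StreamConfig::new_file(schema, "/tmp/unbounded.csv".into());
        ctx.register_table("t", Arc::new(StreamTable::new(Arc::new(config))))?;
        Ok(())
    }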
---
datafusion-examples/examples/csv_opener.rs | 1 -
datafusion-examples/examples/json_opener.rs | 1 -
datafusion/core/src/datasource/file_format/mod.rs | 1 -
.../core/src/datasource/file_format/options.rs | 48 ++-----
datafusion/core/src/datasource/listing/table.rs | 152 ---------------------
.../core/src/datasource/listing_table_factory.rs | 16 +--
.../src/datasource/physical_plan/arrow_file.rs | 4 -
.../core/src/datasource/physical_plan/avro.rs | 7 -
.../core/src/datasource/physical_plan/csv.rs | 4 -
.../datasource/physical_plan/file_scan_config.rs | 3 -
.../src/datasource/physical_plan/file_stream.rs | 1 -
.../core/src/datasource/physical_plan/json.rs | 8 --
.../core/src/datasource/physical_plan/mod.rs | 4 -
.../src/datasource/physical_plan/parquet/mod.rs | 4 -
datafusion/core/src/execution/context/mod.rs | 11 +-
.../combine_partial_final_agg.rs | 1 -
.../src/physical_optimizer/enforce_distribution.rs | 5 -
.../core/src/physical_optimizer/enforce_sorting.rs | 15 +-
.../src/physical_optimizer/projection_pushdown.rs | 2 -
.../replace_with_order_preserving_variants.rs | 92 ++++++-------
.../core/src/physical_optimizer/test_utils.rs | 24 ++--
datafusion/core/src/test/mod.rs | 3 -
datafusion/core/src/test_util/mod.rs | 25 +---
datafusion/core/src/test_util/parquet.rs | 1 -
datafusion/core/tests/parquet/custom_reader.rs | 1 -
datafusion/core/tests/parquet/page_pruning.rs | 1 -
datafusion/core/tests/parquet/schema_coercion.rs | 2 -
datafusion/core/tests/sql/joins.rs | 42 ++----
datafusion/proto/src/physical_plan/from_proto.rs | 1 -
.../proto/tests/cases/roundtrip_physical_plan.rs | 1 -
datafusion/substrait/src/physical_plan/consumer.rs | 1 -
.../tests/cases/roundtrip_physical_plan.rs | 1 -
32 files changed, 102 insertions(+), 381 deletions(-)
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 15fb07ded4..96753c8c52 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -67,7 +67,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let result =
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 1a3dbe57be..ee33f969ca 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -70,7 +70,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let result =
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 7c2331548e..12c9fb91ad 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -165,7 +165,6 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
)
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 4c7557a4a9..d389137785 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use async_trait::async_trait;
-use datafusion_common::{plan_err, DataFusionError};
use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -72,8 +71,6 @@ pub struct CsvReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
- /// Flag indicating whether this file may be unbounded (as in a FIFO file).
- pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
}
@@ -97,7 +94,6 @@ impl<'a> CsvReadOptions<'a> {
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
- infinite: false,
file_sort_order: vec![],
}
}
@@ -108,12 +104,6 @@ impl<'a> CsvReadOptions<'a> {
self
}
- /// Configure mark_infinite setting
- pub fn mark_infinite(mut self, infinite: bool) -> Self {
- self.infinite = infinite;
- self
- }
-
/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
@@ -324,8 +314,6 @@ pub struct AvroReadOptions<'a> {
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<(String, DataType)>,
- /// Flag indicating whether this file may be unbounded (as in a FIFO file).
- pub infinite: bool,
}
impl<'a> Default for AvroReadOptions<'a> {
@@ -334,7 +322,6 @@ impl<'a> Default for AvroReadOptions<'a> {
schema: None,
file_extension: DEFAULT_AVRO_EXTENSION,
table_partition_cols: vec![],
- infinite: false,
}
}
}
@@ -349,12 +336,6 @@ impl<'a> AvroReadOptions<'a> {
self
}
- /// Configure mark_infinite setting
- pub fn mark_infinite(mut self, infinite: bool) -> Self {
- self.infinite = infinite;
- self
- }
-
/// Specify schema to use for AVRO read
pub fn schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
@@ -466,21 +447,17 @@ pub trait ReadOptions<'a> {
state: SessionState,
table_path: ListingTableUrl,
schema: Option<&'a Schema>,
- infinite: bool,
) -> Result<SchemaRef>
where
'a: 'async_trait,
{
- match (schema, infinite) {
- (Some(s), _) => Ok(Arc::new(s.to_owned())),
- (None, false) => Ok(self
- .to_listing_options(config)
- .infer_schema(&state, &table_path)
- .await?),
- (None, true) => {
- plan_err!("Schema inference for infinite data sources is not
supported.")
- }
+ if let Some(s) = schema {
+ return Ok(Arc::new(s.to_owned()));
}
+
+ self.to_listing_options(config)
+ .infer_schema(&state, &table_path)
+ .await
}
}
@@ -500,7 +477,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
- .with_infinite_source(self.infinite)
}
async fn get_resolved_schema(
@@ -509,7 +485,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+ self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -535,7 +511,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- self._get_resolved_schema(config, state, table_path, self.schema, false)
+ self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -551,7 +527,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
- .with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
}
@@ -561,7 +536,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+ self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -575,7 +550,6 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
- .with_infinite_source(self.infinite)
}
async fn get_resolved_schema(
@@ -584,7 +558,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+ self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -606,7 +580,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
- self._get_resolved_schema(config, state, table_path, self.schema, false)
+ self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0ce1b43fe4..4c13d9d443 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -246,11 +246,6 @@ pub struct ListingOptions {
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<Expr>>,
- /// Infinite source means that the input is not guaranteed to end.
- /// Currently, CSV, JSON, and AVRO formats are supported.
- /// In order to support infinite inputs, DataFusion may adjust query
- /// plans (e.g. joins) to run the given query in full pipelining mode.
- pub infinite_source: bool,
/// This setting when true indicates that the table is backed by a single file.
/// Any inserts to the table may only append to this existing file.
pub single_file: bool,
@@ -274,30 +269,11 @@ impl ListingOptions {
collect_stat: true,
target_partitions: 1,
file_sort_order: vec![],
- infinite_source: false,
single_file: false,
file_type_write_options: None,
}
}
- /// Set unbounded assumption on [`ListingOptions`] and returns self.
- ///
- /// ```
- /// use std::sync::Arc;
- /// use datafusion::datasource::{listing::ListingOptions, file_format::csv::CsvFormat};
- /// use datafusion::prelude::SessionContext;
- /// let ctx = SessionContext::new();
- /// let listing_options = ListingOptions::new(Arc::new(
- /// CsvFormat::default()
- /// )).with_infinite_source(true);
- ///
- /// assert_eq!(listing_options.infinite_source, true);
- /// ```
- pub fn with_infinite_source(mut self, infinite_source: bool) -> Self {
- self.infinite_source = infinite_source;
- self
- }
-
/// Set file extension on [`ListingOptions`] and returns self.
///
/// ```
@@ -557,7 +533,6 @@ pub struct ListingTable {
options: ListingOptions,
definition: Option<String>,
collected_statistics: FileStatisticsCache,
- infinite_source: bool,
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
}
@@ -587,7 +562,6 @@ impl ListingTable {
for (part_col_name, part_col_type) in &options.table_partition_cols {
builder.push(Field::new(part_col_name, part_col_type.clone(), false));
}
- let infinite_source = options.infinite_source;
let table = Self {
table_paths: config.table_paths,
@@ -596,7 +570,6 @@ impl ListingTable {
options,
definition: None,
- collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
- infinite_source,
constraints: Constraints::empty(),
column_defaults: HashMap::new(),
};
@@ -729,7 +702,6 @@ impl TableProvider for ListingTable {
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols,
- infinite_source: self.infinite_source,
},
filters.as_ref(),
)
@@ -943,7 +915,6 @@ impl ListingTable {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
- use std::fs::File;
use super::*;
#[cfg(feature = "parquet")]
@@ -955,7 +926,6 @@ mod tests {
use crate::{
assert_batches_eq,
datasource::file_format::avro::AvroFormat,
- execution::options::ReadOptions,
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};
@@ -967,37 +937,8 @@ mod tests {
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::PhysicalSortExpr;
- use rstest::*;
use tempfile::TempDir;
- /// It creates dummy file and checks if it can create unbounded input executors.
- async fn unbounded_table_helper(
- file_type: FileType,
- listing_option: ListingOptions,
- infinite_data: bool,
- ) -> Result<()> {
- let ctx = SessionContext::new();
- register_test_store(
- &ctx,
- &[(&format!("table/file{}", file_type.get_ext()), 100)],
- );
-
- let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
-
- let table_path = ListingTableUrl::parse("test:///table/").unwrap();
- let config = ListingTableConfig::new(table_path)
- .with_listing_options(listing_option)
- .with_schema(Arc::new(schema));
- // Create a table
- let table = ListingTable::try_new(config)?;
- // Create executor from table
- let source_exec = table.scan(&ctx.state(), None, &[], None).await?;
-
- assert_eq!(source_exec.unbounded_output(&[])?, infinite_data);
-
- Ok(())
- }
-
#[tokio::test]
async fn read_single_file() -> Result<()> {
let ctx = SessionContext::new();
@@ -1205,99 +1146,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn unbounded_csv_table_without_schema() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let file_path = tmp_dir.path().join("dummy.csv");
- File::create(file_path)?;
- let ctx = SessionContext::new();
- let error = ctx
- .register_csv(
- "test",
- tmp_dir.path().to_str().unwrap(),
- CsvReadOptions::new().mark_infinite(true),
- )
- .await
- .unwrap_err();
- match error {
- DataFusionError::Plan(_) => Ok(()),
- val => Err(val),
- }
- }
-
- #[tokio::test]
- async fn unbounded_json_table_without_schema() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let file_path = tmp_dir.path().join("dummy.json");
- File::create(file_path)?;
- let ctx = SessionContext::new();
- let error = ctx
- .register_json(
- "test",
- tmp_dir.path().to_str().unwrap(),
- NdJsonReadOptions::default().mark_infinite(true),
- )
- .await
- .unwrap_err();
- match error {
- DataFusionError::Plan(_) => Ok(()),
- val => Err(val),
- }
- }
-
- #[tokio::test]
- async fn unbounded_avro_table_without_schema() -> Result<()> {
- let tmp_dir = TempDir::new()?;
- let file_path = tmp_dir.path().join("dummy.avro");
- File::create(file_path)?;
- let ctx = SessionContext::new();
- let error = ctx
- .register_avro(
- "test",
- tmp_dir.path().to_str().unwrap(),
- AvroReadOptions::default().mark_infinite(true),
- )
- .await
- .unwrap_err();
- match error {
- DataFusionError::Plan(_) => Ok(()),
- val => Err(val),
- }
- }
-
- #[rstest]
- #[tokio::test]
- async fn unbounded_csv_table(
- #[values(true, false)] infinite_data: bool,
- ) -> Result<()> {
- let config = CsvReadOptions::new().mark_infinite(infinite_data);
- let session_config = SessionConfig::new().with_target_partitions(1);
- let listing_options = config.to_listing_options(&session_config);
- unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await
- }
-
- #[rstest]
- #[tokio::test]
- async fn unbounded_json_table(
- #[values(true, false)] infinite_data: bool,
- ) -> Result<()> {
- let config = NdJsonReadOptions::default().mark_infinite(infinite_data);
- let session_config = SessionConfig::new().with_target_partitions(1);
- let listing_options = config.to_listing_options(&session_config);
- unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await
- }
-
- #[rstest]
- #[tokio::test]
- async fn unbounded_avro_table(
- #[values(true, false)] infinite_data: bool,
- ) -> Result<()> {
- let config = AvroReadOptions::default().mark_infinite(infinite_data);
- let session_config = SessionConfig::new().with_target_partitions(1);
- let listing_options = config.to_listing_options(&session_config);
- unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await
- }
-
#[tokio::test]
async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
// more expected partitions than files
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index a9d0c3a009..7c859ee988 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -133,21 +133,9 @@ impl TableProviderFactory for ListingTableFactory {
(Some(schema), table_partition_cols)
};
- // look for 'infinite' as an option
- let infinite_source = cmd.unbounded;
-
let mut statement_options = StatementOptions::from(&cmd.options);
// Extract ListingTable specific options if present or set default
- let unbounded = if infinite_source {
- statement_options.take_str_option("unbounded");
- infinite_source
- } else {
- statement_options
- .take_bool_option("unbounded")?
- .unwrap_or(false)
- };
-
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);
@@ -159,6 +147,7 @@ impl TableProviderFactory for ListingTableFactory {
}
}
statement_options.take_bool_option("create_local_path")?;
+ statement_options.take_str_option("unbounded");
let file_type = file_format.file_type();
@@ -207,8 +196,7 @@ impl TableProviderFactory for ListingTableFactory {
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone())
.with_single_file(single_file)
- .with_write_options(file_type_writer_options)
- .with_infinite_source(unbounded);
+ .with_write_options(file_type_writer_options);
let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 30b55db284..ae1e879d0d 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -93,10 +93,6 @@ impl ExecutionPlan for ArrowExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
- fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
- Ok(self.base_config().infinite_source)
- }
-
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 885b4c5d39..e448bf39f4 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -89,10 +89,6 @@ impl ExecutionPlan for AvroExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
- fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
- Ok(self.base_config().infinite_source)
- }
-
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
@@ -276,7 +272,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
let mut results = avro_exec
@@ -348,7 +343,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
@@ -419,7 +413,6 @@ mod tests {
limit: None,
table_partition_cols: vec![Field::new("date", DataType::Utf8,
false)],
output_ordering: vec![],
- infinite_source: false,
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 0eca37da13..0c34d22e9f 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -146,10 +146,6 @@ impl ExecutionPlan for CsvExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
- fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
- Ok(self.base_config().infinite_source)
- }
-
/// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 89694ff285..516755e4d2 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -99,8 +99,6 @@ pub struct FileScanConfig {
pub table_partition_cols: Vec<Field>,
/// All equivalent lexicographical orderings that describe the schema.
pub output_ordering: Vec<LexOrdering>,
- /// Indicates whether this plan may produce an infinite stream of records.
- pub infinite_source: bool,
}
impl FileScanConfig {
@@ -707,7 +705,6 @@ mod tests {
statistics,
table_partition_cols,
output_ordering: vec![],
- infinite_source: false,
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index a715f6e8e3..99fb088b66 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -667,7 +667,6 @@ mod tests {
limit: self.limit,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let metrics_set = ExecutionPlanMetricsSet::new();
let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 9c3b523a65..c74fd13e77 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -110,10 +110,6 @@ impl ExecutionPlan for NdJsonExec {
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
}
- fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
- Ok(self.base_config.infinite_source)
- }
-
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
self.projected_output_ordering
.first()
@@ -462,7 +458,6 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
file_compression_type.to_owned(),
);
@@ -541,7 +536,6 @@ mod tests {
limit: Some(3),
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
file_compression_type.to_owned(),
);
@@ -589,7 +583,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
file_compression_type.to_owned(),
);
@@ -642,7 +635,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 8e4dd5400b..9d1c373aee 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -133,10 +133,6 @@ impl DisplayAs for FileScanConfig {
write!(f, ", limit={limit}")?;
}
- if self.infinite_source {
- write!(f, ", infinite_source=true")?;
- }
-
if let Some(ordering) = orderings.first() {
if !ordering.is_empty() {
let start = if orderings.len() == 1 {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 2b10b05a27..ade149da69 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -882,7 +882,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
predicate,
None,
@@ -1539,7 +1538,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
@@ -1654,7 +1652,6 @@ mod tests {
),
],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
@@ -1718,7 +1715,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 58a4f08341..8916fa814a 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -964,14 +964,9 @@ impl SessionContext {
sql_definition: Option<String>,
) -> Result<()> {
let table_path = ListingTableUrl::parse(table_path)?;
- let resolved_schema = match (provided_schema, options.infinite_source) {
- (Some(s), _) => s,
- (None, false) => options.infer_schema(&self.state(), &table_path).await?,
- (None, true) => {
- return plan_err!(
- "Schema inference for infinite data sources is not
supported."
- )
- }
+ let resolved_schema = match provided_schema {
+ Some(s) => s,
+ None => options.infer_schema(&self.state(), &table_path).await?,
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index c50ea36b68..7359a64630 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -257,7 +257,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 099759741a..0aef126578 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1775,7 +1775,6 @@ pub(crate) mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering,
- infinite_source: false,
},
None,
None,
@@ -1803,7 +1802,6 @@ pub(crate) mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering,
- infinite_source: false,
},
None,
None,
@@ -1825,7 +1823,6 @@ pub(crate) mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering,
- infinite_source: false,
},
false,
b',',
@@ -1856,7 +1853,6 @@ pub(crate) mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering,
- infinite_source: false,
},
false,
b',',
@@ -3957,7 +3953,6 @@ pub(crate) mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
false,
b',',
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 277404b301..c0e9b834e6 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2117,7 +2117,7 @@ mod tests {
async fn test_with_lost_ordering_bounded() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, false);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2141,10 +2141,11 @@ mod tests {
}
#[tokio::test]
+ #[ignore]
async fn test_with_lost_ordering_unbounded() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2171,10 +2172,12 @@ mod tests {
}
#[tokio::test]
+ #[ignore]
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ // Make source unbounded
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2203,7 +2206,7 @@ mod tests {
async fn test_do_not_pushdown_through_spm() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b",
&schema)];
- let source = csv_exec_sorted(&schema, sort_exprs.clone(), false);
+ let source = csv_exec_sorted(&schema, sort_exprs.clone());
let repartition_rr = repartition_exec(source);
let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr);
let physical_plan = sort_exec(vec![sort_expr("b", &schema)], spm);
@@ -2224,7 +2227,7 @@ mod tests {
async fn test_pushdown_through_spm() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b",
&schema)];
- let source = csv_exec_sorted(&schema, sort_exprs.clone(), false);
+ let source = csv_exec_sorted(&schema, sort_exprs.clone());
let repartition_rr = repartition_exec(source);
let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr);
let physical_plan = sort_exec(
@@ -2252,7 +2255,7 @@ mod tests {
async fn test_window_multi_layer_requirement() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b",
&schema)];
- let source = csv_exec_sorted(&schema, vec![], false);
+ let source = csv_exec_sorted(&schema, vec![]);
let sort = sort_exec(sort_exprs.clone(), source);
let repartition = repartition_exec(sort);
let repartition = spr_repartition_exec(repartition);
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 664afbe822..7e1312dad2 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1541,7 +1541,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![vec![]],
- infinite_source: false,
},
false,
0,
@@ -1568,7 +1567,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![vec![]],
- infinite_source: false,
},
false,
0,
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index af45df7d84..41f2b39978 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -350,7 +350,7 @@ mod tests {
async fn test_replace_multiple_input_repartition_1() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
@@ -362,15 +362,15 @@ mod tests {
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -378,7 +378,7 @@ mod tests {
async fn test_with_inter_children_change_only() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr_default("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +408,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC],
has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized = [
@@ -419,9 +419,9 @@ mod tests {
" SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC],
has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -429,7 +429,7 @@ mod tests {
async fn test_replace_multiple_input_repartition_2() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let filter = filter_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(filter);
@@ -444,16 +444,16 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -461,7 +461,7 @@ mod tests {
async fn test_replace_multiple_input_repartition_with_extra_steps() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -478,7 +478,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -486,9 +486,9 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -496,7 +496,7 @@ mod tests {
async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -516,7 +516,7 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -525,9 +525,9 @@ mod tests {
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -535,7 +535,7 @@ mod tests {
async fn test_not_replacing_when_no_need_to_preserve_sorting() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -550,7 +550,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"CoalescePartitionsExec",
@@ -558,7 +558,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -568,7 +568,7 @@ mod tests {
async fn test_with_multiple_replacable_repartitions() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let filter = filter_exec(repartition_hash);
@@ -587,7 +587,7 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -596,9 +596,9 @@ mod tests {
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -606,7 +606,7 @@ mod tests {
async fn test_not_replace_with_different_orderings() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let sort = sort_exec(
@@ -625,14 +625,14 @@ mod tests {
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [c@1 ASC]",
" SortExec: expr=[c@1 ASC]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -642,7 +642,7 @@ mod tests {
async fn test_with_lost_ordering() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -654,15 +654,15 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -670,7 +670,7 @@ mod tests {
async fn test_with_lost_and_kept_ordering() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, true);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -700,7 +700,7 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
@@ -712,9 +712,9 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
- assert_optimized!(expected_input, expected_optimized, physical_plan);
+ assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
}
@@ -723,14 +723,14 @@ mod tests {
let schema = create_test_schema()?;
let left_sort_exprs = vec![sort_expr("a", &schema)];
- let left_source = csv_exec_sorted(&schema, left_sort_exprs, true);
+ let left_source = csv_exec_sorted(&schema, left_sort_exprs);
let left_repartition_rr = repartition_exec_round_robin(left_source);
let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
let left_coalesce_partitions =
Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
let right_sort_exprs = vec![sort_expr("a", &schema)];
- let right_source = csv_exec_sorted(&schema, right_sort_exprs, true);
+ let right_source = csv_exec_sorted(&schema, right_sort_exprs);
let right_repartition_rr = repartition_exec_round_robin(right_source);
let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
let right_coalesce_partitions =
@@ -756,11 +756,11 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
@@ -770,11 +770,11 @@ mod tests {
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
+ " CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
@@ -784,7 +784,7 @@ mod tests {
async fn test_with_bounded_input() -> Result<()> {
let schema = create_test_schema()?;
let sort_exprs = vec![sort_expr("a", &schema)];
- let source = csv_exec_sorted(&schema, sort_exprs, false);
+ let source = csv_exec_sorted(&schema, sort_exprs);
let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
@@ -931,7 +931,6 @@ mod tests {
fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
- infinite_source: bool,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
let projection: Vec<usize> = vec![0, 2, 3];
@@ -949,7 +948,6 @@ mod tests {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![sort_exprs],
- infinite_source,
},
true,
0,
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 678dc1f373..6e14cca21f 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -45,6 +45,7 @@ use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use crate::datasource::stream::{StreamConfig, StreamTable};
use async_trait::async_trait;
async fn register_current_csv(
@@ -54,14 +55,19 @@ async fn register_current_csv(
) -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let schema = crate::test_util::aggr_test_schema();
- ctx.register_csv(
- table_name,
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new()
- .schema(&schema)
- .mark_infinite(infinite),
- )
- .await?;
+ let path = format!("{testdata}/csv/aggregate_test_100.csv");
+
+ match infinite {
+ true => {
+ let config = StreamConfig::new_file(schema, path.into());
+ ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
+ }
+ false => {
+ ctx.register_csv(table_name, &path,
CsvReadOptions::new().schema(&schema))
+ .await?;
+ }
+ }
+
Ok(())
}
@@ -272,7 +278,6 @@ pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
@@ -296,7 +301,6 @@ pub fn parquet_exec_sorted(
limit: None,
table_partition_cols: vec![],
output_ordering: vec![sort_exprs],
- infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index aad5c19044..8770c0c423 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -203,7 +203,6 @@ pub fn partitioned_csv_config(
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
})
}
@@ -277,7 +276,6 @@ fn make_decimal() -> RecordBatch {
pub fn csv_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
- infinite_source: bool,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
@@ -291,7 +289,6 @@ pub fn csv_exec_sorted(
limit: None,
table_partition_cols: vec![],
output_ordering: vec![sort_exprs],
- infinite_source,
},
false,
0,
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index c6b43de0c1..282b0f7079 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -36,7 +36,6 @@ use crate::datasource::provider::TableProviderFactory;
use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
-use crate::execution::options::ReadOptions;
use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -58,6 +57,7 @@ use futures::Stream;
pub use datafusion_common::test_util::parquet_test_data;
pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
+use crate::datasource::stream::{StreamConfig, StreamTable};
pub use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
/// Scan an empty data source, mainly used in tests
@@ -342,30 +342,17 @@ impl RecordBatchStream for UnboundedStream {
}
/// This function creates an unbounded sorted file for testing purposes.
-pub async fn register_unbounded_file_with_ordering(
+pub fn register_unbounded_file_with_ordering(
ctx: &SessionContext,
schema: SchemaRef,
file_path: &Path,
table_name: &str,
file_sort_order: Vec<Vec<Expr>>,
- with_unbounded_execution: bool,
) -> Result<()> {
- // Mark infinite and provide schema:
- let fifo_options = CsvReadOptions::new()
- .schema(schema.as_ref())
- .mark_infinite(with_unbounded_execution);
- // Get listing options:
- let options_sort = fifo_options
- .to_listing_options(&ctx.copied_config())
- .with_file_sort_order(file_sort_order);
+ let config =
+ StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order);
+
// Register table:
- ctx.register_listing_table(
- table_name,
- file_path.as_os_str().to_str().unwrap(),
- options_sort,
- Some(schema),
- None,
- )
- .await?;
+ ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
Ok(())
}
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index f3c0d2987a..336a680463 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -156,7 +156,6 @@ impl TestParquetFile {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 3752d42dbf..e76b201e02 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -85,7 +85,6 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index e1e8b8e66e..23a56bc821 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -81,7 +81,6 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
Some(predicate),
None,
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index 25c62f18f5..00f3eada49 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -69,7 +69,6 @@ async fn multi_parquet_coercion() {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
@@ -133,7 +132,6 @@ async fn multi_parquet_coercion_projection() {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
},
None,
None,
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 528bde6323..d1f270b540 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::datasource::stream::{StreamConfig, StreamTable};
use datafusion::test_util::register_unbounded_file_with_ordering;
use super::*;
@@ -105,9 +106,7 @@ async fn join_change_in_planner() -> Result<()> {
&left_file_path,
"left",
file_sort_order.clone(),
- true,
- )
- .await?;
+ )?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone()).unwrap();
register_unbounded_file_with_ordering(
@@ -116,9 +115,7 @@ async fn join_change_in_planner() -> Result<()> {
&right_file_path,
"right",
file_sort_order,
- true,
- )
- .await?;
+ )?;
let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
@@ -160,20 +157,13 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
]));
- ctx.register_csv(
- "left",
- left_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
+ let left = StreamConfig::new_file(schema.clone(), left_file_path);
+ ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
+
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone())?;
- ctx.register_csv(
- "right",
- right_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
+ let right = StreamConfig::new_file(schema, right_file_path);
+ ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN
right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
@@ -217,20 +207,12 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
]));
- ctx.register_csv(
- "left",
- left_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
+ let left = StreamConfig::new_file(schema.clone(), left_file_path);
+ ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone())?;
- ctx.register_csv(
- "right",
- right_file_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new().schema(&schema).mark_infinite(true),
- )
- .await?;
+ let right = StreamConfig::new_file(schema.clone(), right_file_path);
+ ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL
JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 +
10").await?;
match df.create_physical_plan().await {
Ok(_) => panic!("Expecting error."),
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index dcebfbf2da..5c0ef615ca 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -526,7 +526,6 @@ pub fn parse_protobuf_file_scan_config(
limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
table_partition_cols,
output_ordering,
- infinite_source: false,
})
}
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 4a512413e7..9a9827f2a0 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -492,7 +492,6 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let predicate = Arc::new(BinaryExpr::new(
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 942798173e..3098dc386e 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -112,7 +112,6 @@ pub async fn from_substrait_rel(
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
if let Some(MaskExpression { select, .. }) = &read.projection {
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index b64dd2c138..e5af3f94cc 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -49,7 +49,6 @@ async fn parquet_exec() -> Result<()> {
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
- infinite_source: false,
};
let parquet_exec: Arc<dyn ExecutionPlan> =
Arc::new(ParquetExec::new(scan_config, None, None));