This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7ef4de5842 Minor: allow creating infinite external tables (#6235)
7ef4de5842 is described below
commit 7ef4de5842efdc3cffaa8b2bf9923ce8c579145f
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri May 5 08:58:29 2023 -0400
Minor: allow creating infinite external tables (#6235)
---
.../core/src/datasource/listing_table_factory.rs | 12 +++++
.../core/src/physical_plan/file_format/mod.rs | 4 ++
.../core/tests/sqllogictests/test_files/ddl.slt | 60 +++++++++++++++++++++-
3 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index 34afe75a6e..01d8ea6eac 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -132,11 +132,23 @@ impl TableProviderFactory for ListingTableFactory {
Some(cmd.order_exprs.clone())
};
+ // look for 'infinite_source' as an option
+ let infinite_source = match cmd.options.get("infinite_source").map(|s|
s.as_str())
+ {
+ None => false,
+ Some("true") => true,
+ Some("false") => false,
+ Some(value) => {
+ return Err(DataFusionError::Plan(format!("Unknown value for
infinite_source: {value}. Expected 'true' or 'false'")));
+ }
+ };
+
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
+ .with_infinite_source(infinite_source)
.with_file_sort_order(file_sort_order);
let table_path = ListingTableUrl::parse(&cmd.location)?;
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs
b/datafusion/core/src/physical_plan/file_format/mod.rs
index 6580caad2d..95533a930c 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -248,6 +248,10 @@ impl Display for FileScanConfig {
write!(f, ", limit={limit}")?;
}
+ if self.infinite_source {
+ write!(f, ", infinite_source=true")?;
+ }
+
if let Some(orders) = ordering {
if !orders.is_empty() {
write!(f, ", output_ordering={}",
OutputOrderingDisplay(&orders))?;
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index f08c5e61ca..afb009f89f 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -659,7 +659,7 @@ NULL
statement ok
drop table foo;
-# craete csv table with empty csv file
+# create csv table with empty csv file
statement ok
CREATE EXTERNAL TABLE empty STORED AS CSV WITH HEADER ROW LOCATION
'tests/data/empty.csv';
@@ -704,3 +704,61 @@ CREATE SCHEMA empty_schema;
statement ok
DROP SCHEMA empty_schema;
+
+##########
+# creating external CSV tables with an infinite marking
+##########
+
+# external table with infinite source
+statement ok
+CREATE external table t(c1 integer, c2 integer, c3 integer)
+STORED as CSV
+WITH HEADER ROW
+OPTIONS('infinite_source' 'true')
+LOCATION 'tests/data/empty.csv';
+
+# should see infinite_source=true in the explain
+query TT
+explain select c1 from t;
+----
+logical_plan TableScan: t projection=[c1]
+physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1],
infinite_source=true, has_header=true
+
+statement ok
+drop table t;
+
+
+# external table with an explicit non-infinite source
+statement ok
+CREATE external table t(c1 integer, c2 integer, c3 integer)
+STORED as CSV
+WITH HEADER ROW
+OPTIONS('infinite_source' 'false')
+LOCATION 'tests/data/empty.csv';
+
+# expect to see no infinite_source in the explain
+query TT
+explain select c1 from t;
+----
+logical_plan TableScan: t projection=[c1]
+physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1],
has_header=true
+
+statement ok
+drop table t;
+
+
+# error condition: the option value is case sensitive, so 'FALSE' is rejected
+statement error DataFusion error: Error during planning: Unknown value for
infinite_source: FALSE\. Expected 'true' or 'false'
+CREATE external table t(c1 integer, c2 integer, c3 integer)
+STORED as CSV
+WITH HEADER ROW
+OPTIONS('infinite_source' 'FALSE')
+LOCATION 'tests/data/empty.csv';
+
+# error condition: any value other than 'true' or 'false' is rejected
+statement error DataFusion error: Error during planning: Unknown value for
infinite_source: g\. Expected 'true' or 'false'
+CREATE external table t(c1 integer, c2 integer, c3 integer)
+STORED as CSV
+WITH HEADER ROW
+OPTIONS('infinite_source' 'g')
+LOCATION 'tests/data/empty.csv';