This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 20c7af3bfe Support CREATE TABLE via SQL for infinite streams (#6352)
20c7af3bfe is described below
commit 20c7af3bfecfce1e1578ed5dadf46ba6c715e8b3
Author: Armin Primadi <[email protected]>
AuthorDate: Wed May 17 13:38:54 2023 +0700
Support CREATE TABLE via SQL for infinite streams (#6352)
* Refactor create external table to use one_of_keywords
* Add more tests
* Implement CREATE UNBOUNDED EXTERNAL TABLE ...
* Fix cargo fmt
* Fix error
* Fix test
* Fix test
* Add unbounded on hash
---
datafusion/core/src/catalog/listing_schema.rs | 1 +
.../core/src/datasource/listing_table_factory.rs | 11 +-
.../test_files/create_external_table.slt | 4 +
.../core/tests/sqllogictests/test_files/ddl.slt | 21 +--
.../tests/sqllogictests/test_files/groupby.slt | 3 +-
.../core/tests/sqllogictests/test_files/window.slt | 6 +-
datafusion/expr/src/logical_plan/ddl.rs | 3 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 17 +++
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/logical_plan/mod.rs | 3 +
datafusion/sql/src/parser.rs | 162 ++++++++++++++++-----
datafusion/sql/src/statement.rs | 2 +
13 files changed, 167 insertions(+), 69 deletions(-)
diff --git a/datafusion/core/src/catalog/listing_schema.rs
b/datafusion/core/src/catalog/listing_schema.rs
index b39c3693c7..50f733c323 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -145,6 +145,7 @@ impl ListingSchemaProvider {
definition: None,
file_compression_type:
CompressionTypeVariant::UNCOMPRESSED,
order_exprs: vec![],
+ unbounded: false,
options: Default::default(),
},
)
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index 1e2d4051d5..dfc3d11732 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -135,15 +135,7 @@ impl TableProviderFactory for ListingTableFactory {
};
// look for 'infinite' as an option
- let infinite_source = match cmd.options.get("infinite_source").map(|s|
s.as_str())
- {
- None => false,
- Some("true") => true,
- Some("false") => false,
- Some(value) => {
- return Err(DataFusionError::Plan(format!("Unknown value for
infinite_source: {value}. Expected 'true' or 'false'")));
- }
- };
+ let infinite_source = cmd.unbounded;
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
@@ -210,6 +202,7 @@ mod tests {
file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
definition: None,
order_exprs: vec![],
+ unbounded: false,
options: HashMap::new(),
};
let table_provider = factory.create(&state, &cmd).await.unwrap();
diff --git
a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
index a0784ff96d..ce6fdfeee0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
@@ -99,3 +99,7 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH HEAD ROW
LOCATION 'foo.csv';
# Missing `anything` in WITH clause
statement error DataFusion error: SQL error: ParserError\("Expected HEADER,
found: LOCATION"\)
CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv';
+
+# Unrecognized random clause
+statement error DataFusion error: SQL error: ParserError\("Unexpected token
FOOBAR"\)
+CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION
'foo.csv';
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 006641cac5..d75d35e6f0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -723,10 +723,9 @@ DROP SCHEMA empty_schema;
# external table with infinite source
statement ok
-CREATE external table t(c1 integer, c2 integer, c3 integer)
+CREATE UNBOUNDED external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
-OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/empty.csv';
# should see infinite_source=true in the explain
@@ -745,7 +744,6 @@ statement ok
CREATE external table t(c1 integer, c2 integer, c3 integer)
STORED as CSV
WITH HEADER ROW
-OPTIONS('infinite_source' 'false')
LOCATION 'tests/data/empty.csv';
# expect to see no infinite_source in the explain
@@ -757,20 +755,3 @@ physical_plan CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/te
statement ok
drop table t;
-
-
-# error conditions
-statement error DataFusion error: Error during planning: Unknown value for
infinite_source: FALSE\. Expected 'true' or 'false'
-CREATE external table t(c1 integer, c2 integer, c3 integer)
-STORED as CSV
-WITH HEADER ROW
-OPTIONS('infinite_source' 'FALSE')
-LOCATION 'tests/data/empty.csv';
-
-# error conditions
-statement error DataFusion error: Error during planning: Unknown value for
infinite_source: g\. Expected 'true' or 'false'
-CREATE external table t(c1 integer, c2 integer, c3 integer)
-STORED as CSV
-WITH HEADER ROW
-OPTIONS('infinite_source' 'g')
-LOCATION 'tests/data/empty.csv';
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index e5a93e709f..106837348a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1928,7 +1928,7 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY
cor0.col1
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite2 (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
@@ -1938,7 +1938,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC, c ASC)
-OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_2.csv';
# test_window_agg_sort
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 80423a5fe2..b15bc90fb1 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2451,7 +2451,7 @@ LOCATION 'tests/data/window_1.csv'
# Source is CsvExec which is ordered by ts column.
# Infinite source
statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite (
ts INTEGER,
inc_col INTEGER,
desc_col INTEGER,
@@ -2459,7 +2459,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite (
STORED AS CSV
WITH HEADER ROW
WITH ORDER (ts ASC)
-OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_1.csv';
# test_source_sorted_aggregate
@@ -2866,7 +2865,7 @@ LOCATION 'tests/data/window_2.csv';
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
# Column c has cardinality 100 (unique entries). Column d has cardinality 5.
statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite2 (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
a0 INTEGER,
a INTEGER,
b INTEGER,
@@ -2876,7 +2875,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
STORED AS CSV
WITH HEADER ROW
WITH ORDER (a ASC, b ASC, c ASC)
-OPTIONS('infinite_source' 'true')
LOCATION 'tests/data/window_2.csv';
diff --git a/datafusion/expr/src/logical_plan/ddl.rs
b/datafusion/expr/src/logical_plan/ddl.rs
index eeaa1c5ddb..9ddafea3b6 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -192,6 +192,8 @@ pub struct CreateExternalTable {
pub order_exprs: Vec<Expr>,
/// File compression type (GZIP, BZIP2, XZ, ZSTD)
pub file_compression_type: CompressionTypeVariant,
+ /// Whether the table is an infinite stream
+ pub unbounded: bool,
/// Table(provider) specific options
pub options: HashMap<String, String>,
}
@@ -211,6 +213,7 @@ impl Hash for CreateExternalTable {
self.definition.hash(state);
self.file_compression_type.hash(state);
self.order_exprs.hash(state);
+ self.unbounded.hash(state);
self.options.len().hash(state); // HashMap is not hashable
}
}
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index ef1e2f284e..446b4a0275 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -184,6 +184,7 @@ message CreateExternalTableNode {
string definition = 9;
string file_compression_type = 10;
repeated LogicalExprNode order_exprs = 13;
+ bool unbounded = 14;
map<string, string> options = 11;
}
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index c217c4d7fe..571fd45f8a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3547,6 +3547,9 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.order_exprs.is_empty() {
len += 1;
}
+ if self.unbounded {
+ len += 1;
+ }
if !self.options.is_empty() {
len += 1;
}
@@ -3584,6 +3587,9 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.order_exprs.is_empty() {
struct_ser.serialize_field("orderExprs", &self.order_exprs)?;
}
+ if self.unbounded {
+ struct_ser.serialize_field("unbounded", &self.unbounded)?;
+ }
if !self.options.is_empty() {
struct_ser.serialize_field("options", &self.options)?;
}
@@ -3614,6 +3620,7 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
"fileCompressionType",
"order_exprs",
"orderExprs",
+ "unbounded",
"options",
];
@@ -3630,6 +3637,7 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
Definition,
FileCompressionType,
OrderExprs,
+ Unbounded,
Options,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -3663,6 +3671,7 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
"definition" => Ok(GeneratedField::Definition),
"fileCompressionType" | "file_compression_type" =>
Ok(GeneratedField::FileCompressionType),
"orderExprs" | "order_exprs" =>
Ok(GeneratedField::OrderExprs),
+ "unbounded" => Ok(GeneratedField::Unbounded),
"options" => Ok(GeneratedField::Options),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
@@ -3694,6 +3703,7 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
let mut definition__ = None;
let mut file_compression_type__ = None;
let mut order_exprs__ = None;
+ let mut unbounded__ = None;
let mut options__ = None;
while let Some(k) = map.next_key()? {
match k {
@@ -3763,6 +3773,12 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
}
order_exprs__ = Some(map.next_value()?);
}
+ GeneratedField::Unbounded => {
+ if unbounded__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("unbounded"));
+ }
+ unbounded__ = Some(map.next_value()?);
+ }
GeneratedField::Options => {
if options__.is_some() {
return
Err(serde::de::Error::duplicate_field("options"));
@@ -3785,6 +3801,7 @@ impl<'de> serde::Deserialize<'de> for
CreateExternalTableNode {
definition: definition__.unwrap_or_default(),
file_compression_type:
file_compression_type__.unwrap_or_default(),
order_exprs: order_exprs__.unwrap_or_default(),
+ unbounded: unbounded__.unwrap_or_default(),
options: options__.unwrap_or_default(),
})
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 83255797fa..a5c0603239 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -293,6 +293,8 @@ pub struct CreateExternalTableNode {
pub file_compression_type: ::prost::alloc::string::String,
#[prost(message, repeated, tag = "13")]
pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+ #[prost(bool, tag = "14")]
+ pub unbounded: bool,
#[prost(map = "string, string", tag = "11")]
pub options: ::std::collections::HashMap<
::prost::alloc::string::String,
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index c61f90b2dc..47107affb3 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -527,6 +527,7 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
file_compression_type:
CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_|
DataFusionError::NotImplemented(format!("Unsupported file compression type
{}", create_extern_table.file_compression_type)))?,
definition,
+ unbounded: create_extern_table.unbounded,
options: create_extern_table.options.clone(),
})))
}
@@ -1184,6 +1185,7 @@ impl AsLogicalPlan for LogicalPlanNode {
definition,
file_compression_type,
order_exprs,
+ unbounded,
options,
},
)) => Ok(protobuf::LogicalPlanNode {
@@ -1203,6 +1205,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.collect::<Result<Vec<_>, to_proto::Error>>()?,
definition: definition.clone().unwrap_or_default(),
file_compression_type:
file_compression_type.to_string(),
+ unbounded: *unbounded,
options: options.clone(),
},
)),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index b1116af3cf..a70868fa2f 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -89,6 +89,8 @@ pub struct CreateExternalTable {
pub if_not_exists: bool,
/// File compression type (GZIP, BZIP2, XZ)
pub file_compression_type: CompressionTypeVariant,
+ /// Whether the table is an unbounded (infinite) stream
+ pub unbounded: bool,
/// Table(provider) specific options
pub options: HashMap<String, String>,
}
@@ -245,7 +247,10 @@ impl<'a> DFParser<'a> {
/// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
if self.parser.parse_keyword(Keyword::EXTERNAL) {
- self.parse_create_external_table()
+ self.parse_create_external_table(false)
+ } else if self.parser.parse_keyword(Keyword::UNBOUNDED) {
+ self.parser.expect_keyword(Keyword::EXTERNAL)?;
+ self.parse_create_external_table(true)
} else {
Ok(Statement::Statement(Box::from(self.parser.parse_create()?)))
}
@@ -396,7 +401,10 @@ impl<'a> DFParser<'a> {
})
}
- fn parse_create_external_table(&mut self) -> Result<Statement,
ParserError> {
+ fn parse_create_external_table(
+ &mut self,
+ unbounded: bool,
+ ) -> Result<Statement, ParserError> {
self.parser.expect_keyword(Keyword::TABLE)?;
let if_not_exists =
self.parser
@@ -427,39 +435,72 @@ impl<'a> DFParser<'a> {
}
loop {
- if self.parser.parse_keyword(Keyword::STORED) {
- self.parser.expect_keyword(Keyword::AS)?;
- ensure_not_set(&builder.file_type, "STORED AS")?;
- builder.file_type = Some(self.parse_file_format()?);
- } else if self.parser.parse_keyword(Keyword::LOCATION) {
- ensure_not_set(&builder.location, "LOCATION")?;
- builder.location = Some(self.parser.parse_literal_string()?);
- } else if self.parser.parse_keyword(Keyword::WITH) {
- if self.parser.parse_keyword(Keyword::ORDER) {
- ensure_not_set(&builder.order_exprs, "WITH ORDER")?;
- builder.order_exprs = Some(self.parse_order_by_exprs()?);
- } else {
- self.parser.expect_keyword(Keyword::HEADER)?;
- self.parser.expect_keyword(Keyword::ROW)?;
- ensure_not_set(&builder.has_header, "WITH HEADER ROW")?;
- builder.has_header = Some(true);
+ if let Some(keyword) = self.parser.parse_one_of_keywords(&[
+ Keyword::STORED,
+ Keyword::LOCATION,
+ Keyword::WITH,
+ Keyword::DELIMITER,
+ Keyword::COMPRESSION,
+ Keyword::PARTITIONED,
+ Keyword::OPTIONS,
+ ]) {
+ match keyword {
+ Keyword::STORED => {
+ self.parser.expect_keyword(Keyword::AS)?;
+ ensure_not_set(&builder.file_type, "STORED AS")?;
+ builder.file_type = Some(self.parse_file_format()?);
+ }
+ Keyword::LOCATION => {
+ ensure_not_set(&builder.location, "LOCATION")?;
+ builder.location =
Some(self.parser.parse_literal_string()?);
+ }
+ Keyword::WITH => {
+ if self.parser.parse_keyword(Keyword::ORDER) {
+ ensure_not_set(&builder.order_exprs, "WITH
ORDER")?;
+ builder.order_exprs =
Some(self.parse_order_by_exprs()?);
+ } else {
+ self.parser.expect_keyword(Keyword::HEADER)?;
+ self.parser.expect_keyword(Keyword::ROW)?;
+ ensure_not_set(&builder.has_header, "WITH HEADER
ROW")?;
+ builder.has_header = Some(true);
+ }
+ }
+ Keyword::DELIMITER => {
+ ensure_not_set(&builder.delimiter, "DELIMITER")?;
+ builder.delimiter = Some(self.parse_delimiter()?);
+ }
+ Keyword::COMPRESSION => {
+ self.parser.expect_keyword(Keyword::TYPE)?;
+ ensure_not_set(
+ &builder.file_compression_type,
+ "COMPRESSION TYPE",
+ )?;
+ builder.file_compression_type =
+ Some(self.parse_file_compression_type()?);
+ }
+ Keyword::PARTITIONED => {
+ self.parser.expect_keyword(Keyword::BY)?;
+ ensure_not_set(&builder.table_partition_cols,
"PARTITIONED BY")?;
+ builder.table_partition_cols =
Some(self.parse_partitions()?);
+ }
+ Keyword::OPTIONS => {
+ ensure_not_set(&builder.options, "OPTIONS")?;
+ builder.options = Some(self.parse_options()?);
+ }
+ _ => {
+ unreachable!()
+ }
}
- } else if self.parser.parse_keyword(Keyword::DELIMITER) {
- ensure_not_set(&builder.delimiter, "DELIMITER")?;
- builder.delimiter = Some(self.parse_delimiter()?);
- } else if self.parser.parse_keyword(Keyword::COMPRESSION) {
- self.parser.expect_keyword(Keyword::TYPE)?;
- ensure_not_set(&builder.file_compression_type, "COMPRESSION
TYPE")?;
- builder.file_compression_type =
Some(self.parse_file_compression_type()?);
- } else if self.parser.parse_keyword(Keyword::PARTITIONED) {
- self.parser.expect_keyword(Keyword::BY)?;
- ensure_not_set(&builder.table_partition_cols, "PARTITIONED
BY")?;
- builder.table_partition_cols = Some(self.parse_partitions()?)
- } else if self.parser.parse_keyword(Keyword::OPTIONS) {
- ensure_not_set(&builder.options, "OPTIONS")?;
- builder.options = Some(self.parse_options()?);
} else {
- break;
+ let token = self.parser.next_token();
+ if token == Token::EOF || token == Token::SemiColon {
+ break;
+ } else {
+ return Err(ParserError::ParserError(format!(
+ "Unexpected token {}",
+ token
+ )));
+ }
}
}
@@ -488,6 +529,7 @@ impl<'a> DFParser<'a> {
file_compression_type: builder
.file_compression_type
.unwrap_or(CompressionTypeVariant::UNCOMPRESSED),
+ unbounded,
options: builder.options.unwrap_or(HashMap::new()),
};
Ok(Statement::CreateExternalTable(create))
@@ -610,6 +652,44 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
+ options: HashMap::new(),
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case: trailing space
+ let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION
'foo.csv' ";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![make_column_def("c1", DataType::Int(None))],
+ file_type: "CSV".to_string(),
+ has_header: false,
+ delimiter: ',',
+ location: "foo.csv".into(),
+ table_partition_cols: vec![],
+ order_exprs: vec![],
+ if_not_exists: false,
+ file_compression_type: UNCOMPRESSED,
+ unbounded: false,
+ options: HashMap::new(),
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case: trailing space + semicolon
+ let sql =
+ "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'
;";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![make_column_def("c1", DataType::Int(None))],
+ file_type: "CSV".to_string(),
+ has_header: false,
+ delimiter: ',',
+ location: "foo.csv".into(),
+ table_partition_cols: vec![],
+ order_exprs: vec![],
+ if_not_exists: false,
+ file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -628,6 +708,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -646,6 +727,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -667,6 +749,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -693,6 +776,7 @@ mod tests {
file_compression_type: CompressionTypeVariant::from_str(
file_compression_type,
)?,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -711,6 +795,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -728,6 +813,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -745,6 +831,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -763,6 +850,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: true,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -786,6 +874,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::from([("k1".into(), "v1".into())]),
});
expect_parse_ok(sql, expected)?;
@@ -804,6 +893,7 @@ mod tests {
order_exprs: vec![],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::from([
("k1".into(), "v1".into()),
("k2".into(), "v2".into()),
@@ -851,6 +941,7 @@ mod tests {
}],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -890,6 +981,7 @@ mod tests {
],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -925,13 +1017,14 @@ mod tests {
}],
if_not_exists: false,
file_compression_type: UNCOMPRESSED,
+ unbounded: false,
options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
// Most complete CREATE EXTERNAL TABLE statement possible
let sql = "
- CREATE EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float)
+ CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float)
STORED AS PARQUET
DELIMITER '*'
WITH HEADER ROW
@@ -969,6 +1062,7 @@ mod tests {
}],
if_not_exists: true,
file_compression_type: CompressionTypeVariant::ZSTD,
+ unbounded: true,
options: HashMap::from([
("ROW_GROUP_SIZE".into(), "1024".into()),
("TRUNCATE".into(), "NO".into()),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 16d3c6c793..64c59eee87 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -588,6 +588,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
file_compression_type,
order_exprs,
+ unbounded,
options,
} = statement;
@@ -629,6 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
definition,
file_compression_type,
order_exprs: ordered_exprs,
+ unbounded,
options,
},
)))