This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 20c7af3bfe Support CREATE TABLE via SQL for infinite streams (#6352)
20c7af3bfe is described below

commit 20c7af3bfecfce1e1578ed5dadf46ba6c715e8b3
Author: Armin Primadi <[email protected]>
AuthorDate: Wed May 17 13:38:54 2023 +0700

    Support CREATE TABLE via SQL for infinite streams (#6352)
    
    * Refactor create external table to use one_of_keywords
    
    * Add more tests
    
    * Implement CREATE UNBOUNDED EXTERNAL TABLE ...
    
    * Fix cargo fmt
    
    * Fix error
    
    * Fix test
    
    * Fix test
    
    * Add unbounded on hash
---
 datafusion/core/src/catalog/listing_schema.rs      |   1 +
 .../core/src/datasource/listing_table_factory.rs   |  11 +-
 .../test_files/create_external_table.slt           |   4 +
 .../core/tests/sqllogictests/test_files/ddl.slt    |  21 +--
 .../tests/sqllogictests/test_files/groupby.slt     |   3 +-
 .../core/tests/sqllogictests/test_files/window.slt |   6 +-
 datafusion/expr/src/logical_plan/ddl.rs            |   3 +
 datafusion/proto/proto/datafusion.proto            |   1 +
 datafusion/proto/src/generated/pbjson.rs           |  17 +++
 datafusion/proto/src/generated/prost.rs            |   2 +
 datafusion/proto/src/logical_plan/mod.rs           |   3 +
 datafusion/sql/src/parser.rs                       | 162 ++++++++++++++++-----
 datafusion/sql/src/statement.rs                    |   2 +
 13 files changed, 167 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/catalog/listing_schema.rs 
b/datafusion/core/src/catalog/listing_schema.rs
index b39c3693c7..50f733c323 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -145,6 +145,7 @@ impl ListingSchemaProvider {
                             definition: None,
                             file_compression_type: 
CompressionTypeVariant::UNCOMPRESSED,
                             order_exprs: vec![],
+                            unbounded: false,
                             options: Default::default(),
                         },
                     )
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs
index 1e2d4051d5..dfc3d11732 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -135,15 +135,7 @@ impl TableProviderFactory for ListingTableFactory {
         };
 
         // look for 'infinite' as an option
-        let infinite_source = match cmd.options.get("infinite_source").map(|s| 
s.as_str())
-        {
-            None => false,
-            Some("true") => true,
-            Some("false") => false,
-            Some(value) => {
-                return Err(DataFusionError::Plan(format!("Unknown value for 
infinite_source: {value}. Expected 'true' or 'false'")));
-            }
-        };
+        let infinite_source = cmd.unbounded;
 
         let options = ListingOptions::new(file_format)
             .with_collect_stat(state.config().collect_statistics())
@@ -210,6 +202,7 @@ mod tests {
             file_compression_type: CompressionTypeVariant::UNCOMPRESSED,
             definition: None,
             order_exprs: vec![],
+            unbounded: false,
             options: HashMap::new(),
         };
         let table_provider = factory.create(&state, &cmd).await.unwrap();
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt 
b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
index a0784ff96d..ce6fdfeee0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/create_external_table.slt
@@ -99,3 +99,7 @@ CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH HEAD ROW 
LOCATION 'foo.csv';
 # Missing `anything` in WITH clause
 statement error DataFusion error: SQL error: ParserError\("Expected HEADER, 
found: LOCATION"\)
 CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH LOCATION 'foo.csv';
+
+# Unrecognized random clause
+statement error DataFusion error: SQL error: ParserError\("Unexpected token 
FOOBAR"\)
+CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 
'foo.csv';
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt 
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 006641cac5..d75d35e6f0 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -723,10 +723,9 @@ DROP SCHEMA empty_schema;
 
 # external table with infinite source
 statement ok
-CREATE external table t(c1 integer, c2 integer, c3 integer)
+CREATE UNBOUNDED external table t(c1 integer, c2 integer, c3 integer)
 STORED as CSV
 WITH HEADER ROW
-OPTIONS('infinite_source' 'true')
 LOCATION 'tests/data/empty.csv';
 
 # should see infinite_source=true in the explain
@@ -745,7 +744,6 @@ statement ok
 CREATE external table t(c1 integer, c2 integer, c3 integer)
 STORED as CSV
 WITH HEADER ROW
-OPTIONS('infinite_source' 'false')
 LOCATION 'tests/data/empty.csv';
 
 # expect to see no infinite_source in the explain
@@ -757,20 +755,3 @@ physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/te
 
 statement ok
 drop table t;
-
-
-# error conditions
-statement error DataFusion error: Error during planning: Unknown value for 
infinite_source: FALSE\. Expected 'true' or 'false'
-CREATE external table t(c1 integer, c2 integer, c3 integer)
-STORED as CSV
-WITH HEADER ROW
-OPTIONS('infinite_source' 'FALSE')
-LOCATION 'tests/data/empty.csv';
-
-# error conditions
-statement error DataFusion error: Error during planning: Unknown value for 
infinite_source: g\. Expected 'true' or 'false'
-CREATE external table t(c1 integer, c2 integer, c3 integer)
-STORED as CSV
-WITH HEADER ROW
-OPTIONS('infinite_source' 'g')
-LOCATION 'tests/data/empty.csv';
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt 
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index e5a93e709f..106837348a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1928,7 +1928,7 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY 
cor0.col1
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.
 # Column c has cardinality 100 (unique entries). Column d has cardinality 5.
 statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite2 (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
   a0 INTEGER,
   a INTEGER,
   b INTEGER,
@@ -1938,7 +1938,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
 STORED AS CSV
 WITH HEADER ROW
 WITH ORDER (a ASC, b ASC, c ASC)
-OPTIONS('infinite_source' 'true')
 LOCATION 'tests/data/window_2.csv';
 
 # test_window_agg_sort
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 80423a5fe2..b15bc90fb1 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2451,7 +2451,7 @@ LOCATION 'tests/data/window_1.csv'
 # Source is CsvExec which is ordered by ts column.
 # Infinite source
 statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite (
   ts INTEGER,
   inc_col INTEGER,
   desc_col INTEGER,
@@ -2459,7 +2459,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite (
 STORED AS CSV
 WITH HEADER ROW
 WITH ORDER (ts ASC)
-OPTIONS('infinite_source' 'true')
 LOCATION 'tests/data/window_1.csv';
 
 # test_source_sorted_aggregate
@@ -2866,7 +2865,7 @@ LOCATION 'tests/data/window_2.csv';
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.
 # Column c has cardinality 100 (unique entries). Column d has cardinality 5.
 statement ok
-CREATE EXTERNAL TABLE annotated_data_infinite2 (
+CREATE UNBOUNDED EXTERNAL TABLE annotated_data_infinite2 (
   a0 INTEGER,
   a INTEGER,
   b INTEGER,
@@ -2876,7 +2875,6 @@ CREATE EXTERNAL TABLE annotated_data_infinite2 (
 STORED AS CSV
 WITH HEADER ROW
 WITH ORDER (a ASC, b ASC, c ASC)
-OPTIONS('infinite_source' 'true')
 LOCATION 'tests/data/window_2.csv';
 
 
diff --git a/datafusion/expr/src/logical_plan/ddl.rs 
b/datafusion/expr/src/logical_plan/ddl.rs
index eeaa1c5ddb..9ddafea3b6 100644
--- a/datafusion/expr/src/logical_plan/ddl.rs
+++ b/datafusion/expr/src/logical_plan/ddl.rs
@@ -192,6 +192,8 @@ pub struct CreateExternalTable {
     pub order_exprs: Vec<Expr>,
     /// File compression type (GZIP, BZIP2, XZ, ZSTD)
     pub file_compression_type: CompressionTypeVariant,
+    /// Whether the table is an infinite stream
+    pub unbounded: bool,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
 }
@@ -211,6 +213,7 @@ impl Hash for CreateExternalTable {
         self.definition.hash(state);
         self.file_compression_type.hash(state);
         self.order_exprs.hash(state);
+        self.unbounded.hash(state);
         self.options.len().hash(state); // HashMap is not hashable
     }
 }
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index ef1e2f284e..446b4a0275 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -184,6 +184,7 @@ message CreateExternalTableNode {
   string definition = 9;
   string file_compression_type = 10;
   repeated LogicalExprNode order_exprs = 13;
+  bool unbounded = 14;
   map<string, string> options = 11;
 }
 
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index c217c4d7fe..571fd45f8a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3547,6 +3547,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.order_exprs.is_empty() {
             len += 1;
         }
+        if self.unbounded {
+            len += 1;
+        }
         if !self.options.is_empty() {
             len += 1;
         }
@@ -3584,6 +3587,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.order_exprs.is_empty() {
             struct_ser.serialize_field("orderExprs", &self.order_exprs)?;
         }
+        if self.unbounded {
+            struct_ser.serialize_field("unbounded", &self.unbounded)?;
+        }
         if !self.options.is_empty() {
             struct_ser.serialize_field("options", &self.options)?;
         }
@@ -3614,6 +3620,7 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
             "fileCompressionType",
             "order_exprs",
             "orderExprs",
+            "unbounded",
             "options",
         ];
 
@@ -3630,6 +3637,7 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
             Definition,
             FileCompressionType,
             OrderExprs,
+            Unbounded,
             Options,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -3663,6 +3671,7 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
                             "definition" => Ok(GeneratedField::Definition),
                             "fileCompressionType" | "file_compression_type" => 
Ok(GeneratedField::FileCompressionType),
                             "orderExprs" | "order_exprs" => 
Ok(GeneratedField::OrderExprs),
+                            "unbounded" => Ok(GeneratedField::Unbounded),
                             "options" => Ok(GeneratedField::Options),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
@@ -3694,6 +3703,7 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
                 let mut definition__ = None;
                 let mut file_compression_type__ = None;
                 let mut order_exprs__ = None;
+                let mut unbounded__ = None;
                 let mut options__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
@@ -3763,6 +3773,12 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
                             }
                             order_exprs__ = Some(map.next_value()?);
                         }
+                        GeneratedField::Unbounded => {
+                            if unbounded__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("unbounded"));
+                            }
+                            unbounded__ = Some(map.next_value()?);
+                        }
                         GeneratedField::Options => {
                             if options__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("options"));
@@ -3785,6 +3801,7 @@ impl<'de> serde::Deserialize<'de> for 
CreateExternalTableNode {
                     definition: definition__.unwrap_or_default(),
                     file_compression_type: 
file_compression_type__.unwrap_or_default(),
                     order_exprs: order_exprs__.unwrap_or_default(),
+                    unbounded: unbounded__.unwrap_or_default(),
                     options: options__.unwrap_or_default(),
                 })
             }
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 83255797fa..a5c0603239 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -293,6 +293,8 @@ pub struct CreateExternalTableNode {
     pub file_compression_type: ::prost::alloc::string::String,
     #[prost(message, repeated, tag = "13")]
     pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNode>,
+    #[prost(bool, tag = "14")]
+    pub unbounded: bool,
     #[prost(map = "string, string", tag = "11")]
     pub options: ::std::collections::HashMap<
         ::prost::alloc::string::String,
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index c61f90b2dc..47107affb3 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -527,6 +527,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     if_not_exists: create_extern_table.if_not_exists,
                     file_compression_type: 
CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_|
 DataFusionError::NotImplemented(format!("Unsupported file compression type 
{}", create_extern_table.file_compression_type)))?,
                     definition,
+                    unbounded: create_extern_table.unbounded,
                     options: create_extern_table.options.clone(),
                 })))
             }
@@ -1184,6 +1185,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     definition,
                     file_compression_type,
                     order_exprs,
+                    unbounded,
                     options,
                 },
             )) => Ok(protobuf::LogicalPlanNode {
@@ -1203,6 +1205,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             .collect::<Result<Vec<_>, to_proto::Error>>()?,
                         definition: definition.clone().unwrap_or_default(),
                         file_compression_type: 
file_compression_type.to_string(),
+                        unbounded: *unbounded,
                         options: options.clone(),
                     },
                 )),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index b1116af3cf..a70868fa2f 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -89,6 +89,8 @@ pub struct CreateExternalTable {
     pub if_not_exists: bool,
     /// File compression type (GZIP, BZIP2, XZ)
     pub file_compression_type: CompressionTypeVariant,
+    /// Whether the table is an unbounded (infinite) stream
+    pub unbounded: bool,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
 }
@@ -245,7 +247,10 @@ impl<'a> DFParser<'a> {
     /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
     pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
         if self.parser.parse_keyword(Keyword::EXTERNAL) {
-            self.parse_create_external_table()
+            self.parse_create_external_table(false)
+        } else if self.parser.parse_keyword(Keyword::UNBOUNDED) {
+            self.parser.expect_keyword(Keyword::EXTERNAL)?;
+            self.parse_create_external_table(true)
         } else {
             Ok(Statement::Statement(Box::from(self.parser.parse_create()?)))
         }
@@ -396,7 +401,10 @@ impl<'a> DFParser<'a> {
         })
     }
 
-    fn parse_create_external_table(&mut self) -> Result<Statement, 
ParserError> {
+    fn parse_create_external_table(
+        &mut self,
+        unbounded: bool,
+    ) -> Result<Statement, ParserError> {
         self.parser.expect_keyword(Keyword::TABLE)?;
         let if_not_exists =
             self.parser
@@ -427,39 +435,72 @@ impl<'a> DFParser<'a> {
         }
 
         loop {
-            if self.parser.parse_keyword(Keyword::STORED) {
-                self.parser.expect_keyword(Keyword::AS)?;
-                ensure_not_set(&builder.file_type, "STORED AS")?;
-                builder.file_type = Some(self.parse_file_format()?);
-            } else if self.parser.parse_keyword(Keyword::LOCATION) {
-                ensure_not_set(&builder.location, "LOCATION")?;
-                builder.location = Some(self.parser.parse_literal_string()?);
-            } else if self.parser.parse_keyword(Keyword::WITH) {
-                if self.parser.parse_keyword(Keyword::ORDER) {
-                    ensure_not_set(&builder.order_exprs, "WITH ORDER")?;
-                    builder.order_exprs = Some(self.parse_order_by_exprs()?);
-                } else {
-                    self.parser.expect_keyword(Keyword::HEADER)?;
-                    self.parser.expect_keyword(Keyword::ROW)?;
-                    ensure_not_set(&builder.has_header, "WITH HEADER ROW")?;
-                    builder.has_header = Some(true);
+            if let Some(keyword) = self.parser.parse_one_of_keywords(&[
+                Keyword::STORED,
+                Keyword::LOCATION,
+                Keyword::WITH,
+                Keyword::DELIMITER,
+                Keyword::COMPRESSION,
+                Keyword::PARTITIONED,
+                Keyword::OPTIONS,
+            ]) {
+                match keyword {
+                    Keyword::STORED => {
+                        self.parser.expect_keyword(Keyword::AS)?;
+                        ensure_not_set(&builder.file_type, "STORED AS")?;
+                        builder.file_type = Some(self.parse_file_format()?);
+                    }
+                    Keyword::LOCATION => {
+                        ensure_not_set(&builder.location, "LOCATION")?;
+                        builder.location = 
Some(self.parser.parse_literal_string()?);
+                    }
+                    Keyword::WITH => {
+                        if self.parser.parse_keyword(Keyword::ORDER) {
+                            ensure_not_set(&builder.order_exprs, "WITH 
ORDER")?;
+                            builder.order_exprs = 
Some(self.parse_order_by_exprs()?);
+                        } else {
+                            self.parser.expect_keyword(Keyword::HEADER)?;
+                            self.parser.expect_keyword(Keyword::ROW)?;
+                            ensure_not_set(&builder.has_header, "WITH HEADER 
ROW")?;
+                            builder.has_header = Some(true);
+                        }
+                    }
+                    Keyword::DELIMITER => {
+                        ensure_not_set(&builder.delimiter, "DELIMITER")?;
+                        builder.delimiter = Some(self.parse_delimiter()?);
+                    }
+                    Keyword::COMPRESSION => {
+                        self.parser.expect_keyword(Keyword::TYPE)?;
+                        ensure_not_set(
+                            &builder.file_compression_type,
+                            "COMPRESSION TYPE",
+                        )?;
+                        builder.file_compression_type =
+                            Some(self.parse_file_compression_type()?);
+                    }
+                    Keyword::PARTITIONED => {
+                        self.parser.expect_keyword(Keyword::BY)?;
+                        ensure_not_set(&builder.table_partition_cols, 
"PARTITIONED BY")?;
+                        builder.table_partition_cols = 
Some(self.parse_partitions()?);
+                    }
+                    Keyword::OPTIONS => {
+                        ensure_not_set(&builder.options, "OPTIONS")?;
+                        builder.options = Some(self.parse_options()?);
+                    }
+                    _ => {
+                        unreachable!()
+                    }
                 }
-            } else if self.parser.parse_keyword(Keyword::DELIMITER) {
-                ensure_not_set(&builder.delimiter, "DELIMITER")?;
-                builder.delimiter = Some(self.parse_delimiter()?);
-            } else if self.parser.parse_keyword(Keyword::COMPRESSION) {
-                self.parser.expect_keyword(Keyword::TYPE)?;
-                ensure_not_set(&builder.file_compression_type, "COMPRESSION 
TYPE")?;
-                builder.file_compression_type = 
Some(self.parse_file_compression_type()?);
-            } else if self.parser.parse_keyword(Keyword::PARTITIONED) {
-                self.parser.expect_keyword(Keyword::BY)?;
-                ensure_not_set(&builder.table_partition_cols, "PARTITIONED 
BY")?;
-                builder.table_partition_cols = Some(self.parse_partitions()?)
-            } else if self.parser.parse_keyword(Keyword::OPTIONS) {
-                ensure_not_set(&builder.options, "OPTIONS")?;
-                builder.options = Some(self.parse_options()?);
             } else {
-                break;
+                let token = self.parser.next_token();
+                if token == Token::EOF || token == Token::SemiColon {
+                    break;
+                } else {
+                    return Err(ParserError::ParserError(format!(
+                        "Unexpected token {}",
+                        token
+                    )));
+                }
             }
         }
 
@@ -488,6 +529,7 @@ impl<'a> DFParser<'a> {
             file_compression_type: builder
                 .file_compression_type
                 .unwrap_or(CompressionTypeVariant::UNCOMPRESSED),
+            unbounded,
             options: builder.options.unwrap_or(HashMap::new()),
         };
         Ok(Statement::CreateExternalTable(create))
@@ -610,6 +652,44 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
+            options: HashMap::new(),
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case: trailing space
+        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 
'foo.csv'     ";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![make_column_def("c1", DataType::Int(None))],
+            file_type: "CSV".to_string(),
+            has_header: false,
+            delimiter: ',',
+            location: "foo.csv".into(),
+            table_partition_cols: vec![],
+            order_exprs: vec![],
+            if_not_exists: false,
+            file_compression_type: UNCOMPRESSED,
+            unbounded: false,
+            options: HashMap::new(),
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case: trailing space + semicolon
+        let sql =
+            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'  
    ;";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![make_column_def("c1", DataType::Int(None))],
+            file_type: "CSV".to_string(),
+            has_header: false,
+            delimiter: ',',
+            location: "foo.csv".into(),
+            table_partition_cols: vec![],
+            order_exprs: vec![],
+            if_not_exists: false,
+            file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -628,6 +708,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -646,6 +727,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -667,6 +749,7 @@ mod tests {
                 order_exprs: vec![],
                 if_not_exists: false,
                 file_compression_type: UNCOMPRESSED,
+                unbounded: false,
                 options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
@@ -693,6 +776,7 @@ mod tests {
                 file_compression_type: CompressionTypeVariant::from_str(
                     file_compression_type,
                 )?,
+                unbounded: false,
                 options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
@@ -711,6 +795,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -728,6 +813,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -745,6 +831,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -763,6 +850,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: true,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -786,6 +874,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::from([("k1".into(), "v1".into())]),
         });
         expect_parse_ok(sql, expected)?;
@@ -804,6 +893,7 @@ mod tests {
             order_exprs: vec![],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::from([
                 ("k1".into(), "v1".into()),
                 ("k2".into(), "v2".into()),
@@ -851,6 +941,7 @@ mod tests {
                 }],
                 if_not_exists: false,
                 file_compression_type: UNCOMPRESSED,
+                unbounded: false,
                 options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
@@ -890,6 +981,7 @@ mod tests {
             ],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
@@ -925,13 +1017,14 @@ mod tests {
             }],
             if_not_exists: false,
             file_compression_type: UNCOMPRESSED,
+            unbounded: false,
             options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
         // Most complete CREATE EXTERNAL TABLE statement possible
         let sql = "
-            CREATE EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float)
+            CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float)
             STORED AS PARQUET
             DELIMITER '*'
             WITH HEADER ROW
@@ -969,6 +1062,7 @@ mod tests {
             }],
             if_not_exists: true,
             file_compression_type: CompressionTypeVariant::ZSTD,
+            unbounded: true,
             options: HashMap::from([
                 ("ROW_GROUP_SIZE".into(), "1024".into()),
                 ("TRUNCATE".into(), "NO".into()),
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 16d3c6c793..64c59eee87 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -588,6 +588,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             if_not_exists,
             file_compression_type,
             order_exprs,
+            unbounded,
             options,
         } = statement;
 
@@ -629,6 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 definition,
                 file_compression_type,
                 order_exprs: ordered_exprs,
+                unbounded,
                 options,
             },
         )))

Reply via email to