This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bbf48afd1 Enable creating and inserting to empty external tables via SQL (#7276)
3bbf48afd1 is described below

commit 3bbf48afd1a028d099bc5023c63407394c5dec46
Author: Devin D'Angelo <[email protected]>
AuthorDate: Mon Aug 14 13:18:42 2023 -0400

    Enable creating and inserting to empty external tables via SQL (#7276)
    
    * end to end sql test cases + fixes needed to pass
    
    * remove not needed repartition statement
---
 datafusion/core/src/datasource/file_format/csv.rs  |   5 +-
 datafusion/core/src/datasource/listing/table.rs    | 194 +++++++++++++++++++++
 .../core/src/datasource/listing_table_factory.rs   |  17 +-
 datafusion/sql/src/statement.rs                    |   5 -
 datafusion/sql/tests/sql_integration.rs            |  14 +-
 5 files changed, 222 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 32fbf03b58..c3ab50fd43 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -545,12 +545,11 @@ impl DataSink for CsvSink {
                 // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
                 for part_idx in 0..num_partitions {
-                    let header = true;
+                    let header = self.has_header;
                     let builder = 
WriterBuilder::new().with_delimiter(self.delimiter);
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(Box::new(serializer));
                     let file_path = base_path
                         .prefix()
                         .child(format!("/{}_{}.csv", write_id, part_idx));
@@ -567,6 +566,8 @@ impl DataSink for CsvSink {
                         object_store.clone(),
                     )
                     .await?;
+
+                    serializers.push(Box::new(serializer));
                     writers.push(writer);
                 }
             }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 4f2387ad0d..d4e2c4aafe 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
     ///Throw an error if insert into is attempted on this table
     Error,
 }
+
+impl FromStr for ListingTableInsertMode {
+    type Err = DataFusionError;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s_lower = s.to_lowercase();
+        match s_lower.as_str() {
+            "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
+            "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
+            "error" => Ok(ListingTableInsertMode::Error),
+            _ => Err(DataFusionError::Plan(format!(
+                "Unknown or unsupported insert mode {s}. Supported options are 
\
+                append_to_file, append_new_files, and error."
+            ))),
+        }
+    }
+}
 /// Options for creating a [`ListingTable`]
 #[derive(Clone, Debug)]
 pub struct ListingOptions {
@@ -1607,6 +1623,124 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            "OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            "WITH HEADER ROW \
+            OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_json_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "json",
+            FileCompressionType::UNCOMPRESSED,
+            "OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            "",
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert(
+            "datafusion.execution.parquet.compression".into(),
+            "zstd(5)".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.dictionary_enabled".into(),
+            "false".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
+            "100".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.staistics_enabled".into(),
+            "none".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.max_statistics_size".into(),
+            "10".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.max_row_group_size".into(),
+            "5".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.created_by".into(),
+            "datafusion test".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.column_index_truncate_length".into(),
+            "50".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.data_page_row_count_limit".into(),
+            "50".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_enabled".into(),
+            "true".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_fpp".into(),
+            "0.01".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.bloom_filter_ndv".into(),
+            "1000".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.writer_version".into(),
+            "2.0".into(),
+        );
+        config_map.insert(
+            "datafusion.execution.parquet.write_batch_size".into(),
+            "5".into(),
+        );
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            "",
+            Some(config_map),
+        )
+        .await?;
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_insert_into_append_new_parquet_files_session_overrides() -> 
Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
@@ -2096,4 +2230,64 @@ mod tests {
         // Return Ok if the function
         Ok(())
     }
+
+    /// tests insert into with end to end sql
+    /// create external table + insert into statements
+    async fn helper_test_insert_into_sql(
+        file_type: &str,
+        // TODO test with create statement options such as compression
+        _file_compression_type: FileCompressionType,
+        external_table_options: &str,
+        session_config_map: Option<HashMap<String, String>>,
+    ) -> Result<()> {
+        // Create the initial context
+        let session_ctx = match session_config_map {
+            Some(cfg) => {
+                let config = SessionConfig::from_string_hash_map(cfg)?;
+                SessionContext::with_config(config)
+            }
+            None => SessionContext::new(),
+        };
+
+        // create table
+        let tmp_dir = TempDir::new()?;
+        let tmp_path = tmp_dir.into_path();
+        let str_path = tmp_path.to_str().expect("Temp path should convert to 
&str");
+        session_ctx
+            .sql(&format!(
+                "create external table foo(a varchar, b varchar, c int) \
+                        stored as {file_type} \
+                        location '{str_path}' \
+                        {external_table_options}"
+            ))
+            .await?
+            .collect()
+            .await?;
+
+        // insert data
+        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 
'bar', 2), ('foo', 'bar', 3)")
+           .await?
+           .collect()
+           .await?;
+
+        // check count
+        let batches = session_ctx
+            .sql("select * from foo")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+-----+-----+---+",
+            "| a   | b   | c |",
+            "+-----+-----+---+",
+            "| foo | bar | 1 |",
+            "| foo | bar | 2 |",
+            "| foo | bar | 3 |",
+            "+-----+-----+---+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index fdd696172b..3f2dcf627e 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -40,6 +40,8 @@ use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 
+use super::listing::ListingTableInsertMode;
+
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
 pub struct ListingTableFactory {}
 
@@ -131,13 +133,26 @@ impl TableProviderFactory for ListingTableFactory {
         // look for 'infinite' as an option
         let infinite_source = cmd.unbounded;
 
+        let explicit_insert_mode = cmd.options.get("insert_mode");
+        let insert_mode = match explicit_insert_mode {
+            Some(mode) => ListingTableInsertMode::from_str(mode),
+            None => match file_type {
+                FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
+                FileType::PARQUET => 
Ok(ListingTableInsertMode::AppendNewFiles),
+                FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
+                FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
+                FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
+            },
+        }?;
+
         let options = ListingOptions::new(file_format)
             .with_collect_stat(state.config().collect_statistics())
             .with_file_extension(file_extension)
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_infinite_source(infinite_source)
-            .with_file_sort_order(cmd.order_exprs.clone());
+            .with_file_sort_order(cmd.order_exprs.clone())
+            .with_insert_mode(insert_mode);
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = match provided_schema {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index ad66640efa..16036defda 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -607,11 +607,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             options,
         } = statement;
 
-        // semantic checks
-        if file_type == "PARQUET" && !columns.is_empty() {
-            plan_err!("Column definitions can not be specified for PARQUET 
files.")?;
-        }
-
         if file_type != "CSV"
             && file_type != "JSON"
             && file_compression_type != CompressionTypeVariant::UNCOMPRESSED
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index eef9093947..accb6ec9ce 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
 #[test]
 fn create_external_table_parquet() {
     let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 
'foo.parquet'";
-    let err = logical_plan(sql).expect_err("query should have failed");
-    assert_eq!(
-        "Plan(\"Column definitions can not be specified for PARQUET files.\")",
-        format!("{err:?}")
-    );
+    let expected = "CreateExternalTable: Bare { table: \"t\" }";
+    quick_test(sql, expected);
+}
+
+#[test]
+fn create_external_table_parquet_sort_order() {
+    let sql = "create external table foo(a varchar, b varchar, c timestamp) 
stored as parquet location '/tmp/foo' with order (c)";
+    let expected = "CreateExternalTable: Bare { table: \"foo\" }";
+    quick_test(sql, expected);
 }
 
 #[test]

Reply via email to