This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3bbf48afd1 Enable creating and inserting to empty external tables via
SQL (#7276)
3bbf48afd1 is described below
commit 3bbf48afd1a028d099bc5023c63407394c5dec46
Author: Devin D'Angelo <[email protected]>
AuthorDate: Mon Aug 14 13:18:42 2023 -0400
Enable creating and inserting to empty external tables via SQL (#7276)
* end to end sql test cases + fixes needed to pass
* remove unneeded repartition statement
---
datafusion/core/src/datasource/file_format/csv.rs | 5 +-
datafusion/core/src/datasource/listing/table.rs | 194 +++++++++++++++++++++
.../core/src/datasource/listing_table_factory.rs | 17 +-
datafusion/sql/src/statement.rs | 5 -
datafusion/sql/tests/sql_integration.rs | 14 +-
5 files changed, 222 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 32fbf03b58..c3ab50fd43 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -545,12 +545,11 @@ impl DataSink for CsvSink {
// Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
- let header = true;
+ let header = self.has_header;
let builder =
WriterBuilder::new().with_delimiter(self.delimiter);
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
- serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
@@ -567,6 +566,8 @@ impl DataSink for CsvSink {
object_store.clone(),
)
.await?;
+
+ serializers.push(Box::new(serializer));
writers.push(writer);
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 4f2387ad0d..d4e2c4aafe 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -217,6 +217,22 @@ pub enum ListingTableInsertMode {
///Throw an error if insert into is attempted on this table
Error,
}
+
+impl FromStr for ListingTableInsertMode {
+    type Err = DataFusionError;
+
+    /// Parses an insert mode name, ignoring case.
+    ///
+    /// Accepts `append_to_file`, `append_new_files` and `error`; any other
+    /// value yields a [`DataFusionError::Plan`] that lists the supported options.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
+            "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
+            "error" => Ok(ListingTableInsertMode::Error),
+            // The trailing `\` joins the two lines of the message without
+            // embedding a newline or the next line's leading whitespace.
+            _ => Err(DataFusionError::Plan(format!(
+                "Unknown or unsupported insert mode {s}. Supported options are \
+                 append_to_file, append_new_files, and error."
+            ))),
+        }
+    }
+}
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
@@ -1607,6 +1623,124 @@ mod tests {
Ok(())
}
+    /// CSV table created through SQL with an explicit `append_new_files` insert mode.
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
+        // The helper already returns `Result<()>`, so hand it straight back.
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            "OPTIONS (insert_mode 'append_new_files')",
+            None,
+        )
+        .await
+    }
+
+    /// CSV table created through SQL with a header row and the
+    /// `append_new_files` insert mode.
+    #[tokio::test]
+    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
+        let table_options = "WITH HEADER ROW \
+                             OPTIONS (insert_mode 'append_new_files')";
+        helper_test_insert_into_sql(
+            "csv",
+            FileCompressionType::UNCOMPRESSED,
+            table_options,
+            None,
+        )
+        .await
+    }
+
+    /// JSON table created through SQL with an explicit `append_new_files` insert mode.
+    #[tokio::test]
+    async fn test_insert_into_sql_json_defaults() -> Result<()> {
+        let file_type = "json";
+        let table_options = "OPTIONS (insert_mode 'append_new_files')";
+        helper_test_insert_into_sql(
+            file_type,
+            FileCompressionType::UNCOMPRESSED,
+            table_options,
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
+    /// Parquet table created through SQL with no extra table options, so it
+    /// relies on the default insert mode picked by the listing table factory.
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
+        let no_table_options = "";
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            no_table_options,
+            None,
+        )
+        .await
+    }
+
+    /// Parquet table created through SQL in a session whose
+    /// `datafusion.execution.parquet.*` writer options are all overridden.
+    #[tokio::test]
+    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
+        // Keys must spell the option names exactly (e.g. `statistics_enabled`),
+        // otherwise the override does not reach the intended setting.
+        let config_map: HashMap<String, String> = [
+            ("datafusion.execution.parquet.compression", "zstd(5)"),
+            ("datafusion.execution.parquet.dictionary_enabled", "false"),
+            ("datafusion.execution.parquet.dictionary_page_size_limit", "100"),
+            ("datafusion.execution.parquet.statistics_enabled", "none"),
+            ("datafusion.execution.parquet.max_statistics_size", "10"),
+            ("datafusion.execution.parquet.max_row_group_size", "5"),
+            ("datafusion.execution.parquet.created_by", "datafusion test"),
+            ("datafusion.execution.parquet.column_index_truncate_length", "50"),
+            ("datafusion.execution.parquet.data_page_row_count_limit", "50"),
+            ("datafusion.execution.parquet.bloom_filter_enabled", "true"),
+            ("datafusion.execution.parquet.bloom_filter_fpp", "0.01"),
+            ("datafusion.execution.parquet.bloom_filter_ndv", "1000"),
+            ("datafusion.execution.parquet.writer_version", "2.0"),
+            ("datafusion.execution.parquet.write_batch_size", "5"),
+        ]
+        .into_iter()
+        .map(|(key, value)| (key.to_string(), value.to_string()))
+        .collect();
+
+        helper_test_insert_into_sql(
+            "parquet",
+            FileCompressionType::UNCOMPRESSED,
+            "",
+            Some(config_map),
+        )
+        .await?;
+        Ok(())
+    }
+
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() ->
Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
@@ -2096,4 +2230,64 @@ mod tests {
// Return Ok if the function
Ok(())
}
+
+    /// Runs an end-to-end SQL round trip against a fresh listing table.
+    ///
+    /// The steps are:
+    /// 1. `CREATE EXTERNAL TABLE foo ... STORED AS {file_type}` in a temporary directory.
+    /// 2. `INSERT INTO` three rows.
+    /// 3. `SELECT *` and compare the result with the inserted rows.
+    ///
+    /// `external_table_options` is appended verbatim to the CREATE statement.
+    /// `session_config_map`, when given, builds the session through
+    /// `SessionConfig::from_string_hash_map`.
+    async fn helper_test_insert_into_sql(
+        file_type: &str,
+        // TODO test with create statement options such as compression
+        _file_compression_type: FileCompressionType,
+        external_table_options: &str,
+        session_config_map: Option<HashMap<String, String>>,
+    ) -> Result<()> {
+        // Create the initial context
+        let session_ctx = match session_config_map {
+            Some(cfg) => {
+                let config = SessionConfig::from_string_hash_map(cfg)?;
+                SessionContext::with_config(config)
+            }
+            None => SessionContext::new(),
+        };
+
+        // Keep `tmp_dir` bound until the end of the test so the directory and
+        // the files written into it are deleted on drop; `into_path()` would
+        // persist the directory on disk after every run.
+        let tmp_dir = TempDir::new()?;
+        let str_path = tmp_dir
+            .path()
+            .to_str()
+            .expect("Temp path should convert to &str");
+
+        // create table
+        session_ctx
+            .sql(&format!(
+                "create external table foo(a varchar, b varchar, c int) \
+                 stored as {file_type} \
+                 location '{str_path}' \
+                 {external_table_options}"
+            ))
+            .await?
+            .collect()
+            .await?;
+
+        // insert data
+        session_ctx
+            .sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
+            .await?
+            .collect()
+            .await?;
+
+        // read everything back and compare with what was inserted
+        let batches = session_ctx
+            .sql("select * from foo")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+-----+-----+---+",
+            "| a   | b   | c |",
+            "+-----+-----+---+",
+            "| foo | bar | 1 |",
+            "| foo | bar | 2 |",
+            "| foo | bar | 3 |",
+            "+-----+-----+---+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
}
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index fdd696172b..3f2dcf627e 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -40,6 +40,8 @@ use crate::datasource::provider::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
+use super::listing::ListingTableInsertMode;
+
/// A `TableProviderFactory` capable of creating new `ListingTable`s
pub struct ListingTableFactory {}
@@ -131,13 +133,26 @@ impl TableProviderFactory for ListingTableFactory {
// look for 'infinite' as an option
let infinite_source = cmd.unbounded;
+ let explicit_insert_mode = cmd.options.get("insert_mode");
+ let insert_mode = match explicit_insert_mode {
+ Some(mode) => ListingTableInsertMode::from_str(mode),
+ None => match file_type {
+ FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
+ FileType::PARQUET =>
Ok(ListingTableInsertMode::AppendNewFiles),
+ FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
+ FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
+ FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
+ },
+ }?;
+
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_infinite_source(infinite_source)
- .with_file_sort_order(cmd.order_exprs.clone());
+ .with_file_sort_order(cmd.order_exprs.clone())
+ .with_insert_mode(insert_mode);
let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index ad66640efa..16036defda 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -607,11 +607,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
options,
} = statement;
- // semantic checks
- if file_type == "PARQUET" && !columns.is_empty() {
- plan_err!("Column definitions can not be specified for PARQUET
files.")?;
- }
-
if file_type != "CSV"
&& file_type != "JSON"
&& file_compression_type != CompressionTypeVariant::UNCOMPRESSED
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index eef9093947..accb6ec9ce 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -1796,11 +1796,15 @@ fn create_external_table_with_compression_type() {
#[test]
fn create_external_table_parquet() {
let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION
'foo.parquet'";
- let err = logical_plan(sql).expect_err("query should have failed");
- assert_eq!(
- "Plan(\"Column definitions can not be specified for PARQUET files.\")",
- format!("{err:?}")
- );
+ // Column definitions are no longer rejected for PARQUET tables, so planning
+ // is now expected to succeed.
+ let expected = "CreateExternalTable: Bare { table: \"t\" }";
+ quick_test(sql, expected);
+}
+
+/// A PARQUET external table may declare column definitions together with a
+/// `WITH ORDER` clause.
+#[test]
+fn create_external_table_parquet_sort_order() {
+ let sql = "create external table foo(a varchar, b varchar, c timestamp)
stored as parquet location '/tmp/foo' with order (c)";
+ let expected = "CreateExternalTable: Bare { table: \"foo\" }";
+ quick_test(sql, expected);
}
#[test]