This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6a775b761a Add Sqllogictests for INSERT INTO external table (#7294)
6a775b761a is described below
commit 6a775b761a544a8c8079471733290f23a3d62861
Author: Devin D'Angelo <[email protected]>
AuthorDate: Thu Aug 17 09:10:58 2023 -0400
Add Sqllogictests for INSERT INTO external table (#7294)
* insert sqllogictests rebase
* remove commented line
---
datafusion/core/src/datasource/listing/url.rs | 28 ++
.../core/src/datasource/listing_table_factory.rs | 31 ++-
datafusion/core/src/physical_planner.rs | 24 +-
datafusion/expr/src/logical_plan/plan.rs | 17 +-
.../sqllogictest/test_files/insert_to_external.slt | 308 +++++++++++++++++++++
5 files changed, 374 insertions(+), 34 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 0c017a517c..96998de17b 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use std::fs;
+
use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
@@ -87,6 +89,32 @@ impl ListingTableUrl {
}
}
+ /// Parse the given input as a [`ListingTableUrl`]
+ /// if the input is actually not a url, we assume it is a local file path
+ /// if we have a local path, create it if it does not exist so ListingTableUrl::parse works
+ pub fn parse_create_local_if_not_exists(
+ s: impl AsRef<str>,
+ is_directory: bool,
+ ) -> Result<Self> {
+ let s = s.as_ref();
+ let is_valid_url = Url::parse(s).is_ok();
+
+ match is_valid_url {
+ true => ListingTableUrl::parse(s),
+ false => {
+ let path = std::path::PathBuf::from(s);
+ if !path.exists() {
+ if is_directory {
+ fs::create_dir_all(path)?;
+ } else {
+ fs::File::create(path)?;
+ }
+ }
+ ListingTableUrl::parse(s)
+ }
+ }
+ }
+
/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
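
As a usage note, the new constructor can be exercised directly; a
minimal sketch (the scratch paths below are hypothetical):

    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;

    fn example() -> Result<()> {
        // Directory sink: created via fs::create_dir_all if missing
        let dir_url = ListingTableUrl::parse_create_local_if_not_exists(
            "scratch/parquet_table_dir",
            true, // is_directory
        )?;
        // Single-file sink: created via fs::File::create if missing
        let file_url = ListingTableUrl::parse_create_local_if_not_exists(
            "scratch/output.csv",
            false, // is_directory
        )?;
        let _ = (dir_url, file_url);
        Ok(())
    }
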
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3f2dcf627e..5c03557036 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -145,6 +145,36 @@ impl TableProviderFactory for ListingTableFactory {
},
}?;
+ let create_local_path_mode = cmd
+ .options
+ .get("create_local_path")
+ .map(|s| s.as_str())
+ .unwrap_or("false");
+ let single_file = cmd
+ .options
+ .get("single_file")
+ .map(|s| s.as_str())
+ .unwrap_or("false");
+
+ let single_file = match single_file {
+ "true" => Ok(true),
+ "false" => Ok(false),
+ _ => Err(DataFusionError::Plan(
+ "Invalid option single_file, must be 'true' or 'false'".into(),
+ )),
+ }?;
+
+ let table_path = match create_local_path_mode {
+ "true" => ListingTableUrl::parse_create_local_if_not_exists(
+ &cmd.location,
+ !single_file,
+ ),
+ "false" => ListingTableUrl::parse(&cmd.location),
+ _ => Err(DataFusionError::Plan(
+ "Invalid option create_local_path, must be 'true' or
'false'".into(),
+ )),
+ }?;
+
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
@@ -154,7 +184,6 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode);
- let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
Some(s) => s,
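
Both options go through the same "true"/"false" match; a small helper
along these lines (hypothetical, not part of this commit) would factor
out the duplication:

    use std::collections::HashMap;
    use datafusion_common::{DataFusionError, Result};

    /// Parses a boolean table option, defaulting to false when absent.
    fn parse_bool_option(options: &HashMap<String, String>, key: &str) -> Result<bool> {
        match options.get(key).map(|s| s.as_str()).unwrap_or("false") {
            "true" => Ok(true),
            "false" => Ok(false),
            _ => Err(DataFusionError::Plan(format!(
                "Invalid option {key}, must be 'true' or 'false'"
            ))),
        }
    }

With it, each lookup in the factory would reduce to a single call, e.g.
parse_bool_option(&cmd.options, "single_file")?.
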
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 85396576d9..1b1b232e2d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -39,7 +39,6 @@ use crate::logical_expr::{
};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_expr::dml::{CopyTo, OutputFileFormat};
-use url::Url;
use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
@@ -91,7 +90,6 @@ use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use std::collections::HashMap;
use std::fmt::Write;
-use std::fs;
use std::sync::Arc;
fn create_function_physical_name(
@@ -565,30 +563,10 @@ impl DefaultPhysicalPlanner {
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
- // Get object store for specified output_url
- // if user did not pass in a url, we assume it is a local file path
- // this requires some special handling as copy can create non
- // existing file paths
- let is_valid_url = Url::parse(output_url).is_ok();
-
// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
- let parsed_url = match is_valid_url {
- true => ListingTableUrl::parse(output_url),
- false => {
- let path = std::path::PathBuf::from(output_url);
- if !path.exists(){
- if *per_thread_output{
- fs::create_dir_all(path)?;
- } else{
- fs::File::create(path)?;
- }
- }
- ListingTableUrl::parse(output_url)
- }
- }?;
-
+ let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, *per_thread_output)?;
let object_store_url = parsed_url.object_store();
let schema: Schema = (**input.schema()).clone().into();
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 85eb59098c..6594613c19 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1099,16 +1099,13 @@ impl LogicalPlan {
per_thread_output,
options,
}) => {
- let mut op_str = String::new();
- op_str.push('(');
- for (key, val) in options {
- if !op_str.is_empty() {
- op_str.push(',');
- }
- op_str.push_str(&format!("{key} {val}"));
- }
- op_str.push(')');
- write!(f, "CopyTo: format={file_format}
output_url={output_url} per_thread_output={per_thread_output} options:
{op_str}")
+ let op_str = options
+ .iter()
+ .map(|(k, v)| format!("{k} {v}"))
+ .collect::<Vec<String>>()
+ .join(", ");
+
+ write!(f, "CopyTo: format={file_format}
output_url={output_url} per_thread_output={per_thread_output} options:
({op_str})")
}
LogicalPlan::Ddl(ddl) => {
write!(f, "{}", ddl.display())
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
new file mode 100644
index 0000000000..6c9c6fdd47
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -0,0 +1,308 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+####################################
+## INSERT to external table tests ##
+####################################
+
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# test_insert_into
+
+statement ok
+set datafusion.execution.target_partitions = 8;
+
+statement ok
+CREATE EXTERNAL TABLE
+single_file_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/single_csv_table.csv'
+OPTIONS(
+create_local_path 'true',
+single_file 'true',
+);
+
+query II
+INSERT INTO single_file_test values (1, 2), (3, 4);
+----
+2
+
+query II
+select * from single_file_test;
+----
+1 2
+3 4
+
+statement ok
+CREATE EXTERNAL TABLE
+directory_test(a bigint, b bigint)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q0'
+OPTIONS(
+create_local_path 'true',
+);
+
+query II
+INSERT INTO directory_test values (1, 2), (3, 4);
+----
+2
+
+query II
+select * from directory_test;
+----
+1 2
+3 4
+
+statement ok
+CREATE EXTERNAL TABLE
+table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q1'
+OPTIONS (create_local_path 'true');
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+logical_plan
+Dml: op=[Insert Into] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
+----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PAR [...]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bou [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
+----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+query II
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+100
+
+# verify there is data now in the table
+query I
+SELECT COUNT(*) from table_without_values;
+----
+100
+
+# verify there is data now in the table
+query II
+SELECT *
+FROM table_without_values
+ORDER BY field1, field2
+LIMIT 5;
+----
+-70111 3
+-65362 3
+-62295 3
+-56721 3
+-55414 3
+
+statement ok
+drop table table_without_values;
+
+# test_insert_into_as_select_multi_partitioned
+statement ok
+CREATE EXTERNAL TABLE
+table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q2'
+OPTIONS (create_local_path 'true');
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+logical_plan
+Dml: op=[Insert Into] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+----BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: [...]
+------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([c1@0], 8), input_partitions=8
+------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+
+
+query II
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+100
+
+statement ok
+drop table table_without_values;
+
+
+# test_insert_into_with_sort
+statement ok
+CREATE EXTERNAL TABLE
+table_without_values(c1 varchar NULL)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q3'
+OPTIONS (create_local_path 'true');
+
+# verify that the sort order of the insert query is maintained into the
+# insert (there should be a SortExec in the following plan)
+# See https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 for more background
+query TT
+explain insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+logical_plan
+Dml: op=[Insert Into] table=[table_without_values]
+--Projection: aggregate_test_100.c1 AS c1
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------TableScan: aggregate_test_100 projection=[c1]
+physical_plan
+InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+--ProjectionExec: expr=[c1@0 as c1]
+----SortExec: expr=[c1@0 ASC NULLS LAST]
+------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
+
+query T
+insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+100
+
+query I
+select count(*) from table_without_values;
+----
+100
+
+
+statement ok
+drop table table_without_values;
+
+
+# test insert with column names
+statement ok
+CREATE EXTERNAL TABLE
+table_without_values(id BIGINT, name varchar)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q4'
+OPTIONS (create_local_path 'true');
+
+query IT
+insert into table_without_values(id, name) values(1, 'foo');
+----
+1
+
+query IT
+insert into table_without_values(name, id) values('bar', 2);
+----
+1
+
+statement error Schema error: Schema contains duplicate unqualified field name id
+insert into table_without_values(id, id) values(3, 3);
+
+statement error Arrow error: Cast error: Cannot cast string 'zoo' to value of Int64 type
+insert into table_without_values(name, id) values(4, 'zoo');
+
+statement error Error during planning: Column count doesn't match insert query!
+insert into table_without_values(id) values(4, 'zoo');
+
+statement error Error during planning: Inserting query must have the same schema with the table.
+insert into table_without_values(id) values(4);
+
+query IT rowsort
+select * from table_without_values;
+----
+1 foo
+2 bar
+
+statement ok
+drop table table_without_values;
+
+# test insert with non-nullable column
+statement ok
+CREATE EXTERNAL TABLE
+table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
+STORED AS parquet
+LOCATION 'test_files/scratch/external_parquet_table_q5'
+OPTIONS (create_local_path 'true');
+
+query II
+insert into table_without_values values(1, 100);
+----
+1
+
+query II
+insert into table_without_values values(2, NULL);
+----
+1
+
+statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+insert into table_without_values values(NULL, 300);
+
+statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+insert into table_without_values values(3, 300), (NULL, 400);
+
+query II rowsort
+select * from table_without_values;
+----
+1 100
+2 NULL
+
+statement ok
+drop table table_without_values;