This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 64057fd9ed feat: fill missing values with NULLs while inserting (#8146)
64057fd9ed is described below
commit 64057fd9ed763c79c3068c548be6bdd058f04608
Author: Jonah Gao <[email protected]>
AuthorDate: Tue Nov 14 22:28:03 2023 +0800
feat: fill missing values with NULLs while inserting (#8146)
* feat: fill missing values with NULLs while inserting
* add test comment
* update to re-trigger ci
---
datafusion/sql/src/statement.rs | 46 +++++++++++++---------
datafusion/sql/tests/sql_integration.rs | 33 +++++++---------
datafusion/sqllogictest/test_files/insert.slt | 17 +++++++-
.../sqllogictest/test_files/insert_to_external.slt | 17 +++++++-
4 files changed, 75 insertions(+), 38 deletions(-)
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index ecc77b0442..49755729d2 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -33,7 +33,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found,
Column,
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError,
OwnedTableReference,
- Result, SchemaReference, TableReference, ToDFSchema,
+ Result, ScalarValue, SchemaReference, TableReference, ToDFSchema,
};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
@@ -1087,9 +1087,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let arrow_schema = (*table_source.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;
- // Get insert fields and index_mapping
- // The i-th field of the table is `fields[index_mapping[i]]`
- let (fields, index_mapping) = if columns.is_empty() {
+ // Get insert fields and target table's value indices
+ //
+ // if value_indices[i] = Some(j), it means that the value of the i-th target table's column is
+ // derived from the j-th output of the source.
+ //
+ // if value_indices[i] = None, it means that the value of the i-th target table's column is
+ // not provided, and should be filled with a default value later.
+ let (fields, value_indices) = if columns.is_empty() {
// Empty means we're inserting into all columns of the table
(
table_schema.fields().clone(),
@@ -1098,7 +1103,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Vec<_>>(),
)
} else {
- let mut mapping = vec![None; table_schema.fields().len()];
+ let mut value_indices = vec![None; table_schema.fields().len()];
let fields = columns
.into_iter()
.map(|c| self.normalizer.normalize(c))
@@ -1107,19 +1112,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let column_index = table_schema
.index_of_column_by_name(None, &c)?
.ok_or_else(|| unqualified_field_not_found(&c,
&table_schema))?;
- if mapping[column_index].is_some() {
+ if value_indices[column_index].is_some() {
return Err(DataFusionError::SchemaError(
datafusion_common::SchemaError::DuplicateUnqualifiedField {
name: c,
},
));
} else {
- mapping[column_index] = Some(i);
+ value_indices[column_index] = Some(i);
}
Ok(table_schema.field(column_index).clone())
})
.collect::<Result<Vec<DFField>>>()?;
- (fields, mapping)
+ (fields, value_indices)
};
// infer types for Values clause... other types should be resolvable
the regular way
@@ -1154,17 +1159,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan_err!("Column count doesn't match insert query!")?;
}
- let exprs = index_mapping
+ let exprs = value_indices
.into_iter()
- .flatten()
- .map(|i| {
- let target_field = &fields[i];
- let source_field = source.schema().field(i);
- let expr =
-
datafusion_expr::Expr::Column(source_field.unqualified_column())
- .cast_to(target_field.data_type(), source.schema())?
- .alias(target_field.name());
- Ok(expr)
+ .enumerate()
+ .map(|(i, value_index)| {
+ let target_field = table_schema.field(i);
+ let expr = match value_index {
+ Some(v) => {
+ let source_field = source.schema().field(v);
+ datafusion_expr::Expr::Column(source_field.qualified_column())
+ .cast_to(target_field.data_type(), source.schema())?
+ }
+ // Fill the default value for the column, currently only supports NULL.
+ None => datafusion_expr::Expr::Literal(ScalarValue::Null)
+ .cast_to(target_field.data_type(), &DFSchema::empty())?,
+ };
+ Ok(expr.alias(target_field.name()))
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
let source = project(source, exprs)?;
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index ff6dca7eef..4c2bad1c71 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -422,12 +422,11 @@ CopyTo: format=csv output_url=output.csv
single_file_output=true options: ()
fn plan_insert() {
let sql =
"insert into person (id, first_name, last_name) values (1, 'Alan',
'Turing')";
- let plan = r#"
-Dml: op=[Insert Into] table=[person]
- Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS
last_name
- Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
- "#
- .trim();
+ let plan = "Dml: op=[Insert Into] table=[person]\
+ \n Projection: CAST(column1 AS UInt32) AS id, column2 AS
first_name, column3 AS last_name, \
+ CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS
state, CAST(NULL AS Float64) AS salary, \
+ CAST(NULL AS Timestamp(Nanosecond, None)) AS
birth_date, CAST(NULL AS Int32) AS 😀\
+ \n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
quick_test(sql, plan);
}
@@ -4037,12 +4036,11 @@ Dml: op=[Update] table=[person]
fn test_prepare_statement_insert_infer() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2,
$3)";
- let expected_plan = r#"
-Dml: op=[Insert Into] table=[person]
- Projection: column1 AS id, column2 AS first_name, column3 AS last_name
- Values: ($1, $2, $3)
- "#
- .trim();
+ let expected_plan = "Dml: op=[Insert Into] table=[person]\
+ \n Projection: column1 AS id, column2 AS first_name,
column3 AS last_name, \
+ CAST(NULL AS Int32) AS age, CAST(NULL AS
Utf8) AS state, CAST(NULL AS Float64) AS salary, \
+ CAST(NULL AS Timestamp(Nanosecond, None))
AS birth_date, CAST(NULL AS Int32) AS 😀\
+ \n Values: ($1, $2, $3)";
let expected_dt = "[Int32]";
let plan = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
@@ -4061,12 +4059,11 @@ Dml: op=[Insert Into] table=[person]
ScalarValue::Utf8(Some("Alan".to_string())),
ScalarValue::Utf8(Some("Turing".to_string())),
];
- let expected_plan = r#"
-Dml: op=[Insert Into] table=[person]
- Projection: column1 AS id, column2 AS first_name, column3 AS last_name
- Values: (UInt32(1), Utf8("Alan"), Utf8("Turing"))
- "#
- .trim();
+ let expected_plan = "Dml: op=[Insert Into] table=[person]\
+ \n Projection: column1 AS id, column2 AS first_name,
column3 AS last_name, \
+ CAST(NULL AS Int32) AS age, CAST(NULL AS
Utf8) AS state, CAST(NULL AS Float64) AS salary, \
+ CAST(NULL AS Timestamp(Nanosecond, None))
AS birth_date, CAST(NULL AS Int32) AS 😀\
+ \n Values: (UInt32(1), Utf8(\"Alan\"),
Utf8(\"Turing\"))";
let plan = plan.replace_params_with_values(&param_values).unwrap();
prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index 8b9fd52e0d..9860bdcae0 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -258,14 +258,18 @@ insert into table_without_values(name, id) values(4,
'zoo');
statement error Error during planning: Column count doesn't match insert query!
insert into table_without_values(id) values(4, 'zoo');
-statement error Error during planning: Inserting query must have the same
schema with the table.
+# insert NULL values for the missing column (name)
+query IT
insert into table_without_values(id) values(4);
+----
+1
query IT rowsort
select * from table_without_values;
----
1 foo
2 bar
+4 NULL
statement ok
drop table table_without_values;
@@ -285,6 +289,16 @@ insert into table_without_values values(2, NULL);
----
1
+# insert NULL values for the missing column (field2)
+query II
+insert into table_without_values(field1) values(3);
+----
+1
+
+# insert NULL values for the missing column (field1), but column is
non-nullable
+statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+insert into table_without_values(field2) values(300);
+
statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
insert into table_without_values values(NULL, 300);
@@ -296,6 +310,7 @@ select * from table_without_values;
----
1 100
2 NULL
+3 NULL
statement ok
drop table table_without_values;
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index d6449bc272..4441036241 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -468,14 +468,18 @@ insert into table_without_values(name, id) values(4,
'zoo');
statement error Error during planning: Column count doesn't match insert query!
insert into table_without_values(id) values(4, 'zoo');
-statement error Error during planning: Inserting query must have the same
schema with the table.
+# insert NULL values for the missing column (name)
+query IT
insert into table_without_values(id) values(4);
+----
+1
query IT rowsort
select * from table_without_values;
----
1 foo
2 bar
+4 NULL
statement ok
drop table table_without_values;
@@ -498,6 +502,16 @@ insert into table_without_values values(2, NULL);
----
1
+# insert NULL values for the missing column (field2)
+query II
+insert into table_without_values(field1) values(3);
+----
+1
+
+# insert NULL values for the missing column (field1), but column is
non-nullable
+statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+insert into table_without_values(field2) values(300);
+
statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
insert into table_without_values values(NULL, 300);
@@ -509,6 +523,7 @@ select * from table_without_values;
----
1 100
2 NULL
+3 NULL
statement ok
drop table table_without_values;