This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 64057fd9ed feat: fill missing values with NULLs while inserting (#8146)
64057fd9ed is described below

commit 64057fd9ed763c79c3068c548be6bdd058f04608
Author: Jonah Gao <[email protected]>
AuthorDate: Tue Nov 14 22:28:03 2023 +0800

    feat: fill missing values with NULLs while inserting (#8146)
    
    * feat: fill missing values with NULLs while inserting
    
    * add test comment
    
    * update to re-trigger ci
---
 datafusion/sql/src/statement.rs                    | 46 +++++++++++++---------
 datafusion/sql/tests/sql_integration.rs            | 33 +++++++---------
 datafusion/sqllogictest/test_files/insert.slt      | 17 +++++++-
 .../sqllogictest/test_files/insert_to_external.slt | 17 +++++++-
 4 files changed, 75 insertions(+), 38 deletions(-)

diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index ecc77b0442..49755729d2 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -33,7 +33,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
     not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column,
     Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
-    Result, SchemaReference, TableReference, ToDFSchema,
+    Result, ScalarValue, SchemaReference, TableReference, ToDFSchema,
 };
 use datafusion_expr::dml::{CopyOptions, CopyTo};
 use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
@@ -1087,9 +1087,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let arrow_schema = (*table_source.schema()).clone();
         let table_schema = DFSchema::try_from(arrow_schema)?;
 
-        // Get insert fields and index_mapping
-        // The i-th field of the table is `fields[index_mapping[i]]`
-        let (fields, index_mapping) = if columns.is_empty() {
+        // Get insert fields and target table's value indices
+        //
+        // if value_indices[i] = Some(j), it means that the value of the i-th target table's column is
+        // derived from the j-th output of the source.
+        //
+        // if value_indices[i] = None, it means that the value of the i-th target table's column is
+        // not provided, and should be filled with a default value later.
+        let (fields, value_indices) = if columns.is_empty() {
             // Empty means we're inserting into all columns of the table
             (
                 table_schema.fields().clone(),
@@ -1098,7 +1103,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     .collect::<Vec<_>>(),
             )
         } else {
-            let mut mapping = vec![None; table_schema.fields().len()];
+            let mut value_indices = vec![None; table_schema.fields().len()];
             let fields = columns
                 .into_iter()
                 .map(|c| self.normalizer.normalize(c))
@@ -1107,19 +1112,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     let column_index = table_schema
                         .index_of_column_by_name(None, &c)?
                        .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
-                    if mapping[column_index].is_some() {
+                    if value_indices[column_index].is_some() {
                        return Err(DataFusionError::SchemaError(
                            datafusion_common::SchemaError::DuplicateUnqualifiedField {
                                 name: c,
                             },
                         ));
                     } else {
-                        mapping[column_index] = Some(i);
+                        value_indices[column_index] = Some(i);
                     }
                     Ok(table_schema.field(column_index).clone())
                 })
                 .collect::<Result<Vec<DFField>>>()?;
-            (fields, mapping)
+            (fields, value_indices)
         };
 
         // infer types for Values clause... other types should be resolvable the regular way
@@ -1154,17 +1159,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             plan_err!("Column count doesn't match insert query!")?;
         }
 
-        let exprs = index_mapping
+        let exprs = value_indices
             .into_iter()
-            .flatten()
-            .map(|i| {
-                let target_field = &fields[i];
-                let source_field = source.schema().field(i);
-                let expr =
-                    datafusion_expr::Expr::Column(source_field.unqualified_column())
-                        .cast_to(target_field.data_type(), source.schema())?
-                        .alias(target_field.name());
-                Ok(expr)
+            .enumerate()
+            .map(|(i, value_index)| {
+                let target_field = table_schema.field(i);
+                let expr = match value_index {
+                    Some(v) => {
+                        let source_field = source.schema().field(v);
+                        datafusion_expr::Expr::Column(source_field.qualified_column())
+                            .cast_to(target_field.data_type(), source.schema())?
+                    }
+                    // Fill the default value for the column, currently only supports NULL.
+                    None => datafusion_expr::Expr::Literal(ScalarValue::Null)
+                        .cast_to(target_field.data_type(), &DFSchema::empty())?,
+                };
+                Ok(expr.alias(target_field.name()))
             })
             .collect::<Result<Vec<datafusion_expr::Expr>>>()?;
         let source = project(source, exprs)?;
diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs
index ff6dca7eef..4c2bad1c71 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -422,12 +422,11 @@ CopyTo: format=csv output_url=output.csv single_file_output=true options: ()
 fn plan_insert() {
     let sql =
         "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
-    let plan = r#"
-Dml: op=[Insert Into] table=[person]
-  Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
-    Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
-    "#
-    .trim();
+    let plan = "Dml: op=[Insert Into] table=[person]\
+                \n  Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \
+                        CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
+                        CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
+                \n    Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
     quick_test(sql, plan);
 }
 
@@ -4037,12 +4036,11 @@ Dml: op=[Update] table=[person]
 fn test_prepare_statement_insert_infer() {
     let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3)";
 
-    let expected_plan = r#"
-Dml: op=[Insert Into] table=[person]
-  Projection: column1 AS id, column2 AS first_name, column3 AS last_name
-    Values: ($1, $2, $3)
-        "#
-    .trim();
+    let expected_plan = "Dml: op=[Insert Into] table=[person]\
+                        \n  Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
+                                    CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
+                                    CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
+                        \n    Values: ($1, $2, $3)";
 
     let expected_dt = "[Int32]";
     let plan = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
@@ -4061,12 +4059,11 @@ Dml: op=[Insert Into] table=[person]
         ScalarValue::Utf8(Some("Alan".to_string())),
         ScalarValue::Utf8(Some("Turing".to_string())),
     ];
-    let expected_plan = r#"
-Dml: op=[Insert Into] table=[person]
-  Projection: column1 AS id, column2 AS first_name, column3 AS last_name
-    Values: (UInt32(1), Utf8("Alan"), Utf8("Turing"))
-        "#
-    .trim();
+    let expected_plan = "Dml: op=[Insert Into] table=[person]\
+                        \n  Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
+                                    CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
+                                    CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
+                        \n    Values: (UInt32(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
     let plan = plan.replace_params_with_values(&param_values).unwrap();
 
     prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index 8b9fd52e0d..9860bdcae0 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -258,14 +258,18 @@ insert into table_without_values(name, id) values(4, 'zoo');
 statement error Error during planning: Column count doesn't match insert query!
 insert into table_without_values(id) values(4, 'zoo');
 
-statement error Error during planning: Inserting query must have the same schema with the table.
+# insert NULL values for the missing column (name)
+query IT
 insert into table_without_values(id) values(4);
+----
+1
 
 query IT rowsort
 select * from table_without_values;
 ----
 1 foo
 2 bar
+4 NULL
 
 statement ok
 drop table table_without_values;
@@ -285,6 +289,16 @@ insert into table_without_values values(2, NULL);
 ----
 1
 
+# insert NULL values for the missing column (field2) 
+query II
+insert into table_without_values(field1) values(3);
+----
+1
+
+# insert NULL values for the missing column (field1), but column is non-nullable
+statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+insert into table_without_values(field2) values(300);
+
 statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
 insert into table_without_values values(NULL, 300);
 
@@ -296,6 +310,7 @@ select * from table_without_values;
 ----
 1 100
 2 NULL
+3 NULL
 
 statement ok
 drop table table_without_values;
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index d6449bc272..4441036241 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -468,14 +468,18 @@ insert into table_without_values(name, id) values(4, 'zoo');
 statement error Error during planning: Column count doesn't match insert query!
 insert into table_without_values(id) values(4, 'zoo');
 
-statement error Error during planning: Inserting query must have the same schema with the table.
+# insert NULL values for the missing column (name)
+query IT
 insert into table_without_values(id) values(4);
+----
+1
 
 query IT rowsort
 select * from table_without_values;
 ----
 1 foo
 2 bar
+4 NULL
 
 statement ok
 drop table table_without_values;
@@ -498,6 +502,16 @@ insert into table_without_values values(2, NULL);
 ----
 1
 
+# insert NULL values for the missing column (field2)
+query II
+insert into table_without_values(field1) values(3);
+----
+1
+
+# insert NULL values for the missing column (field1), but column is non-nullable
+statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
+insert into table_without_values(field2) values(300);
+
 statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
 insert into table_without_values values(NULL, 300);
 
@@ -509,6 +523,7 @@ select * from table_without_values;
 ----
 1 100
 2 NULL
+3 NULL
 
 statement ok
 drop table table_without_values;

Reply via email to