This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 42eabb97f9 bug: improve schema checking for `insert into` cases 
(#14572)
42eabb97f9 is described below

commit 42eabb97f950e4c4adbb21fc37c3453c349b1658
Author: Qi Zhu <[email protected]>
AuthorDate: Mon Feb 17 19:56:32 2025 +0800

    bug: improve schema checking for `insert into` cases (#14572)
    
    * Address comments
    
    * Address comments
    
    * Address comments
    
    * Address comment
    
    * Address new comments
---
 datafusion/common/src/dfschema.rs                  | 49 +++++++++++++-------
 datafusion/core/src/datasource/listing/table.rs    | 23 +---------
 datafusion/core/src/datasource/memory.rs           | 23 ++--------
 datafusion/core/tests/dataframe/mod.rs             | 52 ++++++++++++++++++++++
 .../test_files/aggregate_skip_partial.slt          |  3 +-
 datafusion/sqllogictest/test_files/insert.slt      | 14 ++++--
 .../sqllogictest/test_files/insert_to_external.slt | 12 ++++-
 7 files changed, 112 insertions(+), 64 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index 7e9025dee1..99fb179c76 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -1002,12 +1002,14 @@ pub trait SchemaExt {
     /// It works the same as [`DFSchema::equivalent_names_and_types`].
     fn equivalent_names_and_types(&self, other: &Self) -> bool;
 
-    /// Returns true if the two schemas have the same qualified named
-    /// fields with logically equivalent data types. Returns false otherwise.
+    /// Returns `Ok(())` if the two schemas have the same qualified named
+    /// fields with logically equivalent data types. Returns a plan error 
otherwise.
     ///
     /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
     /// equivalence checking.
-    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
+    ///
+    /// It is only used by insert into cases.
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
 }
 
 impl SchemaExt for Schema {
@@ -1028,21 +1030,36 @@ impl SchemaExt for Schema {
             })
     }
 
-    fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+    // It is only used by insert into cases.
+    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> 
{
+        // case 1 : schema length mismatch
         if self.fields().len() != other.fields().len() {
-            return false;
+            _plan_err!(
+                "Inserting query must have the same schema length as the 
table. \
+            Expected table schema length: {}, got: {}",
+                self.fields().len(),
+                other.fields().len()
+            )
+        } else {
+            // case 2 : schema length match, but fields mismatch
+            // check that the field names are the same and have the same data 
types
+            self.fields()
+                .iter()
+                .zip(other.fields().iter())
+                .try_for_each(|(f1, f2)| {
+                    if f1.name() != f2.name() || 
!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
+                        _plan_err!(
+                            "Inserting query schema mismatch: Expected table 
field '{}' with type {:?}, \
+                            but got '{}' with type {:?}.",
+                            f1.name(),
+                            f1.data_type(),
+                            f2.name(),
+                            f2.data_type())
+                    } else {
+                        Ok(())
+                    }
+                })
         }
-
-        self.fields()
-            .iter()
-            .zip(other.fields().iter())
-            .all(|(f1, f2)| {
-                f1.name() == f2.name()
-                    && DFSchema::datatype_is_logically_equal(
-                        f1.data_type(),
-                        f2.data_type(),
-                    )
-            })
     }
 }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 2f517e397e..3be8af59ea 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -996,27 +996,8 @@ impl TableProvider for ListingTable {
         insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check that the schema of the plan matches the schema of this table.
-        if !self
-            .schema()
-            .logically_equivalent_names_and_types(&input.schema())
-        {
-            // Return an error if schema of the input query does not match 
with the table schema.
-            return plan_err!(
-                "Inserting query must have the same schema with the table. \
-                Expected: {:?}, got: {:?}",
-                self.schema()
-                    .fields()
-                    .iter()
-                    .map(|field| field.data_type())
-                    .collect::<Vec<_>>(),
-                input
-                    .schema()
-                    .fields()
-                    .iter()
-                    .map(|field| field.data_type())
-                    .collect::<Vec<_>>()
-            );
-        }
+        self.schema()
+            .logically_equivalent_names_and_types(&input.schema())?;
 
         let table_path = &self.table_paths()[0];
         if !table_path.is_collection() {
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index f1bda1a66f..94c6e45804 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -278,26 +278,9 @@ impl TableProvider for MemTable {
 
         // Create a physical plan from the logical plan.
         // Check that the schema of the plan matches the schema of this table.
-        if !self
-            .schema()
-            .logically_equivalent_names_and_types(&input.schema())
-        {
-            return plan_err!(
-                "Inserting query must have the same schema with the table. \
-                Expected: {:?}, got: {:?}",
-                self.schema()
-                    .fields()
-                    .iter()
-                    .map(|field| field.data_type())
-                    .collect::<Vec<_>>(),
-                input
-                    .schema()
-                    .fields()
-                    .iter()
-                    .map(|field| field.data_type())
-                    .collect::<Vec<_>>()
-            );
-        }
+        self.schema()
+            .logically_equivalent_names_and_types(&input.schema())?;
+
         if insert_op != InsertOp::Append {
             return not_impl_err!("{insert_op} not implemented for MemoryTable 
yet");
         }
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 8155fd6a2f..b0d469d751 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -5274,3 +5274,55 @@ async fn register_non_parquet_file() {
         "1.json' does not match the expected extension '.parquet'"
     );
 }
+
+// Test schema checking for `insert into`.
+#[tokio::test]
+async fn test_insert_into_checking() -> Result<()> {
+    // Create a new schema with one field called "a" of type Int64, and 
setting nullable to false
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, 
false)]));
+
+    let session_ctx = SessionContext::new();
+
+    // Create and register the initial table with the provided schema and data
+    let initial_table = Arc::new(MemTable::try_new(schema.clone(), 
vec![vec![]])?);
+    session_ctx.register_table("t", initial_table.clone())?;
+
+    // There are two cases we need to check
+    // 1. The len of the schema of the plan and the schema of the table should 
be the same
+    // 2. The datatype of the schema of the plan and the schema of the table 
should be the same
+
+    // Test case 1:
+    let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();
+
+    let e = write_df
+        .write_table("t", DataFrameWriteOptions::new())
+        .await
+        .unwrap_err();
+
+    assert_contains!(
+        e.to_string(),
+        "Inserting query must have the same schema length as the table."
+    );
+
+    // Setting nullable to true
+    // Make sure the nullable check goes through
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, 
true)]));
+
+    let session_ctx = SessionContext::new();
+
+    // Create and register the initial table with the provided schema and data
+    let initial_table = Arc::new(MemTable::try_new(schema.clone(), 
vec![vec![]])?);
+    session_ctx.register_table("t", initial_table.clone())?;
+
+    // Test case 2:
+    let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();
+
+    let e = write_df
+        .write_table("t", DataFrameWriteOptions::new())
+        .await
+        .unwrap_err();
+
+    assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected 
table field 'a' with type Int64, but got 'column1' with type Utf8");
+
+    Ok(())
+}
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt 
b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
index a2e51cffac..3a4d641abf 100644
--- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -220,6 +220,7 @@ set datafusion.execution.batch_size = 4;
 
 # Inserting into nullable table with batch_size specified above
 # to prevent creation on single in-memory batch
+
 statement ok
 CREATE TABLE aggregate_test_100_null (
   c2  TINYINT NOT NULL,
@@ -506,7 +507,7 @@ SELECT
   avg(c11) FILTER (WHERE c2 != 5)
 FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
 ----
-a 2.5 0.449071887467    
+a 2.5 0.449071887467
 b 2.642857142857 0.445486298629
 c 2.421052631579 0.422882117723
 d 2.125 0.518706191331
diff --git a/datafusion/sqllogictest/test_files/insert.slt 
b/datafusion/sqllogictest/test_files/insert.slt
index cbc989841a..ee76ee1c55 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -296,8 +296,11 @@ insert into table_without_values(field1) values(3);
 1
 
 # insert NULL values for the missing column (field1), but column is 
non-nullable
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error
 insert into table_without_values(field2) values(300);
+----
+DataFusion error: Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+
 
 statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(NULL, 300);
@@ -358,7 +361,7 @@ statement ok
 create table test_column_defaults(
   a int,
   b int not null default null,
-  c int default 100*2+300, 
+  c int default 100*2+300,
   d text default lower('DEFAULT_TEXT'),
   e timestamp default now()
 )
@@ -368,8 +371,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', 
now())
 ----
 1
 
-statement error DataFusion error: Execution error: Invalid batch column at '1' 
has null but schema specifies non-nullable
+statement error
 insert into test_column_defaults(a) values(2)
+----
+DataFusion error: Execution error: Invalid batch column at '1' has null but 
schema specifies non-nullable
+
 
 query I
 insert into test_column_defaults(b) values(20)
@@ -412,7 +418,7 @@ statement ok
 create table test_column_defaults(
   a int,
   b int not null default null,
-  c int default 100*2+300, 
+  c int default 100*2+300,
   d text default lower('DEFAULT_TEXT'),
   e timestamp default now()
 ) as values(1, 10, 100, 'ABC', now())
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index c5fa2b4e1a..ee1d67c5e2 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -60,6 +60,7 @@ STORED AS parquet
 LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
 PARTITIONED BY (b);
 
+#query error here because PARTITIONED BY (b) will make the b nullable to false
 query I
 insert into dictionary_encoded_parquet_partitioned
 select * from dictionary_encoded_values
@@ -81,6 +82,7 @@ STORED AS arrow
 LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
 PARTITIONED BY (b);
 
+#query error here because PARTITIONED BY (b) will make the b nullable to false
 query I
 insert into dictionary_encoded_arrow_partitioned
 select * from dictionary_encoded_values
@@ -543,8 +545,11 @@ insert into table_without_values(field1) values(3);
 1
 
 # insert NULL values for the missing column (field1), but column is 
non-nullable
-statement error Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+statement error
 insert into table_without_values(field2) values(300);
+----
+DataFusion error: Execution error: Invalid batch column at '0' has null but 
schema specifies non-nullable
+
 
 statement error Invalid argument error: Column 'column1' is declared as 
non-nullable but contains null values
 insert into table_without_values values(NULL, 300);
@@ -581,8 +586,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC', 
now())
 ----
 1
 
-statement error DataFusion error: Execution error: Invalid batch column at '1' 
has null but schema specifies non-nullable
+statement error
 insert into test_column_defaults(a) values(2)
+----
+DataFusion error: Execution error: Invalid batch column at '1' has null but 
schema specifies non-nullable
+
 
 query I
 insert into test_column_defaults(b) values(20)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to