This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 42eabb97f9 bug: improve schema checking for `insert into` cases
(#14572)
42eabb97f9 is described below
commit 42eabb97f950e4c4adbb21fc37c3453c349b1658
Author: Qi Zhu <[email protected]>
AuthorDate: Mon Feb 17 19:56:32 2025 +0800
bug: improve schema checking for `insert into` cases (#14572)
* Address comments
* Address comments
* Address comments
* Address comment
* Address new comments
---
datafusion/common/src/dfschema.rs | 49 +++++++++++++-------
datafusion/core/src/datasource/listing/table.rs | 23 +---------
datafusion/core/src/datasource/memory.rs | 23 ++--------
datafusion/core/tests/dataframe/mod.rs | 52 ++++++++++++++++++++++
.../test_files/aggregate_skip_partial.slt | 3 +-
datafusion/sqllogictest/test_files/insert.slt | 14 ++++--
.../sqllogictest/test_files/insert_to_external.slt | 12 ++++-
7 files changed, 112 insertions(+), 64 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index 7e9025dee1..99fb179c76 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -1002,12 +1002,14 @@ pub trait SchemaExt {
/// It works the same as [`DFSchema::equivalent_names_and_types`].
fn equivalent_names_and_types(&self, other: &Self) -> bool;
- /// Returns true if the two schemas have the same qualified named
- /// fields with logically equivalent data types. Returns false otherwise.
+ /// Returns `Ok(())` if the two schemas have the same field names, in the same
+ /// order, with logically equivalent data types. Returns a plan error otherwise.
///
/// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
/// equivalence checking.
- fn logically_equivalent_names_and_types(&self, other: &Self) -> bool;
+ ///
+ /// It is currently only used for `INSERT INTO` schema checks.
+ fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
impl SchemaExt for Schema {
@@ -1028,21 +1030,36 @@ impl SchemaExt for Schema {
})
}
- fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
+ // It is currently only used for `INSERT INTO` schema checks.
+ fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>
{
+ // case 1 : schema length mismatch
if self.fields().len() != other.fields().len() {
- return false;
+ _plan_err!(
+ "Inserting query must have the same schema length as the
table. \
+ Expected table schema length: {}, got: {}",
+ self.fields().len(),
+ other.fields().len()
+ )
+ } else {
+ // case 2 : schema length match, but fields mismatch
+ // check that each pair of fields has the same name and a logically equal data type
+ self.fields()
+ .iter()
+ .zip(other.fields().iter())
+ .try_for_each(|(f1, f2)| {
+ if f1.name() != f2.name() ||
!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) {
+ _plan_err!(
+ "Inserting query schema mismatch: Expected table
field '{}' with type {:?}, \
+ but got '{}' with type {:?}.",
+ f1.name(),
+ f1.data_type(),
+ f2.name(),
+ f2.data_type())
+ } else {
+ Ok(())
+ }
+ })
}
-
- self.fields()
- .iter()
- .zip(other.fields().iter())
- .all(|(f1, f2)| {
- f1.name() == f2.name()
- && DFSchema::datatype_is_logically_equal(
- f1.data_type(),
- f2.data_type(),
- )
- })
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 2f517e397e..3be8af59ea 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -996,27 +996,8 @@ impl TableProvider for ListingTable {
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
- if !self
- .schema()
- .logically_equivalent_names_and_types(&input.schema())
- {
- // Return an error if schema of the input query does not match
with the table schema.
- return plan_err!(
- "Inserting query must have the same schema with the table. \
- Expected: {:?}, got: {:?}",
- self.schema()
- .fields()
- .iter()
- .map(|field| field.data_type())
- .collect::<Vec<_>>(),
- input
- .schema()
- .fields()
- .iter()
- .map(|field| field.data_type())
- .collect::<Vec<_>>()
- );
- }
+ self.schema()
+ .logically_equivalent_names_and_types(&input.schema())?;
let table_path = &self.table_paths()[0];
if !table_path.is_collection() {
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index f1bda1a66f..94c6e45804 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -278,26 +278,9 @@ impl TableProvider for MemTable {
// Create a physical plan from the logical plan.
// Check that the schema of the plan matches the schema of this table.
- if !self
- .schema()
- .logically_equivalent_names_and_types(&input.schema())
- {
- return plan_err!(
- "Inserting query must have the same schema with the table. \
- Expected: {:?}, got: {:?}",
- self.schema()
- .fields()
- .iter()
- .map(|field| field.data_type())
- .collect::<Vec<_>>(),
- input
- .schema()
- .fields()
- .iter()
- .map(|field| field.data_type())
- .collect::<Vec<_>>()
- );
- }
+ self.schema()
+ .logically_equivalent_names_and_types(&input.schema())?;
+
if insert_op != InsertOp::Append {
return not_impl_err!("{insert_op} not implemented for MemoryTable
yet");
}
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index 8155fd6a2f..b0d469d751 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -5274,3 +5274,55 @@ async fn register_non_parquet_file() {
"1.json' does not match the expected extension '.parquet'"
);
}
+
+// Test inserting into checking.
+#[tokio::test]
+async fn test_insert_into_checking() -> Result<()> {
+ // Create a new schema with one non-nullable field called "a" of type Int64
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64,
false)]));
+
+ let session_ctx = SessionContext::new();
+
+ // Create and register the initial table with the provided schema and data
+ let initial_table = Arc::new(MemTable::try_new(schema.clone(),
vec![vec![]])?);
+ session_ctx.register_table("t", initial_table.clone())?;
+
+ // There are two cases we need to check
+ // 1. The length of the plan schema and the table schema should be the same
+ // 2. The data types of the plan schema and the table schema should be the same
+
+ // Test case 1:
+ let write_df = session_ctx.sql("values (1, 2), (3, 4)").await.unwrap();
+
+ let e = write_df
+ .write_table("t", DataFrameWriteOptions::new())
+ .await
+ .unwrap_err();
+
+ assert_contains!(
+ e.to_string(),
+ "Inserting query must have the same schema length as the table."
+ );
+
+ // Set nullable to true
+ // to make sure the nullability check passes
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64,
true)]));
+
+ let session_ctx = SessionContext::new();
+
+ // Create and register the initial table with the provided schema and data
+ let initial_table = Arc::new(MemTable::try_new(schema.clone(),
vec![vec![]])?);
+ session_ctx.register_table("t", initial_table.clone())?;
+
+ // Test case 2:
+ let write_df = session_ctx.sql("values ('a123'), ('b456')").await.unwrap();
+
+ let e = write_df
+ .write_table("t", DataFrameWriteOptions::new())
+ .await
+ .unwrap_err();
+
+ assert_contains!(e.to_string(), "Inserting query schema mismatch: Expected
table field 'a' with type Int64, but got 'column1' with type Utf8");
+
+ Ok(())
+}
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
index a2e51cffac..3a4d641abf 100644
--- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -220,6 +220,7 @@ set datafusion.execution.batch_size = 4;
# Inserting into nullable table with batch_size specified above
# to prevent creation on single in-memory batch
+
statement ok
CREATE TABLE aggregate_test_100_null (
c2 TINYINT NOT NULL,
@@ -506,7 +507,7 @@ SELECT
avg(c11) FILTER (WHERE c2 != 5)
FROM aggregate_test_100 GROUP BY c1 ORDER BY c1;
----
-a 2.5 0.449071887467
+a 2.5 0.449071887467
b 2.642857142857 0.445486298629
c 2.421052631579 0.422882117723
d 2.125 0.518706191331
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index cbc989841a..ee76ee1c55 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -296,8 +296,11 @@ insert into table_without_values(field1) values(3);
1
# insert NULL values for the missing column (field1), but column is
non-nullable
-statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+statement error
insert into table_without_values(field2) values(300);
+----
+DataFusion error: Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+
statement error Invalid argument error: Column 'column1' is declared as
non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -358,7 +361,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
- c int default 100*2+300,
+ c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
)
@@ -368,8 +371,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC',
now())
----
1
-statement error DataFusion error: Execution error: Invalid batch column at '1'
has null but schema specifies non-nullable
+statement error
insert into test_column_defaults(a) values(2)
+----
+DataFusion error: Execution error: Invalid batch column at '1' has null but
schema specifies non-nullable
+
query I
insert into test_column_defaults(b) values(20)
@@ -412,7 +418,7 @@ statement ok
create table test_column_defaults(
a int,
b int not null default null,
- c int default 100*2+300,
+ c int default 100*2+300,
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) as values(1, 10, 100, 'ABC', now())
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index c5fa2b4e1a..ee1d67c5e2 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -60,6 +60,7 @@ STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b);
+# query error here because PARTITIONED BY (b) makes column b non-nullable
query I
insert into dictionary_encoded_parquet_partitioned
select * from dictionary_encoded_values
@@ -81,6 +82,7 @@ STORED AS arrow
LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
PARTITIONED BY (b);
+# query error here because PARTITIONED BY (b) makes column b non-nullable
query I
insert into dictionary_encoded_arrow_partitioned
select * from dictionary_encoded_values
@@ -543,8 +545,11 @@ insert into table_without_values(field1) values(3);
1
# insert NULL values for the missing column (field1), but column is
non-nullable
-statement error Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+statement error
insert into table_without_values(field2) values(300);
+----
+DataFusion error: Execution error: Invalid batch column at '0' has null but
schema specifies non-nullable
+
statement error Invalid argument error: Column 'column1' is declared as
non-nullable but contains null values
insert into table_without_values values(NULL, 300);
@@ -581,8 +586,11 @@ insert into test_column_defaults values(1, 10, 100, 'ABC',
now())
----
1
-statement error DataFusion error: Execution error: Invalid batch column at '1'
has null but schema specifies non-nullable
+statement error
insert into test_column_defaults(a) values(2)
+----
+DataFusion error: Execution error: Invalid batch column at '1' has null but
schema specifies non-nullable
+
query I
insert into test_column_defaults(b) values(20)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]