This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9e1e32f802 Support logic optimize rule to pass the case that Utf8view
datatype combined with Utf8 datatype (#15239)
9e1e32f802 is described below
commit 9e1e32f80231e1666a82d1567dd001c1f0aff8c8
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Mar 19 02:42:02 2025 +0800
Support logic optimize rule to pass the case that Utf8view datatype
combined with Utf8 datatype (#15239)
* Support logic optimize rule to pass the case that Utf8view datatype
combined with Utf8 datatype
* Support logic optimize rule to pass the case that Utf8view datatype
combined with Utf8 datatype
* fix test
* fix
* fix
* fmt
* clean code
* fix test
* Fix test
* address comments
* move docs
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/common/src/dfschema.rs | 58 ++++++++++++++++++-----
datafusion/expr/src/logical_plan/invariants.rs | 6 +--
datafusion/optimizer/src/optimizer.rs | 31 +++---------
datafusion/sqllogictest/test_files/expr.slt | 8 +++-
datafusion/substrait/src/logical_plan/consumer.rs | 2 +-
5 files changed, 63 insertions(+), 42 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index 65bb40810f..43d082f9dc 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -564,6 +564,7 @@ impl DFSchema {
}
/// Check to see if fields in 2 Arrow schemas are compatible
+ #[deprecated(since = "47.0.0", note = "This method is no longer used")]
pub fn check_arrow_schema_type_compatible(
&self,
arrow_schema: &Schema,
@@ -604,26 +605,57 @@ impl DFSchema {
})
}
- /// Returns true if the two schemas have the same qualified named
- /// fields with the same data types. Returns false otherwise.
+ #[deprecated(since = "47.0.0", note = "Use `has_equivalent_names_and_types` instead")]
+ pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
+ self.has_equivalent_names_and_types(other).is_ok()
+ }
+
+ /// Returns `Ok` if the two schemas have the same field names, in the same
+ /// order, with compatible data types. Field qualifiers are not compared.
///
- /// This is a specialized version of Eq that ignores differences
- /// in nullability and metadata.
+ /// Returns an `Err` describing the length or first field mismatch otherwise.
+ ///
+ /// Data types are compatible if semantically equal, or if `other`'s type
+ /// can be cast to `self`'s. Nullability and metadata are ignored.
///
/// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
/// logical type checking, which for example would consider a dictionary
/// encoded UTF8 array to be equivalent to a plain UTF8 array.
- pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
+ pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
+ // case 1: schema length mismatch
if self.fields().len() != other.fields().len() {
- return false;
+ _plan_err!(
+ "Schema mismatch: the schema length are not same \
+ Expected schema length: {}, got: {}",
+ self.fields().len(),
+ other.fields().len()
+ )
+ } else {
+ // case 2: lengths match; compare fields pairwise by position. Names must be
+ // equal and types semantically equal or castable (qualifiers are ignored).
+ self.fields()
+ .iter()
+ .zip(other.fields().iter())
+ .try_for_each(|(f1, f2)| {
+ if f1.name() != f2.name()
+ || (!DFSchema::datatype_is_semantically_equal(
+ f1.data_type(),
+ f2.data_type(),
+ ) && !can_cast_types(f2.data_type(), f1.data_type()))
+ {
+ _plan_err!(
+ "Schema mismatch: Expected field '{}' with type {:?}, \
+ but got '{}' with type {:?}.",
+ f1.name(),
+ f1.data_type(),
+ f2.name(),
+ f2.data_type()
+ )
+ } else {
+ Ok(())
+ }
+ })
}
- let self_fields = self.iter();
- let other_fields = other.iter();
- self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
- q1 == q2
- && f1.name() == f2.name()
- && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
- })
}
/// Checks if two [`DataType`]s are logically equal. This is a notably
weaker constraint
diff --git a/datafusion/expr/src/logical_plan/invariants.rs
b/datafusion/expr/src/logical_plan/invariants.rs
index f4ac33b6af..d83410bf99 100644
--- a/datafusion/expr/src/logical_plan/invariants.rs
+++ b/datafusion/expr/src/logical_plan/invariants.rs
@@ -112,11 +112,11 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) ->
Result<()> {
/// Returns an error if the plan does not have the expected schema.
/// Ignores metadata and nullability.
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) ->
Result<()> {
- let equivalent = plan.schema().equivalent_names_and_types(schema);
+ let compatible = plan.schema().has_equivalent_names_and_types(schema);
- if !equivalent {
+ if let Err(e) = compatible {
internal_err!(
- "Failed due to a difference in schemas, original schema: {:?}, new
schema: {:?}",
+ "Failed due to a difference in schemas: {e}, original schema:
{:?}, new schema: {:?}",
schema,
plan.schema()
)
diff --git a/datafusion/optimizer/src/optimizer.rs
b/datafusion/optimizer/src/optimizer.rs
index 3a69bd91e7..04d73fe3ab 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -447,7 +447,7 @@ fn assert_valid_optimization(
plan: &LogicalPlan,
prev_schema: &Arc<DFSchema>,
) -> Result<()> {
- // verify invariant: optimizer passes should not change the schema
+ // verify invariant: optimizer passes should not change the schema if the
schema can't be cast from the previous schema.
// Refer to
<https://datafusion.apache.org/contributor-guide/specification/invariants.html#logical-schema-is-invariant-under-logical-optimization>
assert_expected_schema(prev_schema, plan)?;
@@ -459,7 +459,9 @@ mod tests {
use std::sync::{Arc, Mutex};
use datafusion_common::tree_node::Transformed;
- use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError,
Result};
+ use datafusion_common::{
+ assert_contains, plan_err, DFSchema, DFSchemaRef, DataFusionError,
Result,
+ };
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder,
Projection};
@@ -505,28 +507,9 @@ mod tests {
schema: Arc::new(DFSchema::empty()),
});
let err = opt.optimize(plan, &config, &observe).unwrap_err();
- assert!(err.strip_backtrace().starts_with(
- "Optimizer rule 'get table_scan rule' failed\n\
- caused by\n\
- Check optimizer-specific invariants after optimizer rule: get
table_scan rule\n\
- caused by\n\
- Internal error: Failed due to a difference in schemas, \
- original schema: DFSchema { inner: Schema { \
- fields: [], \
- metadata: {} }, \
- field_qualifiers: [], \
- functional_dependencies: FunctionalDependencies { deps: [] } \
- }, \
- new schema: DFSchema { inner: Schema { \
- fields: [\
- Field { name: \"a\", data_type: UInt32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {} }, \
- Field { name: \"b\", data_type: UInt32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {} }, \
- Field { name: \"c\", data_type: UInt32, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {} }\
- ], \
- metadata: {} }, \
- field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare {
table: \"test\" }), Some(Bare { table: \"test\" })], \
- functional_dependencies: FunctionalDependencies { deps: [] } }",
- ));
+
+ // Simplify assert to check the error message contains the expected
message, which is only the schema length mismatch
+ assert_contains!(err.strip_backtrace(), "Schema mismatch: the schema
length are not same Expected schema length: 3, got: 0");
}
#[test]
diff --git a/datafusion/sqllogictest/test_files/expr.slt
b/datafusion/sqllogictest/test_files/expr.slt
index 74e9fe065a..24f7c3ea15 100644
--- a/datafusion/sqllogictest/test_files/expr.slt
+++ b/datafusion/sqllogictest/test_files/expr.slt
@@ -478,8 +478,14 @@ a
statement ok
create table foo (a varchar, b varchar) as values ('a', 'b');
+
+query T
+SELECT concat_ws('', a, b,'c') from foo
+----
+abc
+
query T
-SELECT concat_ws('',a,b,'c') from foo
+SELECT concat_ws('',arrow_cast(a, 'Utf8View'),arrow_cast(b, 'Utf8View'),'c')
from foo
----
abc
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs
b/datafusion/substrait/src/logical_plan/consumer.rs
index ee47d4ab88..6752286a3c 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -776,7 +776,7 @@ pub async fn from_substrait_plan_with_consumer(
return Ok(plan);
}
let renamed_schema =
make_renamed_schema(plan.schema(), &root.names)?;
- if
renamed_schema.equivalent_names_and_types(plan.schema()) {
+ if
renamed_schema.has_equivalent_names_and_types(plan.schema()).is_ok() {
// Nothing to do if the schema is already
equivalent
return Ok(plan);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]