This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f154a9aa3 Fix the potential bug of check_all_column_from_schema (#5287)
f154a9aa3 is described below

commit f154a9aa38e376de2cf7ac0df3618c28729f2f20
Author: ygf11 <[email protected]>
AuthorDate: Fri Feb 17 21:46:53 2023 +0800

    Fix the potential bug of check_all_column_from_schema (#5287)
    
    * Fix the potential bug of check_all_column_from_schema
    
    * rename contain_column to is_column_from_schema
---
 datafusion/common/src/dfschema.rs                | 70 +++++++++++++++++++-----
 datafusion/expr/src/utils.rs                     | 31 +++++++----
 datafusion/optimizer/src/decorrelate_where_in.rs |  4 +-
 3 files changed, 79 insertions(+), 26 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index e4a591ad3..982459ac6 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -191,7 +191,7 @@ impl DFSchema {
         &self,
         qualifier: Option<&str>,
         name: &str,
-    ) -> Result<usize> {
+    ) -> Result<Option<usize>> {
         let mut matches = self
             .fields
             .iter()
@@ -221,13 +221,9 @@ impl DFSchema {
             })
             .map(|(idx, _)| idx);
         match matches.next() {
-            None => Err(field_not_found(
-                qualifier.map(|s| s.to_string()),
-                name,
-                self,
-            )),
+            None => Ok(None),
             Some(idx) => match matches.next() {
-                None => Ok(idx),
+                None => Ok(Some(idx)),
                 // found more than one matches
                 Some(_) => Err(DataFusionError::Internal(format!(
                     "Ambiguous reference to qualified field named '{}.{}'",
@@ -240,7 +236,17 @@ impl DFSchema {
 
     /// Find the index of the column with the given qualifier and name
     pub fn index_of_column(&self, col: &Column) -> Result<usize> {
+        let qualifier = col.relation.as_deref();
+        self.index_of_column_by_name(col.relation.as_deref(), &col.name)?
+            .ok_or_else(|| {
+                field_not_found(qualifier.map(|s| s.to_string()), &col.name, 
self)
+            })
+    }
+
+    /// Check if the column is in the current schema
+    pub fn is_column_from_schema(&self, col: &Column) -> Result<bool> {
         self.index_of_column_by_name(col.relation.as_deref(), &col.name)
+            .map(|idx| idx.is_some())
     }
 
     /// Find the field with the given name
@@ -293,7 +299,10 @@ impl DFSchema {
         qualifier: &str,
         name: &str,
     ) -> Result<&DFField> {
-        let idx = self.index_of_column_by_name(Some(qualifier), name)?;
+        let idx = self
+            .index_of_column_by_name(Some(qualifier), name)?
+            .ok_or_else(|| field_not_found(Some(qualifier.to_string()), name, 
self))?;
+
         Ok(self.field(idx))
     }
 
@@ -663,9 +672,10 @@ mod tests {
 
     #[test]
     fn qualifier_in_name() -> Result<()> {
+        let col = Column::from_name("t1.c0");
         let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
         // lookup with unqualified name "t1.c0"
-        let err = schema.index_of_column_by_name(None, "t1.c0").err().unwrap();
+        let err = schema.index_of_column(&col).err().unwrap();
         assert_eq!(
             "Schema error: No field named 't1.c0'. Valid fields are 't1'.'c0', 
't1'.'c1'.",
             &format!("{err}")
@@ -829,14 +839,13 @@ mod tests {
     fn select_without_valid_fields() {
         let schema = DFSchema::empty();
 
-        let err = schema
-            .index_of_column_by_name(Some("t1"), "c0")
-            .err()
-            .unwrap();
+        let col = Column::from_qualified_name("t1.c0");
+        let err = schema.index_of_column(&col).err().unwrap();
         assert_eq!("Schema error: No field named 't1'.'c0'.", 
&format!("{err}"));
 
         // the same check without qualifier
-        let err = schema.index_of_column_by_name(None, "c0").err().unwrap();
+        let col = Column::from_name("c0");
+        let err = schema.index_of_column(&col).err().unwrap();
         assert_eq!("Schema error: No field named 'c0'.", &format!("{err}"));
     }
 
@@ -1123,6 +1132,39 @@ mod tests {
         assert_eq!(a_df.metadata(), a_arrow.metadata())
     }
 
+    #[test]
+    fn test_contain_column() -> Result<()> {
+        // qualified exists
+        {
+            let col = Column::from_qualified_name("t1.c0");
+            let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
+            assert!(schema.is_column_from_schema(&col)?);
+        }
+
+        // qualified not exists
+        {
+            let col = Column::from_qualified_name("t1.c2");
+            let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
+            assert!(!schema.is_column_from_schema(&col)?);
+        }
+
+        // unqualified exists
+        {
+            let col = Column::from_name("c0");
+            let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
+            assert!(schema.is_column_from_schema(&col)?);
+        }
+
+        // unqualified not exists
+        {
+            let col = Column::from_name("c2");
+            let schema = DFSchema::try_from_qualified_schema("t1", 
&test_schema_1())?;
+            assert!(!schema.is_column_from_schema(&col)?);
+        }
+
+        Ok(())
+    }
+
     fn test_schema_2() -> Schema {
         Schema::new(vec![
             Field::new("c100", DataType::Boolean, true),
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8ce959e79..5706ef304 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -970,13 +970,18 @@ pub fn can_hash(data_type: &DataType) -> bool {
 }
 
 /// Check whether all columns are from the schema.
-pub fn check_all_column_from_schema(
+pub fn check_all_columns_from_schema(
     columns: &HashSet<Column>,
     schema: DFSchemaRef,
-) -> bool {
-    columns
-        .iter()
-        .all(|column| schema.index_of_column(column).is_ok())
+) -> Result<bool> {
+    for col in columns.iter() {
+        let exist = schema.is_column_from_schema(col)?;
+        if !exist {
+            return Ok(false);
+        }
+    }
+
+    Ok(true)
 }
 
 /// Give two sides of the equijoin predicate, return a valid join key pair.
@@ -1003,18 +1008,24 @@ pub fn find_valid_equijoin_key_pair(
     }
 
     let l_is_left =
-        check_all_column_from_schema(&left_using_columns, left_schema.clone());
+        check_all_columns_from_schema(&left_using_columns, 
left_schema.clone())?;
     let r_is_right =
-        check_all_column_from_schema(&right_using_columns, 
right_schema.clone());
+        check_all_columns_from_schema(&right_using_columns, 
right_schema.clone())?;
 
     let r_is_left_and_l_is_right = || {
-        check_all_column_from_schema(&right_using_columns, left_schema.clone())
-            && check_all_column_from_schema(&left_using_columns, 
right_schema.clone())
+        let result =
+            check_all_columns_from_schema(&right_using_columns, 
left_schema.clone())?
+                && check_all_columns_from_schema(
+                    &left_using_columns,
+                    right_schema.clone(),
+                )?;
+
+        Result::<_, DataFusionError>::Ok(result)
     };
 
     let join_key_pair = match (l_is_left, r_is_right) {
         (true, true) => Some((left_key.clone(), right_key.clone())),
-        (_, _) if r_is_left_and_l_is_right() => {
+        (_, _) if r_is_left_and_l_is_right()? => {
             Some((right_key.clone(), left_key.clone()))
         }
         _ => None,
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs 
b/datafusion/optimizer/src/decorrelate_where_in.rs
index 35164dcad..7a9a75ff4 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -22,7 +22,7 @@ use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::{context, Column, DataFusionError, Result};
 use datafusion_expr::expr_rewriter::{replace_col, unnormalize_col};
 use datafusion_expr::logical_plan::{JoinType, Projection, Subquery};
-use datafusion_expr::utils::check_all_column_from_schema;
+use datafusion_expr::utils::check_all_columns_from_schema;
 use datafusion_expr::{Expr, Filter, LogicalPlan, LogicalPlanBuilder};
 use log::debug;
 use std::collections::{BTreeSet, HashMap};
@@ -229,7 +229,7 @@ fn extract_join_filters(maybe_filter: &LogicalPlan) -> 
Result<(Vec<Expr>, Logica
         let mut subquery_filters: Vec<Expr> = vec![];
         for expr in subquery_filter_exprs {
             let cols = expr.to_columns()?;
-            if check_all_column_from_schema(&cols, input_schema.clone()) {
+            if check_all_columns_from_schema(&cols, input_schema.clone())? {
                 subquery_filters.push(expr.clone());
             } else {
                 join_filters.push(expr.clone())

Reply via email to