This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 66330bd43d fix: `reassign_predicate_columns` w/ in-list expr (#6114)
66330bd43d is described below

commit 66330bd43d13710889a2e5c8feb7ceecc145b87f
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Apr 25 20:00:17 2023 +0200

    fix: `reassign_predicate_columns` w/ in-list expr (#6114)
    
    We need to also re-assign the schema stored within `InListExpr` at the
    same time when we replace the children.
---
 datafusion/physical-expr/src/utils.rs | 67 ++++++++++++++++++++++++++++++++---
 1 file changed, 63 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 472914f8d0..89b51dada8 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::equivalence::EquivalentClass;
-use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
+use crate::expressions::{BinaryExpr, Column, InListExpr, UnKnownColumn};
 use crate::{
     EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
 };
@@ -573,8 +573,10 @@ pub fn reassign_predicate_columns(
     schema: &SchemaRef,
     ignore_not_found: bool,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    pred.transform(&|expr| {
-        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+    pred.transform_down(&|expr| {
+        let expr_any = expr.as_any();
+
+        if let Some(column) = expr_any.downcast_ref::<Column>() {
             let index = match schema.index_of(column.name()) {
                 Ok(idx) => idx,
                 Err(_) if ignore_not_found => usize::MAX,
@@ -584,6 +586,26 @@ pub fn reassign_predicate_columns(
                 column.name(),
                 index,
             ))));
+        } else if let Some(in_list) = expr_any.downcast_ref::<InListExpr>() {
+            // transform child first
+            let expr = reassign_predicate_columns(
+                in_list.expr().clone(),
+                schema,
+                ignore_not_found,
+            )?;
+            let list = in_list
+                .list()
+                .iter()
+                .map(|expr| {
+                    reassign_predicate_columns(expr.clone(), schema, ignore_not_found)
+                })
+                .collect::<Result<Vec<_>>>()?;
+            return Ok(Transformed::Yes(Arc::new(InListExpr::new(
+                expr,
+                list,
+                in_list.negated(),
+                schema.as_ref(),
+            ))));
         }
 
         Ok(Transformed::No(expr))
@@ -593,7 +615,7 @@ pub fn reassign_predicate_columns(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::{binary, cast, col, lit, Column, Literal};
+    use crate::expressions::{binary, cast, col, in_list, lit, Column, Literal};
     use crate::PhysicalSortExpr;
     use arrow::compute::SortOptions;
     use datafusion_common::{Result, ScalarValue};
@@ -918,4 +940,41 @@ mod tests {
         }));
         Ok(())
     }
+
+    #[test]
+    fn test_reassign_predicate_columns_in_list() {
+        let int_field = Field::new("should_not_matter", DataType::Int64, true);
+        let dict_field = Field::new(
+            "id",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        );
+        let schema_small = Arc::new(Schema::new(vec![dict_field.clone()]));
+        let schema_big = Arc::new(Schema::new(vec![int_field, dict_field]));
+        let pred = in_list(
+            Arc::new(Column::new_with_schema("id", &schema_big).unwrap()),
+            vec![lit(ScalarValue::Dictionary(
+                Box::new(DataType::Int32),
+                Box::new(ScalarValue::from("2")),
+            ))],
+            &false,
+            &schema_big,
+        )
+        .unwrap();
+
+        let actual = reassign_predicate_columns(pred, &schema_small, false).unwrap();
+
+        let expected = in_list(
+            Arc::new(Column::new_with_schema("id", &schema_small).unwrap()),
+            vec![lit(ScalarValue::Dictionary(
+                Box::new(DataType::Int32),
+                Box::new(ScalarValue::from("2")),
+            ))],
+            &false,
+            &schema_small,
+        )
+        .unwrap();
+
+        assert_eq!(actual.as_ref(), expected.as_any());
+    }
 }

Reply via email to