This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f412c9a Fix SchemaError in FilterPushDown optimization with UNION ALL (#3282)
e3f412c9a is described below

commit e3f412c9ad9c9c05f02739778d028e4c94997b15
Author: Jon Mease <[email protected]>
AuthorDate: Sat Sep 3 06:52:28 2022 -0400

    Fix SchemaError in FilterPushDown optimization with UNION ALL (#3282)
    
    * Add failing union_all_on_projection test
    
    * Update projection step to replace both qualified and unqualified columns
    
    * Remove redundant clone
---
 datafusion/optimizer/src/filter_push_down.rs | 32 ++++++++++++++++++++++++++--
 1 file changed, 30 insertions(+), 2 deletions(-)

diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 1d5386f46..cf7b770c6 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -368,14 +368,18 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 .fields()
                 .iter()
                 .enumerate()
-                .map(|(i, field)| {
+                .flat_map(|(i, field)| {
                     // strip alias, as they should not be part of filters
                     let expr = match &expr[i] {
                         Expr::Alias(expr, _) => expr.as_ref().clone(),
                         expr => expr.clone(),
                     };
 
-                    (field.qualified_name(), expr)
+                    // Convert both qualified and unqualified fields
+                    [
+                        (field.name().clone(), expr.clone()),
+                        (field.qualified_name(), expr),
+                    ]
                 })
                 .collect::<HashMap<_, _>>();
 
@@ -995,6 +999,30 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn union_all_on_projection() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let table = LogicalPlanBuilder::from(table_scan)
+            .project_with_alias(vec![col("a").alias("b")], Some("test2".to_string()))?;
+
+        let plan = table
+            .union(table.build()?)?
+            .filter(col("b").eq(lit(1i64)))? // unqualified `b`; projection output is qualified by alias `test2`
+            .build()?;
+
+        // filter on `b` is pushed below the Union into both branches, rewritten as `#test.a`
+        let expected = "\
+            Union\
+            \n  Projection: #test.a AS b, alias=test2\
+            \n    Filter: #test.a = Int64(1)\
+            \n      TableScan: test\
+            \n  Projection: #test.a AS b, alias=test2\
+            \n    Filter: #test.a = Int64(1)\
+            \n      TableScan: test";
+        assert_optimized_plan_eq(&plan, expected);
+        Ok(())
+    }
+
     /// verifies that filters with the same columns are correctly placed
     #[test]
     fn filter_2_breaks_limits() -> Result<()> {

Reply via email to