This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aed4d697 Fix propagation of optimized predicates on nested 
projections (#3228)
7aed4d697 is described below

commit 7aed4d697fa24053d515babfd7678855451c6736
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Mon Aug 29 12:01:23 2022 +0300

    Fix propagation of optimized predicates on nested projections (#3228)
    
    * Fix propagation of optimized predicates on nested projections
    
    * Add SQL integration tests
    
    * Alternative implementation on `issue_filters` (#1)
---
 datafusion/core/tests/sql/projection.rs      | 39 ++++++++++++++++
 datafusion/optimizer/src/filter_push_down.rs | 66 +++++++++++++++++-----------
 2 files changed, 79 insertions(+), 26 deletions(-)

diff --git a/datafusion/core/tests/sql/projection.rs 
b/datafusion/core/tests/sql/projection.rs
index 6e59bd421..97c6dcf8a 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> 
Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn project_column_with_filters_that_cant_pushed_down_always_false() -> 
Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select * from (select 1 as a) f where f.a=2;";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec!["++", "++"];
+    assert_batches_sorted_eq!(expected, &actual);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn project_column_with_filters_that_cant_pushed_down_always_true() -> 
Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select * from (select 1 as a) f where f.a=1;";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"];
+    assert_batches_sorted_eq!(expected, &actual);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn project_columns_in_memory_without_propagation() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 
2;";
+    let actual = execute_to_batches(&ctx, sql).await;
+
+    let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
+    assert_batches_sorted_eq!(expected, &actual);
+
+    Ok(())
+}
diff --git a/datafusion/optimizer/src/filter_push_down.rs 
b/datafusion/optimizer/src/filter_push_down.rs
index 2ac5b6e3b..3d0415232 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -81,6 +81,7 @@ impl State {
 }
 
 /// returns all predicates in `state` that depend on any of `used_columns`
+/// or the ones that do not reference any columns (e.g. WHERE 1=1)
 fn get_predicates<'a>(
     state: &'a State,
     used_columns: &HashSet<Column>,
@@ -89,10 +90,11 @@ fn get_predicates<'a>(
         .filters
         .iter()
         .filter(|(_, columns)| {
-            !columns
-                .intersection(used_columns)
-                .collect::<HashSet<_>>()
-                .is_empty()
+            columns.is_empty()
+                || !columns
+                    .intersection(used_columns)
+                    .collect::<HashSet<_>>()
+                    .is_empty()
         })
         .map(|&(ref a, ref b)| (a, b))
         .unzip()
@@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
             let mut predicates = vec![];
             utils::split_conjunction(predicate, &mut predicates);
 
-            // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, 
etc.)
-            let mut no_col_predicates = vec![];
-
             predicates
                 .into_iter()
                 .try_for_each::<_, Result<()>>(|predicate| {
                     let mut columns: HashSet<Column> = HashSet::new();
                     expr_to_columns(predicate, &mut columns)?;
-                    if columns.is_empty() {
-                        no_col_predicates.push(predicate)
-                    } else {
-                        // collect the predicate
-                        state.filters.push((predicate.clone(), columns));
-                    }
+                    state.filters.push((predicate.clone(), columns));
                     Ok(())
                 })?;
 
-            // Predicates without columns will not be pushed down.
-            // As those contain only literals, they could be optimized using 
constant folding
-            // and removal of WHERE TRUE / WHERE FALSE
-            if !no_col_predicates.is_empty() {
-                Ok(utils::add_filter(
-                    optimize(input, state)?,
-                    &no_col_predicates,
-                ))
-            } else {
-                optimize(input, state)
-            }
+            optimize(input, state)
         }
         LogicalPlan::Projection(Projection {
             input,
@@ -401,8 +385,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> 
Result<LogicalPlan> {
 
             // optimize inner
             let new_input = optimize(input, state)?;
-
-            from_plan(plan, expr, &[new_input])
+            Ok(from_plan(plan, expr, &[new_input])?)
         }
         LogicalPlan::Aggregate(Aggregate {
             aggr_expr, input, ..
@@ -2092,4 +2075,35 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_propagation_of_optimized_inner_filters_with_projections() -> 
Result<()> {
+        // SELECT a FROM (SELECT 0 AS a) b WHERE b.a = 1
+        let plan = LogicalPlanBuilder::empty(true)
+            .project_with_alias(vec![lit(0i64).alias("a")], 
Some("b".to_owned()))?
+            .project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
+            .filter(col("b.a").eq(lit(1i64)))?
+            .project(vec![col("b.a")])?
+            .build()?;
+
+        let expected_before = "\
+        Projection: #b.a\
+        \n  Filter: #b.a = Int64(1)\
+        \n    Projection: #b.a, alias=b\
+        \n      Projection: Int64(0) AS a, alias=b\
+        \n        EmptyRelation";
+        assert_eq!(format!("{:?}", plan), expected_before);
+
+        // Ensure that the predicate without any columns (0 = 1) is
+        // still there.
+        let expected_after = "\
+        Projection: #b.a\
+        \n  Projection: #b.a, alias=b\
+        \n    Projection: Int64(0) AS a, alias=b\
+        \n      Filter: Int64(0) = Int64(1)\
+        \n        EmptyRelation";
+        assert_optimized_plan_eq(&plan, expected_after);
+
+        Ok(())
+    }
 }

Reply via email to