This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7aed4d697 Fix propagation of optimized predicates on nested projections (#3228)
7aed4d697 is described below
commit 7aed4d697fa24053d515babfd7678855451c6736
Author: Batuhan Taskaya <[email protected]>
AuthorDate: Mon Aug 29 12:01:23 2022 +0300
Fix propagation of optimized predicates on nested projections (#3228)
* Fix propagation of optimized predicates on nested projections
* Add SQL integration tests
* Alternative implementation on `issue_filters` (#1)
---
datafusion/core/tests/sql/projection.rs | 39 ++++++++++++++++
datafusion/optimizer/src/filter_push_down.rs | 66 +++++++++++++++++-----------
2 files changed, 79 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 6e59bd421..97c6dcf8a 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select * from (select 1 as a) f where f.a=2;";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec!["++", "++"];
+ assert_batches_sorted_eq!(expected, &actual);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select * from (select 1 as a) f where f.a=1;";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"];
+ assert_batches_sorted_eq!(expected, &actual);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn project_columns_in_memory_without_propagation() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;";
+ let actual = execute_to_batches(&ctx, sql).await;
+
+ let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
+ assert_batches_sorted_eq!(expected, &actual);
+
+ Ok(())
+}
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 2ac5b6e3b..3d0415232 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -81,6 +81,7 @@ impl State {
}
/// returns all predicates in `state` that depend on any of `used_columns`
+/// or the ones that does not reference any columns (e.g. WHERE 1=1)
fn get_predicates<'a>(
state: &'a State,
used_columns: &HashSet<Column>,
@@ -89,10 +90,11 @@ fn get_predicates<'a>(
.filters
.iter()
.filter(|(_, columns)| {
- !columns
- .intersection(used_columns)
- .collect::<HashSet<_>>()
- .is_empty()
+ columns.is_empty()
+ || !columns
+ .intersection(used_columns)
+ .collect::<HashSet<_>>()
+ .is_empty()
})
.map(|&(ref a, ref b)| (a, b))
.unzip()
@@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
let mut predicates = vec![];
utils::split_conjunction(predicate, &mut predicates);
- // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
- let mut no_col_predicates = vec![];
-
predicates
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let mut columns: HashSet<Column> = HashSet::new();
expr_to_columns(predicate, &mut columns)?;
- if columns.is_empty() {
- no_col_predicates.push(predicate)
- } else {
- // collect the predicate
- state.filters.push((predicate.clone(), columns));
- }
+ state.filters.push((predicate.clone(), columns));
Ok(())
})?;
- // Predicates without columns will not be pushed down.
- // As those contain only literals, they could be optimized using constant folding
- // and removal of WHERE TRUE / WHERE FALSE
- if !no_col_predicates.is_empty() {
- Ok(utils::add_filter(
- optimize(input, state)?,
- &no_col_predicates,
- ))
- } else {
- optimize(input, state)
- }
+ optimize(input, state)
}
LogicalPlan::Projection(Projection {
input,
@@ -401,8 +385,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// optimize inner
let new_input = optimize(input, state)?;
-
- from_plan(plan, expr, &[new_input])
+ Ok(from_plan(plan, expr, &[new_input])?)
}
LogicalPlan::Aggregate(Aggregate {
aggr_expr, input, ..
@@ -2092,4 +2075,35 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> {
+ // SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1
+ let plan = LogicalPlanBuilder::empty(true)
+ .project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))?
+ .project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
+ .filter(col("b.a").eq(lit(1i64)))?
+ .project(vec![col("b.a")])?
+ .build()?;
+
+ let expected_before = "\
+ Projection: #b.a\
+ \n Filter: #b.a = Int64(1)\
+ \n Projection: #b.a, alias=b\
+ \n Projection: Int64(0) AS a, alias=b\
+ \n EmptyRelation";
+ assert_eq!(format!("{:?}", plan), expected_before);
+
+ // Ensure that the predicate without any columns (0 = 1) is
+ // still there.
+ let expected_after = "\
+ Projection: #b.a\
+ \n Projection: #b.a, alias=b\
+ \n Projection: Int64(0) AS a, alias=b\
+ \n Filter: Int64(0) = Int64(1)\
+ \n EmptyRelation";
+ assert_optimized_plan_eq(&plan, expected_after);
+
+ Ok(())
+ }
}