alamb commented on code in PR #7909:
URL: https://github.com/apache/arrow-datafusion/pull/7909#discussion_r1369229798
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -668,16 +670,40 @@ impl OptimizerRule for PushDownFilter {
(field.qualified_name(), expr)
})
- .collect::<HashMap<_, _>>();
+ .partition(|(_, value)| matches!(value,
Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile));
- // re-write all filters based on this projection
- // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap
them, but the filter must be "a > 1"
- let new_filter = LogicalPlan::Filter(Filter::try_new(
- replace_cols_by_name(filter.predicate.clone(),
&replace_map)?,
- projection.input.clone(),
- )?);
+ let mut push_predicates = vec![];
+ let mut keep_predicates = vec![];
+ for expr in
split_conjunction_owned(filter.predicate.clone()).into_iter()
+ {
+ if contain(expr.clone(), &volatile_map.clone())? {
Review Comment:
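I don't think we need to clone `volatile_map` here. `contain` takes `&HashMap<String, Expr>`, so borrowing it (`&volatile_map`) is enough: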
```suggestion
if contain(expr.clone(), volatile_map)? {
```
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -668,16 +670,40 @@ impl OptimizerRule for PushDownFilter {
(field.qualified_name(), expr)
})
- .collect::<HashMap<_, _>>();
+ .partition(|(_, value)| matches!(value,
Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile));
- // re-write all filters based on this projection
- // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap
them, but the filter must be "a > 1"
- let new_filter = LogicalPlan::Filter(Filter::try_new(
- replace_cols_by_name(filter.predicate.clone(),
&replace_map)?,
- projection.input.clone(),
- )?);
+ let mut push_predicates = vec![];
+ let mut keep_predicates = vec![];
+ for expr in
split_conjunction_owned(filter.predicate.clone()).into_iter()
+ {
+ if contain(expr.clone(), &volatile_map.clone())? {
+ keep_predicates.push(expr);
+ } else {
+ push_predicates.push(expr);
+ }
+ }
- child_plan.with_new_inputs(&[new_filter])?
+ match conjunction(push_predicates) {
+ Some(expr) => {
+ // re-write all filters based on this projection
+ // E.g. in `Filter: b\n Projection: a > 1 as b`, we
can swap them, but the filter must be "a > 1"
+ let new_filter = LogicalPlan::Filter(Filter::try_new(
+ replace_cols_by_name(expr, &non_volatile_map)?,
+ projection.input.clone(),
+ )?);
+
+ if keep_predicates.is_empty() {
+ child_plan.with_new_inputs(&[new_filter])?
+ } else {
+ let child_plan =
child_plan.with_new_inputs(&[new_filter])?;
+ LogicalPlan::Filter(Filter::try_new(
+ conjunction(keep_predicates).unwrap(),
+ Arc::new(child_plan),
+ )?)
+ }
Review Comment:
You could avoid the `unwrap()` by matching on `conjunction(keep_predicates)` instead, something like this:
```suggestion
match conjunction(keep_predicates) {
None => child_plan.with_new_inputs(&[new_filter])?
Some(keep_predicate) => {
let child_plan =
child_plan.with_new_inputs(&[new_filter])?;
LogicalPlan::Filter(Filter::try_new(
keep_predicate
Arc::new(child_plan),
)?)
}
```
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -881,6 +907,25 @@ pub fn replace_cols_by_name(
})
}
+/// check whether the expression uses the columns in `check_map`.
+pub fn contain(e: Expr, check_map: &HashMap<String, Expr>) -> Result<bool> {
+ let mut is_contain = false;
+ e.apply(&mut |expr| {
+ Ok(if let Expr::Column(c) = &expr {
+ match check_map.get(&c.flat_name()) {
+ Some(_) => {
+ is_contain = true;
+ VisitRecursion::Stop
+ }
+ None => VisitRecursion::Continue,
+ }
+ } else {
+ VisitRecursion::Continue
+ })
+ })?;
Review Comment:
Since the closure always returns `Ok`, you could `unwrap()` the result of `apply` here, if you wanted, and have `contain` return `bool` instead of `Result<bool>`.
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -2712,4 +2757,75 @@ Projection: a, b
\n TableScan: test2";
assert_optimized_plan_eq(&plan, expected)
}
+
+ #[test]
+ fn test_push_down_volatile_function_in_aggregate() -> Result<()> {
+ // SELECT t.a, t.r FROM (SELECT a, SUM(b), random() AS r FROM test1
GROUP BY a) AS t WHERE t.a > 5 AND t.r > 0.5;
+ let table_scan = test_table_scan_with_name("test1")?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a")], vec![sum(col("b"))])?
+ .project(vec![col("a"), sum(col("b")), random().alias("r")])?
Review Comment:
I think it is important to test an expression like `random() + 1` rather than just `random()`, for the reasons I gave in my comment on the `partition` call (a non-top-level volatile call is not detected):
```suggestion
.project(vec![col("a"), sum(col("b")),
random().add(lit(1)).alias("r")])?
```
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -668,16 +670,40 @@ impl OptimizerRule for PushDownFilter {
(field.qualified_name(), expr)
})
- .collect::<HashMap<_, _>>();
+ .partition(|(_, value)| matches!(value,
Expr::ScalarFunction(f) if f.fun.volatility() == Volatility::Volatile));
Review Comment:
I don't think this will work for every expression that calls `random()`.
It will find expressions like `random()` but not expressions like `random() + 1`, because the `matches!` only checks whether the top-level expression is a volatile `ScalarFunction`.
To handle those, I think you need to write a function (perhaps via `expr.apply()`) that looks for volatile expressions recursively.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]