This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2c8e65b Move folding of now() to ConstEvaluator (#1176)
2c8e65b is described below
commit 2c8e65bcca9fe41ad80116d99ad974c86cb59654
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Oct 28 12:26:06 2021 -0400
Move folding of now() to ConstEvaluator (#1176)
---
datafusion/src/optimizer/constant_folding.rs | 65 +++--------------
datafusion/src/optimizer/utils.rs | 100 +++++++++++++++++++--------
2 files changed, 83 insertions(+), 82 deletions(-)
diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs
index 74fdc72..8c29da4 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -17,8 +17,6 @@
//! Constant folding and algebraic simplification
-use std::sync::Arc;
-
use arrow::datatypes::DataType;
use crate::error::Result;
@@ -26,7 +24,6 @@ use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, Operator};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
-use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
/// Simplifies plans by rewriting [`Expr`]`s evaluating constants
@@ -61,18 +58,14 @@ impl OptimizerRule for ConstantFolding {
// children plans.
let mut simplifier = Simplifier {
schemas: plan.all_schemas(),
- execution_props,
};
- let mut const_evaluator = utils::ConstEvaluator::new();
+ let mut const_evaluator = utils::ConstEvaluator::new(execution_props);
match plan {
- LogicalPlan::Filter { predicate, input } => Ok(LogicalPlan::Filter {
- predicate: predicate.clone().rewrite(&mut simplifier)?,
- input: Arc::new(self.optimize(input, execution_props)?),
- }),
- // Rest: recurse into plan, apply optimization where possible
- LogicalPlan::Projection { .. }
+ // Recurse into plan, apply optimization where possible
+ LogicalPlan::Filter { .. }
+ | LogicalPlan::Projection { .. }
| LogicalPlan::Window { .. }
| LogicalPlan::Aggregate { .. }
| LogicalPlan::Repartition { .. }
@@ -130,7 +123,6 @@ impl OptimizerRule for ConstantFolding {
struct Simplifier<'a> {
/// input schemas
schemas: Vec<&'a DFSchemaRef>,
- execution_props: &'a ExecutionProps,
}
impl<'a> Simplifier<'a> {
@@ -228,15 +220,6 @@ impl<'a> ExprRewriter for Simplifier<'a> {
Expr::Not(inner)
}
}
- // convert now() --> the time in `ExecutionProps`
- Expr::ScalarFunction {
- fun: BuiltinScalarFunction::Now,
- ..
- } => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
- self.execution_props
- .query_execution_start_time
- .timestamp_nanos(),
- ))),
expr => {
// no additional rewrites possible
expr
@@ -248,10 +231,13 @@ impl<'a> ExprRewriter for Simplifier<'a> {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use super::*;
use crate::{
assert_contains,
logical_plan::{col, lit, max, min, DFField, DFSchema, LogicalPlanBuilder},
+ physical_plan::functions::BuiltinScalarFunction,
};
use arrow::datatypes::*;
@@ -282,7 +268,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
assert_eq!(
@@ -298,7 +283,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
// x = null is always null
@@ -334,7 +318,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -365,7 +348,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
// When one of the operand is not of boolean type, folding the other boolean constant will
@@ -405,7 +387,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -441,7 +422,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
// when one of the operand is not of boolean type, folding the other boolean constant will
@@ -477,7 +457,6 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = Simplifier {
schemas: vec![&schema],
- execution_props: &ExecutionProps::new(),
};
assert_eq!(
@@ -754,27 +733,6 @@ mod tests {
}
#[test]
- fn single_now_expr() {
- let table_scan = test_table_scan().unwrap();
- let proj = vec![now_expr()];
- let time = Utc::now();
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(proj)
- .unwrap()
- .build()
- .unwrap();
-
- let expected = format!(
- "Projection: TimestampNanosecond({})\
- \n TableScan: test projection=None",
- time.timestamp_nanos()
- );
- let actual = get_optimized_plan_formatted(&plan, &time);
-
- assert_eq!(expected, actual);
- }
-
- #[test]
fn multiple_now_expr() {
let table_scan = test_table_scan().unwrap();
let time = Utc::now();
@@ -838,17 +796,16 @@ mod tests {
// now() < cast(to_timestamp(...) as int) + 5000000000
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
- now_expr()
+ cast_to_int64_expr(now_expr())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000)),
)
.unwrap()
.build()
.unwrap();
- // Note that constant folder should be able to run again and fold
- // this whole expression down to a single constant;
- // https://github.com/apache/arrow-datafusion/issues/1160
- let expected = "Filter: TimestampNanosecond(1599566400000000000) < CAST(totimestamp(Utf8(\"2020-09-08T12:05:00+00:00\")) AS Int64) + Int32(50000)\
+ // Note that constant folder runs and folds the entire
+ // expression down to a single constant (true)
+ let expected = "Filter: Boolean(true)\
\n TableScan: test projection=None";
let actual = get_optimized_plan_formatted(&plan, &time);
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index fdc9a17..00ea31e 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -506,7 +506,10 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::optimizer::utils::ConstEvaluator;
-/// let mut const_evaluator = ConstEvaluator::new();
+/// # use datafusion::execution::context::ExecutionProps;
+///
+/// let execution_props = ExecutionProps::new();
+/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
///
/// // (1 + 2) + a
/// let expr = (lit(1) + lit(2)) + col("a");
@@ -575,10 +578,15 @@ impl ExprRewriter for ConstEvaluator {
}
impl ConstEvaluator {
- /// Create a new `ConstantEvaluator`.
- pub fn new() -> Self {
+ /// Create a new `ConstantEvaluator`. Session constants (such as
+ /// the time for `now()` are taken from the passed
+ /// `execution_props`.
+ pub fn new(execution_props: &ExecutionProps) -> Self {
let planner = DefaultPhysicalPlanner::default();
- let ctx_state = ExecutionContextState::new();
+ let ctx_state = ExecutionContextState {
+ execution_props: execution_props.clone(),
+ ..ExecutionContextState::new()
+ };
let input_schema = DFSchema::empty();
// The dummy column name is unused and doesn't matter as only
@@ -604,9 +612,8 @@ impl ConstEvaluator {
fn volatility_ok(volatility: Volatility) -> bool {
match volatility {
Volatility::Immutable => true,
- // To evaluate stable functions, need ExecutionProps, see
- // Simplifier for code that does that.
- Volatility::Stable => false,
+ // Values for functions such as now() are taken from ExecutionProps
+ Volatility::Stable => true,
Volatility::Volatile => false,
}
}
@@ -689,6 +696,7 @@ mod tests {
array::{ArrayRef, Int32Array},
datatypes::DataType,
};
+ use chrono::{DateTime, TimeZone, Utc};
use std::collections::HashSet;
#[test]
@@ -799,42 +807,69 @@ mod tests {
let rand = Expr::ScalarFunction { args: vec![], fun };
let expr = (rand + lit(1)) + lit(2);
test_evaluate(expr.clone(), expr);
+ }
- // volatile / stable functions should not be evaluated
- // now() + (1 + 2) --> now() + 3
- let fun = BuiltinScalarFunction::Now;
- assert_eq!(fun.volatility(), Volatility::Stable);
- let now = Expr::ScalarFunction { args: vec![], fun };
- let expr = now.clone() + (lit(1) + lit(2));
- let expected = now + lit(3);
- test_evaluate(expr, expected);
+ #[test]
+ fn test_const_evaluator_now() {
+ let ts_nanos = 1599566400000000000i64;
+ let time = chrono::Utc.timestamp_nanos(ts_nanos);
+ let ts_string = "2020-09-08T12:05:00+00:00";
+
+ // now() --> ts
+ test_evaluate_with_start_time(now_expr(), lit_timestamp_nano(ts_nanos), &time);
+
+ // CAST(now() as int64) + 100 --> ts + 100
+ let expr = cast_to_int64_expr(now_expr()) + lit(100);
+ test_evaluate_with_start_time(expr, lit(ts_nanos + 100), &time);
+
+ // now() < cast(to_timestamp(...) as int) + 50000 ---> true
+ let expr = cast_to_int64_expr(now_expr())
+ .lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000));
+ test_evaluate_with_start_time(expr, lit(true), &time);
+ }
+
+ fn now_expr() -> Expr {
+ Expr::ScalarFunction {
+ args: vec![],
+ fun: BuiltinScalarFunction::Now,
+ }
+ }
+
+ fn cast_to_int64_expr(expr: Expr) -> Expr {
+ Expr::Cast {
+ expr: expr.into(),
+ data_type: DataType::Int64,
+ }
+ }
+
+ fn to_timestamp_expr(arg: impl Into<String>) -> Expr {
+ Expr::ScalarFunction {
+ args: vec![lit(arg.into())],
+ fun: BuiltinScalarFunction::ToTimestamp,
+ }
}
#[test]
- fn test_const_evaluator_udfs() {
+ fn test_evaluator_udfs() {
let args = vec![lit(1) + lit(2), lit(30) + lit(40)];
let folded_args = vec![lit(3), lit(70)];
// immutable UDF should get folded
- // udf_add(1+2, 30+40) --> 70
+ // udf_add(1+2, 30+40) --> 73
let expr = Expr::ScalarUDF {
args: args.clone(),
fun: make_udf_add(Volatility::Immutable),
};
test_evaluate(expr, lit(73));
- // stable UDF should have args folded
- // udf_add(1+2, 30+40) --> udf_add(3, 70)
+ // stable UDF should be entirely folded
+ // udf_add(1+2, 30+40) --> 73
let fun = make_udf_add(Volatility::Stable);
let expr = Expr::ScalarUDF {
args: args.clone(),
fun: Arc::clone(&fun),
};
- let expected_expr = Expr::ScalarUDF {
- args: folded_args.clone(),
- fun: Arc::clone(&fun),
- };
- test_evaluate(expr, expected_expr);
+ test_evaluate(expr, lit(73));
// volatile UDF should have args folded
// udf_add(1+2, 30+40) --> udf_add(3, 70)
@@ -892,11 +927,16 @@ mod tests {
))
}
- // udfs
- // validate that even a volatile function's arguments will be evaluated
+ fn test_evaluate_with_start_time(
+ input_expr: Expr,
+ expected_expr: Expr,
+ date_time: &DateTime<Utc>,
+ ) {
+ let execution_props = ExecutionProps {
+ query_execution_start_time: *date_time,
+ };
- fn test_evaluate(input_expr: Expr, expected_expr: Expr) {
- let mut const_evaluator = ConstEvaluator::new();
+ let mut const_evaluator = ConstEvaluator::new(&execution_props);
let evaluated_expr = input_expr
.clone()
.rewrite(&mut const_evaluator)
@@ -908,4 +948,8 @@ mod tests {
input_expr, expected_expr, evaluated_expr
);
}
+
+ fn test_evaluate(input_expr: Expr, expected_expr: Expr) {
+ test_evaluate_with_start_time(input_expr, expected_expr, &Utc::now())
+ }
}