This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b096539 [Datafusion] NOW() function support (#288)
b096539 is described below
commit b096539d9670d9547bfd3ae1fb47ef95b2d06f96
Author: sathis <[email protected]>
AuthorDate: Fri May 14 23:30:09 2021 +0530
[Datafusion] NOW() function support (#288)
* Add initial implementation of NOW
* Run rustfmt
* Change incorrect condition
* Add timestamp optimizer which optimizes the logical plan and makes sure
  that all now() calls return the same value
* Add unit tests & fix alias
* Add unit tests & fix alias
* Run cargo fmt
* Comment out failing test
* Optimize the match to fix clippy
* Initialize datetime during optimize not creation
* Add assertion to compare multiple now() values
* Run cargo fmt
* Move timestamp to execution props
* Add missing prop
* Add missing prop
* Remove duplicated code
* Fix tests & format
* Fix clippy
* Revert clippy fix
* Update datafusion/src/execution/context.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Fix review comments. Move timestamp evaluation logic to
constant_folding.rs
* Pass ExecutionProps to scalar functions
* Revert "Pass ExecutionProps to scalar functions"
This reverts commit d9cb005df4a4c1bf05b18b5d9a1aefc4f9e706bb.
* Add closure approach from @alamb
* Re-enable concat test
* Changing Option<DateTime<Utc>> to DateTime<Utc>
Co-authored-by: Sathis Kumar <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/serde/physical_plan/from_proto.rs | 6 +-
datafusion/src/execution/context.rs | 128 ++++++++++++++-------
datafusion/src/optimizer/constant_folding.rs | 107 ++++++++++++++++-
datafusion/src/optimizer/eliminate_limit.rs | 13 ++-
datafusion/src/optimizer/filter_push_down.rs | 7 +-
datafusion/src/optimizer/hash_build_probe_order.rs | 17 ++-
datafusion/src/optimizer/limit_push_down.rs | 7 +-
datafusion/src/optimizer/optimizer.rs | 7 +-
datafusion/src/optimizer/projection_push_down.rs | 41 +++++--
datafusion/src/optimizer/utils.rs | 15 ++-
.../src/physical_plan/datetime_expressions.rs | 19 ++-
datafusion/src/physical_plan/functions.rs | 55 +++++++--
datafusion/src/physical_plan/parquet.rs | 19 +--
datafusion/src/physical_plan/planner.rs | 23 ++--
datafusion/src/physical_plan/type_coercion.rs | 8 ++
datafusion/tests/sql.rs | 49 +++++++-
datafusion/tests/user_defined_plan.rs | 11 +-
17 files changed, 415 insertions(+), 117 deletions(-)
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6a33c6a..9c35c9d 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -33,7 +33,9 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};
+use datafusion::execution::context::{
+ ExecutionConfig, ExecutionContextState, ExecutionProps,
+};
use datafusion::logical_plan::{DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr,
AggregateFunction};
use datafusion::physical_plan::expressions::col;
@@ -226,6 +228,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
+ execution_props: ExecutionProps::new(),
};
let input_schema = hash_agg
@@ -391,6 +394,7 @@ fn compile_expr(
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config: ExecutionConfig::new(),
+ execution_props: ExecutionProps::new(),
};
let expr: Expr = expr.try_into()?;
df_planner
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index b53f7c1..9c7a621 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -74,6 +74,7 @@ use crate::sql::{
};
use crate::variable::{VarProvider, VarType};
use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
+use chrono::{DateTime, Utc};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
@@ -159,6 +160,7 @@ impl ExecutionContext {
var_provider: HashMap::new(),
aggregate_functions: HashMap::new(),
config,
+ execution_props: ExecutionProps::new(),
})),
}
}
@@ -454,12 +456,16 @@ impl ExecutionContext {
/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
- let optimizers = &self.state.lock().unwrap().config.optimizers;
+ let state = &mut self.state.lock().unwrap();
+ let execution_props = &mut state.execution_props.clone();
+ let optimizers = &state.config.optimizers;
+
+ let execution_props = execution_props.start_execution();
let mut new_plan = plan.clone();
debug!("Logical plan:\n {:?}", plan);
for optimizer in optimizers {
- new_plan = optimizer.optimize(&new_plan)?;
+ new_plan = optimizer.optimize(&new_plan, execution_props)?;
}
debug!("Optimized logical plan:\n {:?}", new_plan);
Ok(new_plan)
@@ -470,7 +476,9 @@ impl ExecutionContext {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
- let state = self.state.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
+ state.execution_props.start_execution();
+
state
.config
.query_planner
@@ -740,6 +748,15 @@ impl ExecutionConfig {
}
}
+/// Holds per-execution properties and data (such as starting timestamps, etc).
+/// An instance of this struct is created each time a [`LogicalPlan`] is
prepared for
+/// execution (optimized). If the same plan is optimized multiple times, a new
+/// `ExecutionProps` is created each time.
+#[derive(Clone)]
+pub struct ExecutionProps {
+ pub(crate) query_execution_start_time: DateTime<Utc>,
+}
+
/// Execution context for registering data sources and executing queries
#[derive(Clone)]
pub struct ExecutionContextState {
@@ -753,9 +770,38 @@ pub struct ExecutionContextState {
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Context configuration
pub config: ExecutionConfig,
+ /// Execution properties
+ pub execution_props: ExecutionProps,
+}
+
+impl ExecutionProps {
+ /// Creates a new execution props
+ pub fn new() -> Self {
+ ExecutionProps {
+ query_execution_start_time: chrono::Utc::now(),
+ }
+ }
+
+ /// Marks the execution of query started timestamp
+ pub fn start_execution(&mut self) -> &Self {
+ self.query_execution_start_time = chrono::Utc::now();
+ &*self
+ }
}
impl ExecutionContextState {
+ /// Returns new ExecutionContextState
+ pub fn new() -> Self {
+ ExecutionContextState {
+ catalog_list: Arc::new(MemoryCatalogList::new()),
+ scalar_functions: HashMap::new(),
+ var_provider: HashMap::new(),
+ aggregate_functions: HashMap::new(),
+ config: ExecutionConfig::new(),
+ execution_props: ExecutionProps::new(),
+ }
+ }
+
fn resolve_table_ref<'a>(
&'a self,
table_ref: impl Into<TableReference<'a>>,
@@ -1507,7 +1553,7 @@ mod tests {
"+-------------------------+-------------------------+-------------------------+---------------------+",
"| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01
05:11:10.432 | 2021-01-01 05:11:10 |",
"+-------------------------+-------------------------+-------------------------+---------------------+",
-];
+ ];
assert_batches_sorted_eq!(expected, &results);
Ok(())
@@ -1633,7 +1679,7 @@ mod tests {
let results = plan_and_collect(
&mut ctx,
- "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY
date_trunc('week', t1)"
+ "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY
date_trunc('week', t1)",
).await?;
assert_eq!(results.len(), 1);
@@ -1881,16 +1927,15 @@ mod tests {
let results =
run_count_distinct_integers_aggregated_scenario(partitions).await?;
assert_eq!(results.len(), 1);
- let expected = vec!
-[
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
- "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | COUNT(DISTINCT
c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | COUNT(DISTINCT
c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | COUNT(DISTINCT
c_uint64) |",
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
- "| a | 3 | 2 | 2
| 2 | 2 | 2
| 2 | 2 | 2
|",
- "| b | 1 | 1 | 1
| 1 | 1 | 1
| 1 | 1 | 1
|",
- "| c | 3 | 2 | 2
| 2 | 2 | 2
| 2 | 2 | 2
|",
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-];
+ let expected = vec![
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) |
COUNT(DISTINCT c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) |
COUNT(DISTINCT c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) |
COUNT(DISTINCT c_uint64) |",
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ "| a | 3 | 2 | 2
| 2 | 2 | 2
| 2 | 2 | 2
|",
+ "| b | 1 | 1 | 1
| 1 | 1 | 1
| 1 | 1 | 1
|",
+ "| c | 3 | 2 | 2
| 2 | 2 | 2
| 2 | 2 | 2
|",
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ ];
assert_batches_sorted_eq!(expected, &results);
Ok(())
@@ -1910,14 +1955,14 @@ mod tests {
assert_eq!(results.len(), 1);
let expected = vec![
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
- "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | COUNT(DISTINCT
c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | COUNT(DISTINCT
c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | COUNT(DISTINCT
c_uint64) |",
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
- "| a | 5 | 3 | 3
| 3 | 3 | 3
| 3 | 3 | 3
|",
- "| b | 5 | 4 | 4
| 4 | 4 | 4
| 4 | 4 | 4
|",
- "| c | 1 | 1 | 1
| 1 | 1 | 1
| 1 | 1 | 1
|",
-
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-];
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) |
COUNT(DISTINCT c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) |
COUNT(DISTINCT c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) |
COUNT(DISTINCT c_uint64) |",
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ "| a | 5 | 3 | 3
| 3 | 3 | 3
| 3 | 3 | 3
|",
+ "| b | 5 | 4 | 4
| 4 | 4 | 4
| 4 | 4 | 4
|",
+ "| c | 1 | 1 | 1
| 1 | 1 | 1
| 1 | 1 | 1
|",
+
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+ ];
assert_batches_sorted_eq!(expected, &results);
Ok(())
@@ -2311,6 +2356,7 @@ mod tests {
}
Ok(())
}
+
#[test]
fn ctx_sql_should_optimize_plan() -> Result<()> {
let mut ctx = ExecutionContext::new();
@@ -2844,13 +2890,11 @@ mod tests {
.await
.unwrap();
let expected = vec![
-
-
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
- "| table_catalog | table_schema | table_name | column_name |
ordinal_position | column_default | is_nullable | data_type |
character_maximum_length | character_octet_length | numeric_precision |
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
-
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
- "| datafusion | public | t | i | 0
| | YES | Int32 | |
| 32 | 2 |
| | |",
-
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-
+
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+ "| table_catalog | table_schema | table_name | column_name |
ordinal_position | column_default | is_nullable | data_type |
character_maximum_length | character_octet_length | numeric_precision |
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
+
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+ "| datafusion | public | t | i | 0
| | YES | Int32 |
| | 32 | 2 |
| | |",
+
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
];
assert_batches_sorted_eq!(expected, &result);
@@ -2984,18 +3028,18 @@ mod tests {
.unwrap();
let expected = vec![
-
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
- "| table_catalog | table_schema | table_name | column_name |
ordinal_position | column_default | is_nullable | data_type |
character_maximum_length | character_octet_length | numeric_precision |
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
-
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
- "| my_catalog | my_schema | t1 | i | 0
| | YES | Int32 |
| | 32 | 2
| | | |",
- "| my_catalog | my_schema | t2 | binary_col | 4
| | NO | Binary |
| 2147483647 | |
| | | |",
- "| my_catalog | my_schema | t2 | float64_col | 1
| | YES | Float64 |
| | 24 | 2
| | | |",
- "| my_catalog | my_schema | t2 | int32_col | 0
| | NO | Int32 |
| | 32 | 2
| | | |",
- "| my_catalog | my_schema | t2 | large_binary_col | 5
| | NO | LargeBinary |
| 9223372036854775807 | |
| | | |",
- "| my_catalog | my_schema | t2 | large_utf8_col | 3
| | NO | LargeUtf8 |
| 9223372036854775807 | |
| | | |",
- "| my_catalog | my_schema | t2 | timestamp_nanos | 6
| | NO | Timestamp(Nanosecond, None) |
| | |
| | | |",
- "| my_catalog | my_schema | t2 | utf8_col | 2
| | YES | Utf8 |
| 2147483647 | |
| | | |",
-
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+ "| table_catalog | table_schema | table_name | column_name |
ordinal_position | column_default | is_nullable | data_type |
character_maximum_length | character_octet_length | numeric_precision |
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
+
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+ "| my_catalog | my_schema | t1 | i |
0 | | YES | Int32 |
| | 32 | 2
| | | |",
+ "| my_catalog | my_schema | t2 | binary_col |
4 | | NO | Binary |
| 2147483647 | |
| | | |",
+ "| my_catalog | my_schema | t2 | float64_col |
1 | | YES | Float64 |
| | 24 | 2
| | | |",
+ "| my_catalog | my_schema | t2 | int32_col |
0 | | NO | Int32 |
| | 32 | 2
| | | |",
+ "| my_catalog | my_schema | t2 | large_binary_col |
5 | | NO | LargeBinary |
| 9223372036854775807 | |
| | | |",
+ "| my_catalog | my_schema | t2 | large_utf8_col |
3 | | NO | LargeUtf8 |
| 9223372036854775807 | |
| | | |",
+ "| my_catalog | my_schema | t2 | timestamp_nanos |
6 | | NO | Timestamp(Nanosecond, None) |
| | |
| | | |",
+ "| my_catalog | my_schema | t2 | utf8_col |
2 | | YES | Utf8 |
| 2147483647 | |
| | | |",
+
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
];
assert_batches_sorted_eq!(expected, &result);
}
diff --git a/datafusion/src/optimizer/constant_folding.rs
b/datafusion/src/optimizer/constant_folding.rs
index 71c84f6..51bf0ce 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -23,9 +23,11 @@ use std::sync::Arc;
use arrow::datatypes::DataType;
use crate::error::Result;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan,
Operator};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
+use crate::physical_plan::functions::BuiltinScalarFunction;
use crate::scalar::ScalarValue;
/// Optimizer that simplifies comparison expressions involving boolean
literals.
@@ -47,7 +49,11 @@ impl ConstantFolding {
}
impl OptimizerRule for ConstantFolding {
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
// We need to pass down the all schemas within the plan tree to
`optimize_expr` in order to
// to evaluate expression types. For example, a projection plan's
schema will only include
// projected columns. With just the projected schema, it's not
possible to infer types for
@@ -55,12 +61,13 @@ impl OptimizerRule for ConstantFolding {
// children plans.
let mut rewriter = ConstantRewriter {
schemas: plan.all_schemas(),
+ execution_props,
};
match plan {
LogicalPlan::Filter { predicate, input } => Ok(LogicalPlan::Filter
{
predicate: predicate.clone().rewrite(&mut rewriter)?,
- input: Arc::new(self.optimize(input)?),
+ input: Arc::new(self.optimize(input, execution_props)?),
}),
// Rest: recurse into plan, apply optimization where possible
LogicalPlan::Projection { .. }
@@ -78,7 +85,7 @@ impl OptimizerRule for ConstantFolding {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| self.optimize(plan))
+ .map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
let expr = plan
@@ -103,6 +110,7 @@ impl OptimizerRule for ConstantFolding {
struct ConstantRewriter<'a> {
/// input schemas
schemas: Vec<&'a DFSchemaRef>,
+ execution_props: &'a ExecutionProps,
}
impl<'a> ConstantRewriter<'a> {
@@ -200,6 +208,14 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> {
Expr::Not(inner)
}
}
+ Expr::ScalarFunction {
+ fun: BuiltinScalarFunction::Now,
+ ..
+ } => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
+ self.execution_props
+ .query_execution_start_time
+ .timestamp_nanos(),
+ ))),
expr => {
// no rewrite possible
expr
@@ -217,6 +233,7 @@ mod tests {
};
use arrow::datatypes::*;
+ use chrono::{DateTime, Utc};
fn test_table_scan() -> Result<LogicalPlan> {
let schema = Schema::new(vec![
@@ -243,6 +260,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
assert_eq!(
@@ -258,6 +276,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
// x = null is always null
@@ -293,6 +312,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -323,6 +343,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
// When one of the operand is not of boolean type, folding the other
boolean constant will
@@ -362,6 +383,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -397,6 +419,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
// when one of the operand is not of boolean type, folding the other
boolean constant will
@@ -432,6 +455,7 @@ mod tests {
let schema = expr_test_schema();
let mut rewriter = ConstantRewriter {
schemas: vec![&schema],
+ execution_props: &ExecutionProps::new(),
};
assert_eq!(
@@ -459,7 +483,9 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = ConstantFolding::new();
- let optimized_plan = rule.optimize(plan).expect("failed to optimize
plan");
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
@@ -589,4 +615,77 @@ mod tests {
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
+
+ fn get_optimized_plan_formatted(
+ plan: &LogicalPlan,
+ date_time: &DateTime<Utc>,
+ ) -> String {
+ let rule = ConstantFolding::new();
+ let execution_props = ExecutionProps {
+ query_execution_start_time: *date_time,
+ };
+
+ let optimized_plan = rule
+ .optimize(plan, &execution_props)
+ .expect("failed to optimize plan");
+ return format!("{:?}", optimized_plan);
+ }
+
+ #[test]
+ fn single_now_expr() {
+ let table_scan = test_table_scan().unwrap();
+ let proj = vec![Expr::ScalarFunction {
+ args: vec![],
+ fun: BuiltinScalarFunction::Now,
+ }];
+ let time = chrono::Utc::now();
+ let plan = LogicalPlanBuilder::from(&table_scan)
+ .project(proj)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let expected = format!(
+ "Projection: TimestampNanosecond({})\
+ \n TableScan: test projection=None",
+ time.timestamp_nanos()
+ );
+ let actual = get_optimized_plan_formatted(&plan, &time);
+
+ assert_eq!(expected, actual);
+ }
+
+ #[test]
+ fn multiple_now_expr() {
+ let table_scan = test_table_scan().unwrap();
+ let time = chrono::Utc::now();
+ let proj = vec![
+ Expr::ScalarFunction {
+ args: vec![],
+ fun: BuiltinScalarFunction::Now,
+ },
+ Expr::Alias(
+ Box::new(Expr::ScalarFunction {
+ args: vec![],
+ fun: BuiltinScalarFunction::Now,
+ }),
+ "t2".to_string(),
+ ),
+ ];
+ let plan = LogicalPlanBuilder::from(&table_scan)
+ .project(proj)
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let actual = get_optimized_plan_formatted(&plan, &time);
+ let expected = format!(
+ "Projection: TimestampNanosecond({}), TimestampNanosecond({}) AS
t2\
+ \n TableScan: test projection=None",
+ time.timestamp_nanos(),
+ time.timestamp_nanos()
+ );
+
+ assert_eq!(actual, expected);
+ }
}
diff --git a/datafusion/src/optimizer/eliminate_limit.rs
b/datafusion/src/optimizer/eliminate_limit.rs
index 87b33d6..1b965f1 100644
--- a/datafusion/src/optimizer/eliminate_limit.rs
+++ b/datafusion/src/optimizer/eliminate_limit.rs
@@ -22,6 +22,7 @@ use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
use super::utils;
+use crate::execution::context::ExecutionProps;
/// Optimization rule that replaces LIMIT 0 with an
[LogicalPlan::EmptyRelation]
pub struct EliminateLimit;
@@ -34,7 +35,11 @@ impl EliminateLimit {
}
impl OptimizerRule for EliminateLimit {
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Limit { n, input } if *n == 0 => {
Ok(LogicalPlan::EmptyRelation {
@@ -50,7 +55,7 @@ impl OptimizerRule for EliminateLimit {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| self.optimize(plan))
+ .map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
@@ -72,7 +77,9 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateLimit::new();
- let optimized_plan = rule.optimize(plan).expect("failed to optimize
plan");
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
assert_eq!(plan.schema(), optimized_plan.schema());
diff --git a/datafusion/src/optimizer/filter_push_down.rs
b/datafusion/src/optimizer/filter_push_down.rs
index 356d497..4c248e2 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -15,6 +15,7 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early
as possible in the plan
use crate::datasource::datasource::TableProviderFilterPushDown;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::{and, LogicalPlan};
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
@@ -413,7 +414,7 @@ impl OptimizerRule for FilterPushDown {
"filter_push_down"
}
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) ->
Result<LogicalPlan> {
optimize(plan, State::default())
}
}
@@ -456,7 +457,9 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = FilterPushDown::new();
- let optimized_plan = rule.optimize(plan).expect("failed to optimize
plan");
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs
b/datafusion/src/optimizer/hash_build_probe_order.rs
index b27171f..168c4a1 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -27,6 +27,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::{error::Result, prelude::JoinType};
use super::utils;
+use crate::execution::context::ExecutionProps;
/// BuildProbeOrder reorders the build and probe phase of
/// hash joins. This uses the amount of rows that a datasource has.
@@ -106,7 +107,11 @@ impl OptimizerRule for HashBuildProbeOrder {
"hash_build_probe_order"
}
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
match plan {
// Main optimization rule, swaps order of left and right
// based on number of rows in each table
@@ -117,8 +122,8 @@ impl OptimizerRule for HashBuildProbeOrder {
join_type,
schema,
} => {
- let left = self.optimize(left)?;
- let right = self.optimize(right)?;
+ let left = self.optimize(left, execution_props)?;
+ let right = self.optimize(right, execution_props)?;
if should_swap_join_order(&left, &right) {
// Swap left and right, change join type and (equi-)join
key order
Ok(LogicalPlan::Join {
@@ -147,8 +152,8 @@ impl OptimizerRule for HashBuildProbeOrder {
right,
schema,
} => {
- let left = self.optimize(left)?;
- let right = self.optimize(right)?;
+ let left = self.optimize(left, execution_props)?;
+ let right = self.optimize(right, execution_props)?;
if should_swap_join_order(&left, &right) {
// Swap left and right
Ok(LogicalPlan::CrossJoin {
@@ -184,7 +189,7 @@ impl OptimizerRule for HashBuildProbeOrder {
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
- .map(|plan| self.optimize(plan))
+ .map(|plan| self.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
utils::from_plan(plan, &expr, &new_inputs)
diff --git a/datafusion/src/optimizer/limit_push_down.rs
b/datafusion/src/optimizer/limit_push_down.rs
index 73a231f..e616869 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
use super::utils;
use crate::error::Result;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
use crate::optimizer::optimizer::OptimizerRule;
@@ -125,7 +126,7 @@ fn limit_push_down(
}
impl OptimizerRule for LimitPushDown {
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) ->
Result<LogicalPlan> {
limit_push_down(None, plan)
}
@@ -143,7 +144,9 @@ mod test {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = LimitPushDown::new();
- let optimized_plan = rule.optimize(plan).expect("failed to optimize
plan");
+ let optimized_plan = rule
+ .optimize(plan, &ExecutionProps::new())
+ .expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
}
diff --git a/datafusion/src/optimizer/optimizer.rs
b/datafusion/src/optimizer/optimizer.rs
index dee8e06..5cf4047 100644
--- a/datafusion/src/optimizer/optimizer.rs
+++ b/datafusion/src/optimizer/optimizer.rs
@@ -18,6 +18,7 @@
//! Query optimizer traits
use crate::error::Result;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::LogicalPlan;
/// `OptimizerRule` transforms one ['LogicalPlan'] into another which
@@ -25,7 +26,11 @@ use crate::logical_plan::LogicalPlan;
/// way.
pub trait OptimizerRule {
/// Rewrite `plan` to an optimized form
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>;
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan>;
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 7243fa5..21c9cab 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -19,6 +19,7 @@
//! loaded into memory
use crate::error::Result;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan,
ToDFSchema};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
@@ -32,7 +33,11 @@ use utils::optimize_explain;
pub struct ProjectionPushDown {}
impl OptimizerRule for ProjectionPushDown {
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
// set of all columns refered by the plan (and thus considered
required by the root)
let required_columns = plan
.schema()
@@ -40,7 +45,7 @@ impl OptimizerRule for ProjectionPushDown {
.iter()
.map(|f| f.name().clone())
.collect::<HashSet<String>>();
- optimize_plan(self, plan, &required_columns, false)
+ optimize_plan(self, plan, &required_columns, false, execution_props)
}
fn name(&self) -> &str {
@@ -105,6 +110,7 @@ fn optimize_plan(
plan: &LogicalPlan,
required_columns: &HashSet<String>, // set of columns required up to this
step
has_projection: bool,
+ execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
let mut new_required_columns = required_columns.clone();
match plan {
@@ -137,8 +143,13 @@ fn optimize_plan(
}
})?;
- let new_input =
- optimize_plan(optimizer, &input, &new_required_columns, true)?;
+ let new_input = optimize_plan(
+ optimizer,
+ &input,
+ &new_required_columns,
+ true,
+ execution_props,
+ )?;
if new_fields.is_empty() {
// no need for an expression at all
Ok(new_input)
@@ -167,12 +178,14 @@ fn optimize_plan(
&left,
&new_required_columns,
true,
+ execution_props,
)?),
right: Arc::new(optimize_plan(
optimizer,
&right,
&new_required_columns,
true,
+ execution_props,
)?),
join_type: *join_type,
@@ -226,6 +239,7 @@ fn optimize_plan(
&input,
&new_required_columns,
true,
+ execution_props,
)?),
schema: DFSchemaRef::new(new_schema),
})
@@ -259,7 +273,14 @@ fn optimize_plan(
schema,
} => {
let schema = schema.as_ref().to_owned().into();
- optimize_explain(optimizer, *verbose, &*plan, stringified_plans,
&schema)
+ optimize_explain(
+ optimizer,
+ *verbose,
+ &*plan,
+ stringified_plans,
+ &schema,
+ execution_props,
+ )
}
// all other nodes: Add any additional columns used by
// expressions in this node to the list of required columns
@@ -281,7 +302,13 @@ fn optimize_plan(
let new_inputs = inputs
.iter()
.map(|plan| {
- optimize_plan(optimizer, plan, &new_required_columns,
has_projection)
+ optimize_plan(
+ optimizer,
+ plan,
+ &new_required_columns,
+ has_projection,
+ execution_props,
+ )
})
.collect::<Result<Vec<_>>>()?;
@@ -538,6 +565,6 @@ mod tests {
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
let rule = ProjectionPushDown::new();
- rule.optimize(plan)
+ rule.optimize(plan, &ExecutionProps::new())
}
}
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index 0ec3fa7..9288c65 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -22,6 +22,7 @@ use std::{collections::HashSet, sync::Arc};
use arrow::datatypes::Schema;
use super::optimizer::OptimizerRule;
+use crate::execution::context::ExecutionProps;
use crate::logical_plan::{
Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion,
StringifiedPlan,
ToDFSchema,
@@ -101,11 +102,12 @@ pub fn optimize_explain(
plan: &LogicalPlan,
stringified_plans: &[StringifiedPlan],
schema: &Schema,
+ execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
// These are the fields of LogicalPlan::Explain It might be nice
// to transform that enum Variant into its own struct and avoid
// passing the fields individually
- let plan = Arc::new(optimizer.optimize(plan)?);
+ let plan = Arc::new(optimizer.optimize(plan, execution_props)?);
let mut stringified_plans = stringified_plans.to_vec();
let optimizer_name = optimizer.name().into();
stringified_plans.push(StringifiedPlan::new(
@@ -128,6 +130,7 @@ pub fn optimize_explain(
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
) -> Result<LogicalPlan> {
if let LogicalPlan::Explain {
verbose,
@@ -142,6 +145,7 @@ pub fn optimize_children(
&*plan,
stringified_plans,
&schema.as_ref().to_owned().into(),
+ execution_props,
);
}
@@ -149,7 +153,7 @@ pub fn optimize_children(
let new_inputs = plan
.inputs()
.into_iter()
- .map(|plan| optimizer.optimize(plan))
+ .map(|plan| optimizer.optimize(plan, execution_props))
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &new_exprs, &new_inputs)
@@ -443,7 +447,11 @@ mod tests {
struct TestOptimizer {}
impl OptimizerRule for TestOptimizer {
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ _: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
Ok(plan.clone())
}
@@ -465,6 +473,7 @@ mod tests {
&empty_plan,
&[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
schema.as_ref(),
+ &ExecutionProps::new(),
)?;
match &optimized_explain {
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs
b/datafusion/src/physical_plan/datetime_expressions.rs
index 7b58161..ec52e6b 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -268,6 +268,23 @@ pub fn to_timestamp(args: &[ColumnarValue]) ->
Result<ColumnarValue> {
)
}
+/// Create an implementation of `now()` that always returns the
+/// specified timestamp.
+///
+/// The semantics of `now()` require it to return the same value
+/// whenever it is called in a query. This value is chosen during
+/// planning time and bound into the closure that is returned.
+pub fn make_now(
+ now_ts: DateTime<Utc>,
+) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
+ let now_ts = Some(now_ts.timestamp_nanos());
+ move |_arg| {
+ Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+ now_ts,
+ )))
+ }
+}
+
fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
let value = timestamp_ns_to_datetime(value).with_nanosecond(0);
let value = match granularity {
@@ -300,7 +317,7 @@ fn date_trunc_single(granularity: &str, value: i64) ->
Result<i64> {
return Err(DataFusionError::Execution(format!(
"Unsupported date_trunc granularity: {}",
unsupported
- )))
+ )));
}
};
// `with_x(0)` are infalible because `0` are always a valid
diff --git a/datafusion/src/physical_plan/functions.rs
b/datafusion/src/physical_plan/functions.rs
index 960d7c5..2e053a8 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -33,6 +33,7 @@ use super::{
type_coercion::{coerce, data_types},
ColumnarValue, PhysicalExpr,
};
+use crate::execution::context::ExecutionContextState;
use crate::physical_plan::array_expressions;
use crate::physical_plan::datetime_expressions;
use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
@@ -194,6 +195,8 @@ pub enum BuiltinScalarFunction {
ToHex,
/// to_timestamp
ToTimestamp,
+ /// now
+ Now,
/// translate
Translate,
/// trim
@@ -273,6 +276,7 @@ impl FromStr for BuiltinScalarFunction {
"substr" => BuiltinScalarFunction::Substr,
"to_hex" => BuiltinScalarFunction::ToHex,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+ "now" => BuiltinScalarFunction::Now,
"translate" => BuiltinScalarFunction::Translate,
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
@@ -298,15 +302,6 @@ pub fn return_type(
// verify that this is a valid set of data types for this function
data_types(&arg_types, &signature(fun))?;
- if arg_types.is_empty() {
- // functions currently cannot be evaluated without arguments, as they
can't
- // know the number of rows to return.
- return Err(DataFusionError::Plan(format!(
- "Function '{}' requires at least one argument",
- fun
- )));
- }
-
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match fun {
@@ -582,6 +577,7 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestamp => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
+ BuiltinScalarFunction::Now =>
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
BuiltinScalarFunction::Translate => Ok(match arg_types[0] {
DataType::LargeUtf8 => DataType::LargeUtf8,
DataType::Utf8 => DataType::Utf8,
@@ -714,6 +710,7 @@ pub fn create_physical_expr(
fun: &BuiltinScalarFunction,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
+ ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn PhysicalExpr>> {
let fun_expr: ScalarFunctionImplementation = Arc::new(match fun {
// math functions
@@ -805,6 +802,22 @@ pub fn create_physical_expr(
}
BuiltinScalarFunction::DatePart => datetime_expressions::date_part,
BuiltinScalarFunction::DateTrunc => datetime_expressions::date_trunc,
+ BuiltinScalarFunction::Now => {
+ // bind value for now at plan time
+ let fun_expr = Arc::new(datetime_expressions::make_now(
+ ctx_state.execution_props.query_execution_start_time,
+ ));
+
+ // TODO refactor code to not return here, but instead fall through
below
+ let args = vec![];
+ let arg_types = vec![]; // has no args
+ return Ok(Arc::new(ScalarFunctionExpr::new(
+ &format!("{}", fun),
+ fun_expr,
+ args,
+ &return_type(&fun, &arg_types)?,
+ )));
+ }
BuiltinScalarFunction::InitCap => |args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function(string_expressions::initcap::<i32>)(args)
@@ -1451,13 +1464,14 @@ mod tests {
($FUNC:ident, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty,
$DATA_TYPE: ident, $ARRAY_TYPE:ident) => {
// used to provide type annotation
let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
+ let ctx_state = ExecutionContextState::new();
// any type works here: we evaluate against a literal of `value`
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
let columns: Vec<ArrayRef> =
vec![Arc::new(Int32Array::from(vec![1]))];
let expr =
- create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS,
&schema)?;
+ create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS,
&schema, &ctx_state)?;
// type is correct
assert_eq!(expr.data_type(&schema)?, DataType::$DATA_TYPE);
@@ -3618,7 +3632,20 @@ mod tests {
#[test]
fn test_concat_error() -> Result<()> {
- let result = return_type(&BuiltinScalarFunction::Concat, &[]);
+ let ctx_state = ExecutionContextState::new();
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
+
+ let expr = create_physical_expr(
+ &BuiltinScalarFunction::Concat,
+ &[],
+ &schema,
+ &ctx_state,
+ )?;
+
+ let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1]))];
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
+ let result = expr.evaluate(&batch);
+
if result.is_ok() {
Err(DataFusionError::Plan(
"Function 'concat' cannot accept zero arguments".to_string(),
@@ -3640,11 +3667,13 @@ mod tests {
Field::new("b", value2.data_type().clone(), false),
]);
let columns: Vec<ArrayRef> = vec![value1, value2];
+ let ctx_state = ExecutionContextState::new();
let expr = create_physical_expr(
&BuiltinScalarFunction::Array,
&[col("a"), col("b")],
&schema,
+ &ctx_state,
)?;
// type is correct
@@ -3700,6 +3729,7 @@ mod tests {
#[cfg(feature = "regex_expressions")]
fn test_regexp_match() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
+ let ctx_state = ExecutionContextState::new();
// concat(value, value)
let col_value: ArrayRef = Arc::new(StringArray::from(vec!["aaa-555"]));
@@ -3709,6 +3739,7 @@ mod tests {
&BuiltinScalarFunction::RegexpMatch,
&[col("a"), pattern],
&schema,
+ &ctx_state,
)?;
// type is correct
@@ -3737,6 +3768,7 @@ mod tests {
#[cfg(feature = "regex_expressions")]
fn test_regexp_match_all_literals() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
+ let ctx_state = ExecutionContextState::new();
// concat(value, value)
let col_value = lit(ScalarValue::Utf8(Some("aaa-555".to_string())));
@@ -3746,6 +3778,7 @@ mod tests {
&BuiltinScalarFunction::RegexpMatch,
&[col_value, pattern],
&schema,
+ &ctx_state,
)?;
// type is correct
diff --git a/datafusion/src/physical_plan/parquet.rs
b/datafusion/src/physical_plan/parquet.rs
index 09dd48d..dee0fc8 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -21,25 +21,18 @@ use std::fmt;
use std::fs::File;
use std::sync::Arc;
use std::task::{Context, Poll};
-use std::{
- any::Any,
- collections::{HashMap, HashSet},
-};
+use std::{any::Any, collections::HashSet};
use super::{
planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr,
RecordBatchStream,
SendableRecordBatchStream,
};
-use crate::{
- catalog::catalog::MemoryCatalogList,
- physical_plan::{common, ExecutionPlan, Partitioning},
-};
+use crate::physical_plan::{common, ExecutionPlan, Partitioning};
use crate::{
error::{DataFusionError, Result},
execution::context::ExecutionContextState,
logical_plan::{Expr, Operator},
optimizer::utils,
- prelude::ExecutionConfig,
};
use arrow::record_batch::RecordBatch;
use arrow::{
@@ -393,13 +386,7 @@ impl RowGroupPredicateBuilder {
.map(|(_, _, f)| f.clone())
.collect::<Vec<_>>();
let stat_schema = Schema::new(stat_fields);
- let execution_context_state = ExecutionContextState {
- catalog_list: Arc::new(MemoryCatalogList::new()),
- scalar_functions: HashMap::new(),
- var_provider: HashMap::new(),
- aggregate_functions: HashMap::new(),
- config: ExecutionConfig::new(),
- };
+ let execution_context_state = ExecutionContextState::new();
let predicate_expr =
DefaultPhysicalPlanner::default().create_physical_expr(
&logical_predicate_expr,
&stat_schema,
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index acbb863..664e4dc 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -555,7 +555,12 @@ impl DefaultPhysicalPlanner {
.iter()
.map(|e| self.create_physical_expr(e, input_schema,
ctx_state))
.collect::<Result<Vec<_>>>()?;
- functions::create_physical_expr(fun, &physical_args,
input_schema)
+ functions::create_physical_expr(
+ fun,
+ &physical_args,
+ input_schema,
+ ctx_state,
+ )
}
Expr::ScalarUDF { fun, args } => {
let mut physical_args = vec![];
@@ -736,30 +741,20 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) ->
Result<(T, R)> {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
- use crate::prelude::ExecutionConfig;
use crate::scalar::ScalarValue;
use crate::{
- catalog::catalog::MemoryCatalogList,
- logical_plan::{DFField, DFSchema, DFSchemaRef},
- };
- use crate::{
logical_plan::{col, lit, sum, LogicalPlanBuilder},
physical_plan::SendableRecordBatchStream,
};
use arrow::datatypes::{DataType, Field, SchemaRef};
use async_trait::async_trait;
use fmt::Debug;
- use std::{any::Any, collections::HashMap, fmt};
+ use std::{any::Any, fmt};
fn make_ctx_state() -> ExecutionContextState {
- ExecutionContextState {
- catalog_list: Arc::new(MemoryCatalogList::new()),
- scalar_functions: HashMap::new(),
- var_provider: HashMap::new(),
- aggregate_functions: HashMap::new(),
- config: ExecutionConfig::new(),
- }
+ ExecutionContextState::new()
}
fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/physical_plan/type_coercion.rs
b/datafusion/src/physical_plan/type_coercion.rs
index d9f84e7..98ae09c 100644
--- a/datafusion/src/physical_plan/type_coercion.rs
+++ b/datafusion/src/physical_plan/type_coercion.rs
@@ -46,6 +46,10 @@ pub fn coerce(
schema: &Schema,
signature: &Signature,
) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+ if expressions.is_empty() {
+ return Ok(vec![]);
+ }
+
let current_types = expressions
.iter()
.map(|e| e.data_type(schema))
@@ -68,6 +72,10 @@ pub fn data_types(
current_types: &[DataType],
signature: &Signature,
) -> Result<Vec<DataType>> {
+ if current_types.is_empty() {
+ return Ok(vec![]);
+ }
+
let valid_types = get_valid_types(signature, current_types)?;
if valid_types
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4b53e2f..c80ffe4 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2342,7 +2342,7 @@ macro_rules! test_expression {
let mut ctx = ExecutionContext::new();
let sql = format!("SELECT {}", $SQL);
let actual = execute(&mut ctx, sql.as_str()).await;
- assert_eq!($EXPECTED, actual[0][0]);
+ assert_eq!(actual[0][0], $EXPECTED);
};
}
@@ -2864,6 +2864,53 @@ async fn test_cast_expressions() -> Result<()> {
}
#[tokio::test]
+async fn test_current_timestamp_expressions() -> Result<()> {
+ let t1 = chrono::Utc::now().timestamp();
+ let mut ctx = ExecutionContext::new();
+ let actual = execute(&mut ctx, "SELECT NOW(), NOW() as t2").await;
+ let res1 = actual[0][0].as_str();
+ let res2 = actual[0][1].as_str();
+ let t3 = chrono::Utc::now().timestamp();
+ let t2_naive =
+ chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d
%H:%M:%S%.6f").unwrap();
+
+ let t2 = t2_naive.timestamp();
+ assert!(t1 <= t2 && t2 <= t3);
+ assert_eq!(res2, res1);
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
+ let t1 = chrono::Utc::now().timestamp();
+ let ctx = ExecutionContext::new();
+ let sql = "SELECT NOW(), NOW() as t2";
+
+ let msg = format!("Creating logical plan for '{}'", sql);
+ let plan = ctx.create_logical_plan(sql).expect(&msg);
+
+ let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+ let plan = ctx.create_physical_plan(&plan).expect(&msg);
+
+ let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
+ let res = collect(plan).await.expect(&msg);
+ let actual = result_vec(&res);
+
+ let res1 = actual[0][0].as_str();
+ let res2 = actual[0][1].as_str();
+ let t3 = chrono::Utc::now().timestamp();
+ let t2_naive =
+ chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d
%H:%M:%S%.6f").unwrap();
+
+ let t2 = t2_naive.timestamp();
+ assert!(t1 <= t2 && t2 <= t3);
+ assert_eq!(res2, res1);
+
+ Ok(())
+}
+
+#[tokio::test]
async fn test_cast_expressions_error() -> Result<()> {
// sin(utf8) should error
let mut ctx = create_ctx()?;
diff --git a/datafusion/tests/user_defined_plan.rs
b/datafusion/tests/user_defined_plan.rs
index f9f2443..5e38c57 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -85,6 +85,7 @@ use std::task::{Context, Poll};
use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
use async_trait::async_trait;
+use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::DFSchemaRef;
/// Execute the specified sql and return the resulting record batches
@@ -211,7 +212,11 @@ impl QueryPlanner for TopKQueryPlanner {
struct TopKOptimizerRule {}
impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
- fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ execution_props: &ExecutionProps,
+ ) -> Result<LogicalPlan> {
// Note: this code simply looks for the pattern of a Limit followed by
a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
@@ -226,7 +231,7 @@ impl OptimizerRule for TopKOptimizerRule {
return Ok(LogicalPlan::Extension {
node: Arc::new(TopKPlanNode {
k: *n,
- input: self.optimize(input.as_ref())?,
+ input: self.optimize(input.as_ref(),
execution_props)?,
expr: expr[0].clone(),
}),
});
@@ -236,7 +241,7 @@ impl OptimizerRule for TopKOptimizerRule {
// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
- optimize_children(self, plan)
+ optimize_children(self, plan, execution_props)
}
fn name(&self) -> &str {