This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b096539  [Datafusion] NOW() function support (#288)
b096539 is described below

commit b096539d9670d9547bfd3ae1fb47ef95b2d06f96
Author: sathis <[email protected]>
AuthorDate: Fri May 14 23:30:09 2021 +0530

    [Datafusion] NOW() function support (#288)
    
    * Add initial implementation of NOW
    
    * Run rustfmt
    
    * Change incorrect condition
    
    * Add timestamp optimizer which optimizes the logical plan and makes sure 
all now() calls return the same value
    
    * Add unit tests & fix alias
    
    * Add unit tests & fix alias
    
    * Run cargo fmt
    
    * Comment out failing test
    
    * Optimize the match to fix clippy
    
    * Initialize datetime during optimize not creation
    
    * Add assertion to compare multiple now() values
    
    * Run cargo fmt
    
    * Move timestamp to execution props
    
    * Add missing prop
    
    * Add missing prop
    
    * Remove duplicated code
    
    * Fix tests & format
    
    * Fix clippy
    
    * Revert clippy fix
    
    * Update datafusion/src/execution/context.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Fix review comments. Move timestamp evaluation logic to 
constant_folding.rs
    
    * Pass ExecutionProps to scalar functions
    
    * Revert "Pass ExecutionProps to scalar functions"
    
    This reverts commit d9cb005df4a4c1bf05b18b5d9a1aefc4f9e706bb.
    
    * Add closure approach from @alamb
    
    * Re-enable concat test
    
    * Changing Option<DateTime<Utc>> to DateTime<Utc>
    
    Co-authored-by: Sathis Kumar <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../core/src/serde/physical_plan/from_proto.rs     |   6 +-
 datafusion/src/execution/context.rs                | 128 ++++++++++++++-------
 datafusion/src/optimizer/constant_folding.rs       | 107 ++++++++++++++++-
 datafusion/src/optimizer/eliminate_limit.rs        |  13 ++-
 datafusion/src/optimizer/filter_push_down.rs       |   7 +-
 datafusion/src/optimizer/hash_build_probe_order.rs |  17 ++-
 datafusion/src/optimizer/limit_push_down.rs        |   7 +-
 datafusion/src/optimizer/optimizer.rs              |   7 +-
 datafusion/src/optimizer/projection_push_down.rs   |  41 +++++--
 datafusion/src/optimizer/utils.rs                  |  15 ++-
 .../src/physical_plan/datetime_expressions.rs      |  19 ++-
 datafusion/src/physical_plan/functions.rs          |  55 +++++++--
 datafusion/src/physical_plan/parquet.rs            |  19 +--
 datafusion/src/physical_plan/planner.rs            |  23 ++--
 datafusion/src/physical_plan/type_coercion.rs      |   8 ++
 datafusion/tests/sql.rs                            |  49 +++++++-
 datafusion/tests/user_defined_plan.rs              |  11 +-
 17 files changed, 415 insertions(+), 117 deletions(-)

diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6a33c6a..9c35c9d 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -33,7 +33,9 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
-use datafusion::execution::context::{ExecutionConfig, ExecutionContextState};
+use datafusion::execution::context::{
+    ExecutionConfig, ExecutionContextState, ExecutionProps,
+};
 use datafusion::logical_plan::{DFSchema, Expr};
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, 
AggregateFunction};
 use datafusion::physical_plan::expressions::col;
@@ -226,6 +228,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for 
&protobuf::PhysicalPlanNode {
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
                     config: ExecutionConfig::new(),
+                    execution_props: ExecutionProps::new(),
                 };
 
                 let input_schema = hash_agg
@@ -391,6 +394,7 @@ fn compile_expr(
         var_provider: HashMap::new(),
         aggregate_functions: HashMap::new(),
         config: ExecutionConfig::new(),
+        execution_props: ExecutionProps::new(),
     };
     let expr: Expr = expr.try_into()?;
     df_planner
diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index b53f7c1..9c7a621 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -74,6 +74,7 @@ use crate::sql::{
 };
 use crate::variable::{VarProvider, VarType};
 use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
+use chrono::{DateTime, Utc};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
 
@@ -159,6 +160,7 @@ impl ExecutionContext {
                 var_provider: HashMap::new(),
                 aggregate_functions: HashMap::new(),
                 config,
+                execution_props: ExecutionProps::new(),
             })),
         }
     }
@@ -454,12 +456,16 @@ impl ExecutionContext {
 
     /// Optimizes the logical plan by applying optimizer rules.
     pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
-        let optimizers = &self.state.lock().unwrap().config.optimizers;
+        let state = &mut self.state.lock().unwrap();
+        let execution_props = &mut state.execution_props.clone();
+        let optimizers = &state.config.optimizers;
+
+        let execution_props = execution_props.start_execution();
 
         let mut new_plan = plan.clone();
         debug!("Logical plan:\n {:?}", plan);
         for optimizer in optimizers {
-            new_plan = optimizer.optimize(&new_plan)?;
+            new_plan = optimizer.optimize(&new_plan, execution_props)?;
         }
         debug!("Optimized logical plan:\n {:?}", new_plan);
         Ok(new_plan)
@@ -470,7 +476,9 @@ impl ExecutionContext {
         &self,
         logical_plan: &LogicalPlan,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let state = self.state.lock().unwrap();
+        let mut state = self.state.lock().unwrap();
+        state.execution_props.start_execution();
+
         state
             .config
             .query_planner
@@ -740,6 +748,15 @@ impl ExecutionConfig {
     }
 }
 
+/// Holds per-execution properties and data (such as starting timestamps, etc).
+/// An instance of this struct is created each time a [`LogicalPlan`] is 
prepared for
+/// execution (optimized). If the same plan is optimized multiple times, a new
+/// `ExecutionProps` is created each time.
+#[derive(Clone)]
+pub struct ExecutionProps {
+    pub(crate) query_execution_start_time: DateTime<Utc>,
+}
+
 /// Execution context for registering data sources and executing queries
 #[derive(Clone)]
 pub struct ExecutionContextState {
@@ -753,9 +770,38 @@ pub struct ExecutionContextState {
     pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
     /// Context configuration
     pub config: ExecutionConfig,
+    /// Execution properties
+    pub execution_props: ExecutionProps,
+}
+
+impl ExecutionProps {
+    /// Creates a new execution props
+    pub fn new() -> Self {
+        ExecutionProps {
+            query_execution_start_time: chrono::Utc::now(),
+        }
+    }
+
+    /// Marks the execution of query started timestamp
+    pub fn start_execution(&mut self) -> &Self {
+        self.query_execution_start_time = chrono::Utc::now();
+        &*self
+    }
 }
 
 impl ExecutionContextState {
+    /// Returns new ExecutionContextState
+    pub fn new() -> Self {
+        ExecutionContextState {
+            catalog_list: Arc::new(MemoryCatalogList::new()),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+            execution_props: ExecutionProps::new(),
+        }
+    }
+
     fn resolve_table_ref<'a>(
         &'a self,
         table_ref: impl Into<TableReference<'a>>,
@@ -1507,7 +1553,7 @@ mod tests {
             
"+-------------------------+-------------------------+-------------------------+---------------------+",
             "| 2021-01-01 05:11:10.432 | 2021-01-01 05:11:10.432 | 2021-01-01 
05:11:10.432 | 2021-01-01 05:11:10 |",
             
"+-------------------------+-------------------------+-------------------------+---------------------+",
-];
+        ];
         assert_batches_sorted_eq!(expected, &results);
 
         Ok(())
@@ -1633,7 +1679,7 @@ mod tests {
 
         let results = plan_and_collect(
             &mut ctx,
-            "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY 
date_trunc('week', t1)"
+            "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY 
date_trunc('week', t1)",
         ).await?;
         assert_eq!(results.len(), 1);
 
@@ -1881,16 +1927,15 @@ mod tests {
         let results = 
run_count_distinct_integers_aggregated_scenario(partitions).await?;
         assert_eq!(results.len(), 1);
 
-        let expected = vec!
-[
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-    "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | COUNT(DISTINCT 
c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | COUNT(DISTINCT 
c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | COUNT(DISTINCT 
c_uint64) |",
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-    "| a       | 3               | 2                      | 2                  
     | 2                       | 2                       | 2                    
   | 2                        | 2                        | 2                    
    |",
-    "| b       | 1               | 1                      | 1                  
     | 1                       | 1                       | 1                    
   | 1                        | 1                        | 1                    
    |",
-    "| c       | 3               | 2                      | 2                  
     | 2                       | 2                       | 2                    
   | 2                        | 2                        | 2                    
    |",
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-];
+        let expected = vec![
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+            "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | 
COUNT(DISTINCT c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | 
COUNT(DISTINCT c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | 
COUNT(DISTINCT c_uint64) |",
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+            "| a       | 3               | 2                      | 2          
             | 2                       | 2                       | 2            
           | 2                        | 2                        | 2            
            |",
+            "| b       | 1               | 1                      | 1          
             | 1                       | 1                       | 1            
           | 1                        | 1                        | 1            
            |",
+            "| c       | 3               | 2                      | 2          
             | 2                       | 2                       | 2            
           | 2                        | 2                        | 2            
            |",
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+        ];
         assert_batches_sorted_eq!(expected, &results);
 
         Ok(())
@@ -1910,14 +1955,14 @@ mod tests {
         assert_eq!(results.len(), 1);
 
         let expected = vec![
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-    "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | COUNT(DISTINCT 
c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | COUNT(DISTINCT 
c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | COUNT(DISTINCT 
c_uint64) |",
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-    "| a       | 5               | 3                      | 3                  
     | 3                       | 3                       | 3                    
   | 3                        | 3                        | 3                    
    |",
-    "| b       | 5               | 4                      | 4                  
     | 4                       | 4                       | 4                    
   | 4                        | 4                        | 4                    
    |",
-    "| c       | 1               | 1                      | 1                  
     | 1                       | 1                       | 1                    
   | 1                        | 1                        | 1                    
    |",
-    
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
-];
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+            "| c_group | COUNT(c_uint64) | COUNT(DISTINCT c_int8) | 
COUNT(DISTINCT c_int16) | COUNT(DISTINCT c_int32) | COUNT(DISTINCT c_int64) | 
COUNT(DISTINCT c_uint8) | COUNT(DISTINCT c_uint16) | COUNT(DISTINCT c_uint32) | 
COUNT(DISTINCT c_uint64) |",
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+            "| a       | 5               | 3                      | 3          
             | 3                       | 3                       | 3            
           | 3                        | 3                        | 3            
            |",
+            "| b       | 5               | 4                      | 4          
             | 4                       | 4                       | 4            
           | 4                        | 4                        | 4            
            |",
+            "| c       | 1               | 1                      | 1          
             | 1                       | 1                       | 1            
           | 1                        | 1                        | 1            
            |",
+            
"+---------+-----------------+------------------------+-------------------------+-------------------------+-------------------------+-------------------------+--------------------------+--------------------------+--------------------------+",
+        ];
         assert_batches_sorted_eq!(expected, &results);
 
         Ok(())
@@ -2311,6 +2356,7 @@ mod tests {
         }
         Ok(())
     }
+
     #[test]
     fn ctx_sql_should_optimize_plan() -> Result<()> {
         let mut ctx = ExecutionContext::new();
@@ -2844,13 +2890,11 @@ mod tests {
             .await
             .unwrap();
         let expected = vec![
-
-    
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-    "| table_catalog | table_schema | table_name | column_name | 
ordinal_position | column_default | is_nullable | data_type | 
character_maximum_length | character_octet_length | numeric_precision | 
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
-    
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-    "| datafusion    | public       | t          | i           | 0             
   |                | YES         | Int32     |                          |      
                  | 32                | 2                       |               
|                    |               |",
-    
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-
+            
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+            "| table_catalog | table_schema | table_name | column_name | 
ordinal_position | column_default | is_nullable | data_type | 
character_maximum_length | character_octet_length | numeric_precision | 
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
+            
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+            "| datafusion    | public       | t          | i           | 0     
           |                | YES         | Int32     |                         
 |                        | 32                | 2                       |       
        |                    |               |",
+            
"+---------------+--------------+------------+-------------+------------------+----------------+-------------+-----------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
         ];
         assert_batches_sorted_eq!(expected, &result);
 
@@ -2984,18 +3028,18 @@ mod tests {
                 .unwrap();
 
         let expected = vec![
-    
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-    "| table_catalog | table_schema | table_name | column_name      | 
ordinal_position | column_default | is_nullable | data_type                   | 
character_maximum_length | character_octet_length | numeric_precision | 
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
-    
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
-    "| my_catalog    | my_schema    | t1         | i                | 0        
        |                | YES         | Int32                       |          
                |                        | 32                | 2                
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | binary_col       | 4        
        |                | NO          | Binary                      |          
                | 2147483647             |                   |                  
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | float64_col      | 1        
        |                | YES         | Float64                     |          
                |                        | 24                | 2                
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | int32_col        | 0        
        |                | NO          | Int32                       |          
                |                        | 32                | 2                
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | large_binary_col | 5        
        |                | NO          | LargeBinary                 |          
                | 9223372036854775807    |                   |                  
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | large_utf8_col   | 3        
        |                | NO          | LargeUtf8                   |          
                | 9223372036854775807    |                   |                  
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | timestamp_nanos  | 6        
        |                | NO          | Timestamp(Nanosecond, None) |          
                |                        |                   |                  
       |               |                    |               |",
-    "| my_catalog    | my_schema    | t2         | utf8_col         | 2        
        |                | YES         | Utf8                        |          
                | 2147483647             |                   |                  
       |               |                    |               |",
-    
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+            
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+            "| table_catalog | table_schema | table_name | column_name      | 
ordinal_position | column_default | is_nullable | data_type                   | 
character_maximum_length | character_octet_length | numeric_precision | 
numeric_precision_radix | numeric_scale | datetime_precision | interval_type |",
+            
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
+            "| my_catalog    | my_schema    | t1         | i                | 
0                |                | YES         | Int32                       | 
                         |                        | 32                | 2       
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | binary_col       | 
4                |                | NO          | Binary                      | 
                         | 2147483647             |                   |         
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | float64_col      | 
1                |                | YES         | Float64                     | 
                         |                        | 24                | 2       
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | int32_col        | 
0                |                | NO          | Int32                       | 
                         |                        | 32                | 2       
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | large_binary_col | 
5                |                | NO          | LargeBinary                 | 
                         | 9223372036854775807    |                   |         
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | large_utf8_col   | 
3                |                | NO          | LargeUtf8                   | 
                         | 9223372036854775807    |                   |         
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | timestamp_nanos  | 
6                |                | NO          | Timestamp(Nanosecond, None) | 
                         |                        |                   |         
                |               |                    |               |",
+            "| my_catalog    | my_schema    | t2         | utf8_col         | 
2                |                | YES         | Utf8                        | 
                         | 2147483647             |                   |         
                |               |                    |               |",
+            
"+---------------+--------------+------------+------------------+------------------+----------------+-------------+-----------------------------+--------------------------+------------------------+-------------------+-------------------------+---------------+--------------------+---------------+",
         ];
         assert_batches_sorted_eq!(expected, &result);
     }
diff --git a/datafusion/src/optimizer/constant_folding.rs 
b/datafusion/src/optimizer/constant_folding.rs
index 71c84f6..51bf0ce 100644
--- a/datafusion/src/optimizer/constant_folding.rs
+++ b/datafusion/src/optimizer/constant_folding.rs
@@ -23,9 +23,11 @@ use std::sync::Arc;
 use arrow::datatypes::DataType;
 
 use crate::error::Result;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, 
Operator};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
+use crate::physical_plan::functions::BuiltinScalarFunction;
 use crate::scalar::ScalarValue;
 
 /// Optimizer that simplifies comparison expressions involving boolean 
literals.
@@ -47,7 +49,11 @@ impl ConstantFolding {
 }
 
 impl OptimizerRule for ConstantFolding {
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
         // We need to pass down the all schemas within the plan tree to 
`optimize_expr` in order to
         // to evaluate expression types. For example, a projection plan's 
schema will only include
         // projected columns. With just the projected schema, it's not 
possible to infer types for
@@ -55,12 +61,13 @@ impl OptimizerRule for ConstantFolding {
         // children plans.
         let mut rewriter = ConstantRewriter {
             schemas: plan.all_schemas(),
+            execution_props,
         };
 
         match plan {
             LogicalPlan::Filter { predicate, input } => Ok(LogicalPlan::Filter 
{
                 predicate: predicate.clone().rewrite(&mut rewriter)?,
-                input: Arc::new(self.optimize(input)?),
+                input: Arc::new(self.optimize(input, execution_props)?),
             }),
             // Rest: recurse into plan, apply optimization where possible
             LogicalPlan::Projection { .. }
@@ -78,7 +85,7 @@ impl OptimizerRule for ConstantFolding {
                 let inputs = plan.inputs();
                 let new_inputs = inputs
                     .iter()
-                    .map(|plan| self.optimize(plan))
+                    .map(|plan| self.optimize(plan, execution_props))
                     .collect::<Result<Vec<_>>>()?;
 
                 let expr = plan
@@ -103,6 +110,7 @@ impl OptimizerRule for ConstantFolding {
 struct ConstantRewriter<'a> {
     /// input schemas
     schemas: Vec<&'a DFSchemaRef>,
+    execution_props: &'a ExecutionProps,
 }
 
 impl<'a> ConstantRewriter<'a> {
@@ -200,6 +208,14 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> {
                     Expr::Not(inner)
                 }
             }
+            Expr::ScalarFunction {
+                fun: BuiltinScalarFunction::Now,
+                ..
+            } => Expr::Literal(ScalarValue::TimestampNanosecond(Some(
+                self.execution_props
+                    .query_execution_start_time
+                    .timestamp_nanos(),
+            ))),
             expr => {
                 // no rewrite possible
                 expr
@@ -217,6 +233,7 @@ mod tests {
     };
 
     use arrow::datatypes::*;
+    use chrono::{DateTime, Utc};
 
     fn test_table_scan() -> Result<LogicalPlan> {
         let schema = Schema::new(vec![
@@ -243,6 +260,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         assert_eq!(
@@ -258,6 +276,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         // x = null is always null
@@ -293,6 +312,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -323,6 +343,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         // When one of the operand is not of boolean type, folding the other 
boolean constant will
@@ -362,6 +383,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         assert_eq!(col("c2").get_type(&schema)?, DataType::Boolean);
@@ -397,6 +419,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         // when one of the operand is not of boolean type, folding the other 
boolean constant will
@@ -432,6 +455,7 @@ mod tests {
         let schema = expr_test_schema();
         let mut rewriter = ConstantRewriter {
             schemas: vec![&schema],
+            execution_props: &ExecutionProps::new(),
         };
 
         assert_eq!(
@@ -459,7 +483,9 @@ mod tests {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = ConstantFolding::new();
-        let optimized_plan = rule.optimize(plan).expect("failed to optimize 
plan");
+        let optimized_plan = rule
+            .optimize(plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
     }
@@ -589,4 +615,77 @@ mod tests {
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
+
+    fn get_optimized_plan_formatted(
+        plan: &LogicalPlan,
+        date_time: &DateTime<Utc>,
+    ) -> String {
+        let rule = ConstantFolding::new();
+        let execution_props = ExecutionProps {
+            query_execution_start_time: *date_time,
+        };
+
+        let optimized_plan = rule
+            .optimize(plan, &execution_props)
+            .expect("failed to optimize plan");
+        return format!("{:?}", optimized_plan);
+    }
+
+    #[test]
+    fn single_now_expr() {
+        let table_scan = test_table_scan().unwrap();
+        let proj = vec![Expr::ScalarFunction {
+            args: vec![],
+            fun: BuiltinScalarFunction::Now,
+        }];
+        let time = chrono::Utc::now();
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(proj)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let expected = format!(
+            "Projection: TimestampNanosecond({})\
+            \n  TableScan: test projection=None",
+            time.timestamp_nanos()
+        );
+        let actual = get_optimized_plan_formatted(&plan, &time);
+
+        assert_eq!(expected, actual);
+    }
+
+    #[test]
+    fn multiple_now_expr() {
+        let table_scan = test_table_scan().unwrap();
+        let time = chrono::Utc::now();
+        let proj = vec![
+            Expr::ScalarFunction {
+                args: vec![],
+                fun: BuiltinScalarFunction::Now,
+            },
+            Expr::Alias(
+                Box::new(Expr::ScalarFunction {
+                    args: vec![],
+                    fun: BuiltinScalarFunction::Now,
+                }),
+                "t2".to_string(),
+            ),
+        ];
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(proj)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let actual = get_optimized_plan_formatted(&plan, &time);
+        let expected = format!(
+            "Projection: TimestampNanosecond({}), TimestampNanosecond({}) AS 
t2\
+            \n  TableScan: test projection=None",
+            time.timestamp_nanos(),
+            time.timestamp_nanos()
+        );
+
+        assert_eq!(actual, expected);
+    }
 }
diff --git a/datafusion/src/optimizer/eliminate_limit.rs 
b/datafusion/src/optimizer/eliminate_limit.rs
index 87b33d6..1b965f1 100644
--- a/datafusion/src/optimizer/eliminate_limit.rs
+++ b/datafusion/src/optimizer/eliminate_limit.rs
@@ -22,6 +22,7 @@ use crate::logical_plan::LogicalPlan;
 use crate::optimizer::optimizer::OptimizerRule;
 
 use super::utils;
+use crate::execution::context::ExecutionProps;
 
 /// Optimization rule that replaces LIMIT 0 with an 
[LogicalPlan::EmptyRelation]
 pub struct EliminateLimit;
@@ -34,7 +35,11 @@ impl EliminateLimit {
 }
 
 impl OptimizerRule for EliminateLimit {
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::Limit { n, input } if *n == 0 => {
                 Ok(LogicalPlan::EmptyRelation {
@@ -50,7 +55,7 @@ impl OptimizerRule for EliminateLimit {
                 let inputs = plan.inputs();
                 let new_inputs = inputs
                     .iter()
-                    .map(|plan| self.optimize(plan))
+                    .map(|plan| self.optimize(plan, execution_props))
                     .collect::<Result<Vec<_>>>()?;
 
                 utils::from_plan(plan, &expr, &new_inputs)
@@ -72,7 +77,9 @@ mod tests {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = EliminateLimit::new();
-        let optimized_plan = rule.optimize(plan).expect("failed to optimize 
plan");
+        let optimized_plan = rule
+            .optimize(plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
         assert_eq!(plan.schema(), optimized_plan.schema());
diff --git a/datafusion/src/optimizer/filter_push_down.rs 
b/datafusion/src/optimizer/filter_push_down.rs
index 356d497..4c248e2 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -15,6 +15,7 @@
 //! Filter Push Down optimizer rule ensures that filters are applied as early 
as possible in the plan
 
 use crate::datasource::datasource::TableProviderFilterPushDown;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{and, LogicalPlan};
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
@@ -413,7 +414,7 @@ impl OptimizerRule for FilterPushDown {
         "filter_push_down"
     }
 
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> 
Result<LogicalPlan> {
         optimize(plan, State::default())
     }
 }
@@ -456,7 +457,9 @@ mod tests {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = FilterPushDown::new();
-        let optimized_plan = rule.optimize(plan).expect("failed to optimize 
plan");
+        let optimized_plan = rule
+            .optimize(plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
     }
diff --git a/datafusion/src/optimizer/hash_build_probe_order.rs 
b/datafusion/src/optimizer/hash_build_probe_order.rs
index b27171f..168c4a1 100644
--- a/datafusion/src/optimizer/hash_build_probe_order.rs
+++ b/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -27,6 +27,7 @@ use crate::optimizer::optimizer::OptimizerRule;
 use crate::{error::Result, prelude::JoinType};
 
 use super::utils;
+use crate::execution::context::ExecutionProps;
 
 /// BuildProbeOrder reorders the build and probe phase of
 /// hash joins. This uses the amount of rows that a datasource has.
@@ -106,7 +107,11 @@ impl OptimizerRule for HashBuildProbeOrder {
         "hash_build_probe_order"
     }
 
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
         match plan {
             // Main optimization rule, swaps order of left and right
             // based on number of rows in each table
@@ -117,8 +122,8 @@ impl OptimizerRule for HashBuildProbeOrder {
                 join_type,
                 schema,
             } => {
-                let left = self.optimize(left)?;
-                let right = self.optimize(right)?;
+                let left = self.optimize(left, execution_props)?;
+                let right = self.optimize(right, execution_props)?;
                 if should_swap_join_order(&left, &right) {
                     // Swap left and right, change join type and (equi-)join 
key order
                     Ok(LogicalPlan::Join {
@@ -147,8 +152,8 @@ impl OptimizerRule for HashBuildProbeOrder {
                 right,
                 schema,
             } => {
-                let left = self.optimize(left)?;
-                let right = self.optimize(right)?;
+                let left = self.optimize(left, execution_props)?;
+                let right = self.optimize(right, execution_props)?;
                 if should_swap_join_order(&left, &right) {
                     // Swap left and right
                     Ok(LogicalPlan::CrossJoin {
@@ -184,7 +189,7 @@ impl OptimizerRule for HashBuildProbeOrder {
                 let inputs = plan.inputs();
                 let new_inputs = inputs
                     .iter()
-                    .map(|plan| self.optimize(plan))
+                    .map(|plan| self.optimize(plan, execution_props))
                     .collect::<Result<Vec<_>>>()?;
 
                 utils::from_plan(plan, &expr, &new_inputs)
diff --git a/datafusion/src/optimizer/limit_push_down.rs 
b/datafusion/src/optimizer/limit_push_down.rs
index 73a231f..e616869 100644
--- a/datafusion/src/optimizer/limit_push_down.rs
+++ b/datafusion/src/optimizer/limit_push_down.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use super::utils;
 use crate::error::Result;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::LogicalPlan;
 use crate::optimizer::optimizer::OptimizerRule;
 
@@ -125,7 +126,7 @@ fn limit_push_down(
 }
 
 impl OptimizerRule for LimitPushDown {
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(&self, plan: &LogicalPlan, _: &ExecutionProps) -> 
Result<LogicalPlan> {
         limit_push_down(None, plan)
     }
 
@@ -143,7 +144,9 @@ mod test {
 
     fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
         let rule = LimitPushDown::new();
-        let optimized_plan = rule.optimize(plan).expect("failed to optimize 
plan");
+        let optimized_plan = rule
+            .optimize(plan, &ExecutionProps::new())
+            .expect("failed to optimize plan");
         let formatted_plan = format!("{:?}", optimized_plan);
         assert_eq!(formatted_plan, expected);
     }
diff --git a/datafusion/src/optimizer/optimizer.rs 
b/datafusion/src/optimizer/optimizer.rs
index dee8e06..5cf4047 100644
--- a/datafusion/src/optimizer/optimizer.rs
+++ b/datafusion/src/optimizer/optimizer.rs
@@ -18,6 +18,7 @@
 //! Query optimizer traits
 
 use crate::error::Result;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::LogicalPlan;
 
 /// `OptimizerRule` transforms one ['LogicalPlan'] into another which
@@ -25,7 +26,11 @@ use crate::logical_plan::LogicalPlan;
 /// way.
 pub trait OptimizerRule {
     /// Rewrite `plan` to an optimized form
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>;
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan>;
 
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
diff --git a/datafusion/src/optimizer/projection_push_down.rs 
b/datafusion/src/optimizer/projection_push_down.rs
index 7243fa5..21c9cab 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -19,6 +19,7 @@
 //! loaded into memory
 
 use crate::error::Result;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, 
ToDFSchema};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -32,7 +33,11 @@ use utils::optimize_explain;
 pub struct ProjectionPushDown {}
 
 impl OptimizerRule for ProjectionPushDown {
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
         // set of all columns refered by the plan (and thus considered 
required by the root)
         let required_columns = plan
             .schema()
@@ -40,7 +45,7 @@ impl OptimizerRule for ProjectionPushDown {
             .iter()
             .map(|f| f.name().clone())
             .collect::<HashSet<String>>();
-        optimize_plan(self, plan, &required_columns, false)
+        optimize_plan(self, plan, &required_columns, false, execution_props)
     }
 
     fn name(&self) -> &str {
@@ -105,6 +110,7 @@ fn optimize_plan(
     plan: &LogicalPlan,
     required_columns: &HashSet<String>, // set of columns required up to this 
step
     has_projection: bool,
+    execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     let mut new_required_columns = required_columns.clone();
     match plan {
@@ -137,8 +143,13 @@ fn optimize_plan(
                     }
                 })?;
 
-            let new_input =
-                optimize_plan(optimizer, &input, &new_required_columns, true)?;
+            let new_input = optimize_plan(
+                optimizer,
+                &input,
+                &new_required_columns,
+                true,
+                execution_props,
+            )?;
             if new_fields.is_empty() {
                 // no need for an expression at all
                 Ok(new_input)
@@ -167,12 +178,14 @@ fn optimize_plan(
                     &left,
                     &new_required_columns,
                     true,
+                    execution_props,
                 )?),
                 right: Arc::new(optimize_plan(
                     optimizer,
                     &right,
                     &new_required_columns,
                     true,
+                    execution_props,
                 )?),
 
                 join_type: *join_type,
@@ -226,6 +239,7 @@ fn optimize_plan(
                     &input,
                     &new_required_columns,
                     true,
+                    execution_props,
                 )?),
                 schema: DFSchemaRef::new(new_schema),
             })
@@ -259,7 +273,14 @@ fn optimize_plan(
             schema,
         } => {
             let schema = schema.as_ref().to_owned().into();
-            optimize_explain(optimizer, *verbose, &*plan, stringified_plans, 
&schema)
+            optimize_explain(
+                optimizer,
+                *verbose,
+                &*plan,
+                stringified_plans,
+                &schema,
+                execution_props,
+            )
         }
         // all other nodes: Add any additional columns used by
         // expressions in this node to the list of required columns
@@ -281,7 +302,13 @@ fn optimize_plan(
             let new_inputs = inputs
                 .iter()
                 .map(|plan| {
-                    optimize_plan(optimizer, plan, &new_required_columns, 
has_projection)
+                    optimize_plan(
+                        optimizer,
+                        plan,
+                        &new_required_columns,
+                        has_projection,
+                        execution_props,
+                    )
                 })
                 .collect::<Result<Vec<_>>>()?;
 
@@ -538,6 +565,6 @@ mod tests {
 
     fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
         let rule = ProjectionPushDown::new();
-        rule.optimize(plan)
+        rule.optimize(plan, &ExecutionProps::new())
     }
 }
diff --git a/datafusion/src/optimizer/utils.rs 
b/datafusion/src/optimizer/utils.rs
index 0ec3fa7..9288c65 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -22,6 +22,7 @@ use std::{collections::HashSet, sync::Arc};
 use arrow::datatypes::Schema;
 
 use super::optimizer::OptimizerRule;
+use crate::execution::context::ExecutionProps;
 use crate::logical_plan::{
     Expr, LogicalPlan, Operator, Partitioning, PlanType, Recursion, 
StringifiedPlan,
     ToDFSchema,
@@ -101,11 +102,12 @@ pub fn optimize_explain(
     plan: &LogicalPlan,
     stringified_plans: &[StringifiedPlan],
     schema: &Schema,
+    execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     // These are the fields of LogicalPlan::Explain It might be nice
     // to transform that enum Variant into its own struct and avoid
     // passing the fields individually
-    let plan = Arc::new(optimizer.optimize(plan)?);
+    let plan = Arc::new(optimizer.optimize(plan, execution_props)?);
     let mut stringified_plans = stringified_plans.to_vec();
     let optimizer_name = optimizer.name().into();
     stringified_plans.push(StringifiedPlan::new(
@@ -128,6 +130,7 @@ pub fn optimize_explain(
 pub fn optimize_children(
     optimizer: &impl OptimizerRule,
     plan: &LogicalPlan,
+    execution_props: &ExecutionProps,
 ) -> Result<LogicalPlan> {
     if let LogicalPlan::Explain {
         verbose,
@@ -142,6 +145,7 @@ pub fn optimize_children(
             &*plan,
             stringified_plans,
             &schema.as_ref().to_owned().into(),
+            execution_props,
         );
     }
 
@@ -149,7 +153,7 @@ pub fn optimize_children(
     let new_inputs = plan
         .inputs()
         .into_iter()
-        .map(|plan| optimizer.optimize(plan))
+        .map(|plan| optimizer.optimize(plan, execution_props))
         .collect::<Result<Vec<_>>>()?;
 
     from_plan(plan, &new_exprs, &new_inputs)
@@ -443,7 +447,11 @@ mod tests {
     struct TestOptimizer {}
 
     impl OptimizerRule for TestOptimizer {
-        fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        fn optimize(
+            &self,
+            plan: &LogicalPlan,
+            _: &ExecutionProps,
+        ) -> Result<LogicalPlan> {
             Ok(plan.clone())
         }
 
@@ -465,6 +473,7 @@ mod tests {
             &empty_plan,
             &[StringifiedPlan::new(PlanType::LogicalPlan, "...")],
             schema.as_ref(),
+            &ExecutionProps::new(),
         )?;
 
         match &optimized_explain {
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs 
b/datafusion/src/physical_plan/datetime_expressions.rs
index 7b58161..ec52e6b 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -268,6 +268,23 @@ pub fn to_timestamp(args: &[ColumnarValue]) -> 
Result<ColumnarValue> {
     )
 }
 
+/// Create an implementation of `now()` that always returns the
+/// specified timestamp.
+///
+/// The semantics of `now()` require it to return the same value
+/// whenever it is called in a query. This value is chosen during
+/// planning time and bound into a closure that returns it.
+pub fn make_now(
+    now_ts: DateTime<Utc>,
+) -> impl Fn(&[ColumnarValue]) -> Result<ColumnarValue> {
+    let now_ts = Some(now_ts.timestamp_nanos());
+    move |_arg| {
+        Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+            now_ts,
+        )))
+    }
+}
+
 fn date_trunc_single(granularity: &str, value: i64) -> Result<i64> {
     let value = timestamp_ns_to_datetime(value).with_nanosecond(0);
     let value = match granularity {
@@ -300,7 +317,7 @@ fn date_trunc_single(granularity: &str, value: i64) -> 
Result<i64> {
             return Err(DataFusionError::Execution(format!(
                 "Unsupported date_trunc granularity: {}",
                 unsupported
-            )))
+            )));
         }
     };
     // `with_x(0)` are infalible because `0` are always a valid
diff --git a/datafusion/src/physical_plan/functions.rs 
b/datafusion/src/physical_plan/functions.rs
index 960d7c5..2e053a8 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -33,6 +33,7 @@ use super::{
     type_coercion::{coerce, data_types},
     ColumnarValue, PhysicalExpr,
 };
+use crate::execution::context::ExecutionContextState;
 use crate::physical_plan::array_expressions;
 use crate::physical_plan::datetime_expressions;
 use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
@@ -194,6 +195,8 @@ pub enum BuiltinScalarFunction {
     ToHex,
     /// to_timestamp
     ToTimestamp,
+    /// now
+    Now,
     /// translate
     Translate,
     /// trim
@@ -273,6 +276,7 @@ impl FromStr for BuiltinScalarFunction {
             "substr" => BuiltinScalarFunction::Substr,
             "to_hex" => BuiltinScalarFunction::ToHex,
             "to_timestamp" => BuiltinScalarFunction::ToTimestamp,
+            "now" => BuiltinScalarFunction::Now,
             "translate" => BuiltinScalarFunction::Translate,
             "trim" => BuiltinScalarFunction::Trim,
             "upper" => BuiltinScalarFunction::Upper,
@@ -298,15 +302,6 @@ pub fn return_type(
     // verify that this is a valid set of data types for this function
     data_types(&arg_types, &signature(fun))?;
 
-    if arg_types.is_empty() {
-        // functions currently cannot be evaluated without arguments, as they 
can't
-        // know the number of rows to return.
-        return Err(DataFusionError::Plan(format!(
-            "Function '{}' requires at least one argument",
-            fun
-        )));
-    }
-
     // the return type of the built in function.
     // Some built-in functions' return type depends on the incoming type.
     match fun {
@@ -582,6 +577,7 @@ pub fn return_type(
         BuiltinScalarFunction::ToTimestamp => {
             Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
         }
+        BuiltinScalarFunction::Now => 
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)),
         BuiltinScalarFunction::Translate => Ok(match arg_types[0] {
             DataType::LargeUtf8 => DataType::LargeUtf8,
             DataType::Utf8 => DataType::Utf8,
@@ -714,6 +710,7 @@ pub fn create_physical_expr(
     fun: &BuiltinScalarFunction,
     args: &[Arc<dyn PhysicalExpr>],
     input_schema: &Schema,
+    ctx_state: &ExecutionContextState,
 ) -> Result<Arc<dyn PhysicalExpr>> {
     let fun_expr: ScalarFunctionImplementation = Arc::new(match fun {
         // math functions
@@ -805,6 +802,22 @@ pub fn create_physical_expr(
         }
         BuiltinScalarFunction::DatePart => datetime_expressions::date_part,
         BuiltinScalarFunction::DateTrunc => datetime_expressions::date_trunc,
+        BuiltinScalarFunction::Now => {
+            // bind value for now at plan time
+            let fun_expr = Arc::new(datetime_expressions::make_now(
+                ctx_state.execution_props.query_execution_start_time,
+            ));
+
+            // TODO refactor code to not return here, but instead fall through 
below
+            let args = vec![];
+            let arg_types = vec![]; // has no args
+            return Ok(Arc::new(ScalarFunctionExpr::new(
+                &format!("{}", fun),
+                fun_expr,
+                args,
+                &return_type(&fun, &arg_types)?,
+            )));
+        }
         BuiltinScalarFunction::InitCap => |args| match args[0].data_type() {
             DataType::Utf8 => {
                 make_scalar_function(string_expressions::initcap::<i32>)(args)
@@ -1451,13 +1464,14 @@ mod tests {
         ($FUNC:ident, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, 
$DATA_TYPE: ident, $ARRAY_TYPE:ident) => {
             // used to provide type annotation
             let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
+            let ctx_state = ExecutionContextState::new();
 
             // any type works here: we evaluate against a literal of `value`
             let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
             let columns: Vec<ArrayRef> = 
vec![Arc::new(Int32Array::from(vec![1]))];
 
             let expr =
-                create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, 
&schema)?;
+                create_physical_expr(&BuiltinScalarFunction::$FUNC, $ARGS, 
&schema, &ctx_state)?;
 
             // type is correct
             assert_eq!(expr.data_type(&schema)?, DataType::$DATA_TYPE);
@@ -3618,7 +3632,20 @@ mod tests {
 
     #[test]
     fn test_concat_error() -> Result<()> {
-        let result = return_type(&BuiltinScalarFunction::Concat, &[]);
+        let ctx_state = ExecutionContextState::new();
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+
+        let expr = create_physical_expr(
+            &BuiltinScalarFunction::Concat,
+            &[],
+            &schema,
+            &ctx_state,
+        )?;
+
+        let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1]))];
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns)?;
+        let result = expr.evaluate(&batch);
+
         if result.is_ok() {
             Err(DataFusionError::Plan(
                 "Function 'concat' cannot accept zero arguments".to_string(),
@@ -3640,11 +3667,13 @@ mod tests {
             Field::new("b", value2.data_type().clone(), false),
         ]);
         let columns: Vec<ArrayRef> = vec![value1, value2];
+        let ctx_state = ExecutionContextState::new();
 
         let expr = create_physical_expr(
             &BuiltinScalarFunction::Array,
             &[col("a"), col("b")],
             &schema,
+            &ctx_state,
         )?;
 
         // type is correct
@@ -3700,6 +3729,7 @@ mod tests {
     #[cfg(feature = "regex_expressions")]
     fn test_regexp_match() -> Result<()> {
         let schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
+        let ctx_state = ExecutionContextState::new();
 
         // concat(value, value)
         let col_value: ArrayRef = Arc::new(StringArray::from(vec!["aaa-555"]));
@@ -3709,6 +3739,7 @@ mod tests {
             &BuiltinScalarFunction::RegexpMatch,
             &[col("a"), pattern],
             &schema,
+            &ctx_state,
         )?;
 
         // type is correct
@@ -3737,6 +3768,7 @@ mod tests {
     #[cfg(feature = "regex_expressions")]
     fn test_regexp_match_all_literals() -> Result<()> {
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let ctx_state = ExecutionContextState::new();
 
         // concat(value, value)
         let col_value = lit(ScalarValue::Utf8(Some("aaa-555".to_string())));
@@ -3746,6 +3778,7 @@ mod tests {
             &BuiltinScalarFunction::RegexpMatch,
             &[col_value, pattern],
             &schema,
+            &ctx_state,
         )?;
 
         // type is correct
diff --git a/datafusion/src/physical_plan/parquet.rs 
b/datafusion/src/physical_plan/parquet.rs
index 09dd48d..dee0fc8 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -21,25 +21,18 @@ use std::fmt;
 use std::fs::File;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::{
-    any::Any,
-    collections::{HashMap, HashSet},
-};
+use std::{any::Any, collections::HashSet};
 
 use super::{
     planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, 
RecordBatchStream,
     SendableRecordBatchStream,
 };
-use crate::{
-    catalog::catalog::MemoryCatalogList,
-    physical_plan::{common, ExecutionPlan, Partitioning},
-};
+use crate::physical_plan::{common, ExecutionPlan, Partitioning};
 use crate::{
     error::{DataFusionError, Result},
     execution::context::ExecutionContextState,
     logical_plan::{Expr, Operator},
     optimizer::utils,
-    prelude::ExecutionConfig,
 };
 use arrow::record_batch::RecordBatch;
 use arrow::{
@@ -393,13 +386,7 @@ impl RowGroupPredicateBuilder {
             .map(|(_, _, f)| f.clone())
             .collect::<Vec<_>>();
         let stat_schema = Schema::new(stat_fields);
-        let execution_context_state = ExecutionContextState {
-            catalog_list: Arc::new(MemoryCatalogList::new()),
-            scalar_functions: HashMap::new(),
-            var_provider: HashMap::new(),
-            aggregate_functions: HashMap::new(),
-            config: ExecutionConfig::new(),
-        };
+        let execution_context_state = ExecutionContextState::new();
         let predicate_expr = 
DefaultPhysicalPlanner::default().create_physical_expr(
             &logical_predicate_expr,
             &stat_schema,
diff --git a/datafusion/src/physical_plan/planner.rs 
b/datafusion/src/physical_plan/planner.rs
index acbb863..664e4dc 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -555,7 +555,12 @@ impl DefaultPhysicalPlanner {
                     .iter()
                     .map(|e| self.create_physical_expr(e, input_schema, 
ctx_state))
                     .collect::<Result<Vec<_>>>()?;
-                functions::create_physical_expr(fun, &physical_args, 
input_schema)
+                functions::create_physical_expr(
+                    fun,
+                    &physical_args,
+                    input_schema,
+                    ctx_state,
+                )
             }
             Expr::ScalarUDF { fun, args } => {
                 let mut physical_args = vec![];
@@ -736,30 +741,20 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> 
Result<(T, R)> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
     use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
-    use crate::prelude::ExecutionConfig;
     use crate::scalar::ScalarValue;
     use crate::{
-        catalog::catalog::MemoryCatalogList,
-        logical_plan::{DFField, DFSchema, DFSchemaRef},
-    };
-    use crate::{
         logical_plan::{col, lit, sum, LogicalPlanBuilder},
         physical_plan::SendableRecordBatchStream,
     };
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use async_trait::async_trait;
     use fmt::Debug;
-    use std::{any::Any, collections::HashMap, fmt};
+    use std::{any::Any, fmt};
 
     fn make_ctx_state() -> ExecutionContextState {
-        ExecutionContextState {
-            catalog_list: Arc::new(MemoryCatalogList::new()),
-            scalar_functions: HashMap::new(),
-            var_provider: HashMap::new(),
-            aggregate_functions: HashMap::new(),
-            config: ExecutionConfig::new(),
-        }
+        ExecutionContextState::new()
     }
 
     fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/src/physical_plan/type_coercion.rs 
b/datafusion/src/physical_plan/type_coercion.rs
index d9f84e7..98ae09c 100644
--- a/datafusion/src/physical_plan/type_coercion.rs
+++ b/datafusion/src/physical_plan/type_coercion.rs
@@ -46,6 +46,10 @@ pub fn coerce(
     schema: &Schema,
     signature: &Signature,
 ) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if expressions.is_empty() {
+        return Ok(vec![]);
+    }
+
     let current_types = expressions
         .iter()
         .map(|e| e.data_type(schema))
@@ -68,6 +72,10 @@ pub fn data_types(
     current_types: &[DataType],
     signature: &Signature,
 ) -> Result<Vec<DataType>> {
+    if current_types.is_empty() {
+        return Ok(vec![]);
+    }
+
     let valid_types = get_valid_types(signature, current_types)?;
 
     if valid_types
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4b53e2f..c80ffe4 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -2342,7 +2342,7 @@ macro_rules! test_expression {
         let mut ctx = ExecutionContext::new();
         let sql = format!("SELECT {}", $SQL);
         let actual = execute(&mut ctx, sql.as_str()).await;
-        assert_eq!($EXPECTED, actual[0][0]);
+        assert_eq!(actual[0][0], $EXPECTED);
     };
 }
 
@@ -2864,6 +2864,53 @@ async fn test_cast_expressions() -> Result<()> {
 }
 
 #[tokio::test]
+async fn test_current_timestamp_expressions() -> Result<()> {
+    let t1 = chrono::Utc::now().timestamp();
+    let mut ctx = ExecutionContext::new();
+    let actual = execute(&mut ctx, "SELECT NOW(), NOW() as t2").await;
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+    let t3 = chrono::Utc::now().timestamp();
+    let t2_naive =
+        chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap();
+
+    let t2 = t2_naive.timestamp();
+    assert!(t1 <= t2 && t2 <= t3);
+    assert_eq!(res2, res1);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_current_timestamp_expressions_non_optimized() -> Result<()> {
+    let t1 = chrono::Utc::now().timestamp();
+    let ctx = ExecutionContext::new();
+    let sql = "SELECT NOW(), NOW() as t2";
+
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+
+    let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
+    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+
+    let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
+    let res = collect(plan).await.expect(&msg);
+    let actual = result_vec(&res);
+
+    let res1 = actual[0][0].as_str();
+    let res2 = actual[0][1].as_str();
+    let t3 = chrono::Utc::now().timestamp();
+    let t2_naive =
+        chrono::NaiveDateTime::parse_from_str(res1, "%Y-%m-%d %H:%M:%S%.6f").unwrap();
+
+    let t2 = t2_naive.timestamp();
+    assert!(t1 <= t2 && t2 <= t3);
+    assert_eq!(res2, res1);
+
+    Ok(())
+}
+
+#[tokio::test]
 async fn test_cast_expressions_error() -> Result<()> {
     // sin(utf8) should error
     let mut ctx = create_ctx()?;
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index f9f2443..5e38c57 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -85,6 +85,7 @@ use std::task::{Context, Poll};
 use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
 
 use async_trait::async_trait;
+use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::DFSchemaRef;
 
 /// Execute the specified sql and return the resulting record batches
@@ -211,7 +212,11 @@ impl QueryPlanner for TopKQueryPlanner {
 struct TopKOptimizerRule {}
 impl OptimizerRule for TopKOptimizerRule {
     // Example rewrite pass to insert a user defined LogicalPlanNode
-    fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        execution_props: &ExecutionProps,
+    ) -> Result<LogicalPlan> {
         // Note: this code simply looks for the pattern of a Limit followed by a
         // Sort and replaces it by a TopK node. It does not handle many
         // edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
@@ -226,7 +231,7 @@ impl OptimizerRule for TopKOptimizerRule {
                     return Ok(LogicalPlan::Extension {
                         node: Arc::new(TopKPlanNode {
                             k: *n,
-                            input: self.optimize(input.as_ref())?,
+                            input: self.optimize(input.as_ref(), execution_props)?,
                             expr: expr[0].clone(),
                         }),
                     });
@@ -236,7 +241,7 @@ impl OptimizerRule for TopKOptimizerRule {
 
         // If we didn't find the Limit/Sort combination, recurse as
         // normal and build the result.
-        optimize_children(self, plan)
+        optimize_children(self, plan, execution_props)
     }
 
     fn name(&self) -> &str {

Reply via email to