This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c8b8c74904 Add `SessionContext`/`SessionState::create_physical_expr()` to create `PhysicalExpressions` from `Expr`s (#10330)
c8b8c74904 is described below

commit c8b8c74904972c57f559a167dd0ccd52c8f91076
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 7 08:43:01 2024 -0400

    Add `SessionContext`/`SessionState::create_physical_expr()` to create `PhysicalExpressions` from `Expr`s (#10330)
    
    * Improve coerce API so it does not need DFSchema
    
    * Add `SessionContext::create_physical_expr()` and 
`SessionState::create_physical_expr()`
    
    * Apply suggestions from code review
    
    Co-authored-by: Weston Pace <[email protected]>
    
    * Add note on simplification
    
    ---------
    
    Co-authored-by: Weston Pace <[email protected]>
---
 datafusion-examples/examples/expr_api.rs     |  34 ++---
 datafusion/common/src/dfschema.rs            |  29 +++++
 datafusion/core/src/execution/context/mod.rs | 113 ++++++++++++++++-
 datafusion/core/tests/core_integration.rs    |   3 +
 datafusion/core/tests/expr_api/mod.rs        | 181 +++++++++++++++++++++++++++
 datafusion/optimizer/src/analyzer/mod.rs     |   5 +
 6 files changed, 337 insertions(+), 28 deletions(-)
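
The gist of the new API, pieced together from the example and tests below (the
column name, data, and literal values here are illustrative only, not part of
the commit):

    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, Int32Array};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion_common::DFSchema;

    fn demo() -> Result<()> {
        // A RecordBatch with a single Int32 column "a"
        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 4, 8]));
        let batch = RecordBatch::try_from_iter(vec![("a", array)])?;

        // Logical expression: a < 5 OR a = 8
        let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));

        // New in this commit: build a PhysicalExpr directly from the
        // SessionContext. Type coercion and function rewrites are applied;
        // simplification is not.
        let df_schema = DFSchema::try_from(batch.schema())?;
        let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;

        // Evaluate the expression against the RecordBatch
        let result = physical_expr.evaluate(&batch)?;
        println!("{result:?}");
        Ok(())
    }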

diff --git a/datafusion-examples/examples/expr_api.rs b/datafusion-examples/examples/expr_api.rs
index 2c1470a1d6..0082ed6eb9 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
 use datafusion::common::DFSchema;
 use datafusion::error::Result;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion::physical_expr::{
-    analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
-};
+use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
 use datafusion::prelude::*;
 use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::execution_props::ExecutionProps;
@@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> {
     let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
 
     // First, you make a "physical expression" from the logical `Expr`
-    let physical_expr = physical_expr(&batch.schema(), expr)?;
+    let df_schema = DFSchema::try_from(batch.schema())?;
+    let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
 
     // Now, you can evaluate the expression against the RecordBatch
     let result = physical_expr.evaluate(&batch)?;
@@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> {
     // `date < '2020-10-01' AND date > '2020-09-01'`
 
     // As always, we need to tell DataFusion the type of column "date"
-    let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
+    let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)]));
 
     // You can provide DataFusion any known boundaries on the values of `date`
     // (for example, maybe you know you only have data up to `2020-09-15`), but
@@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> {
     let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
 
     // Now, we invoke the analysis code to perform the range analysis
-    let physical_expr = physical_expr(&schema, expr)?;
-    let analysis_result =
-        analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
+    let df_schema = DFSchema::try_from(schema)?;
+    let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
+    let analysis_result = analyze(
+        &physical_expr,
+        AnalysisContext::new(boundaries),
+        df_schema.as_ref(),
+    )?;
 
     // The result of the analysis is a range, encoded as an `Interval`, for
     // each column in the schema, that must be true in order for the predicate
@@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field {
     make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
 }
 
-/// Build a physical expression from a logical one, after applying simplification and type coercion
-pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
-    let df_schema = schema.clone().to_dfschema_ref()?;
-
-    // Simplify
-    let props = ExecutionProps::new();
-    let simplifier =
-        ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));
-
-    // apply type coercion here to ensure types match
-    let expr = simplifier.coerce(expr, &df_schema)?;
-
-    create_physical_expr(&expr, df_schema.as_ref(), &props)
-}
-
 /// This function shows how to use `Expr::get_type` to retrieve the DataType
 /// of an expression
 fn expression_type_demo() -> Result<()> {
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index b2a3de7235..3686af90db 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -125,6 +125,20 @@ impl DFSchema {
         }
     }
 
+    /// Return a reference to the inner Arrow [`Schema`]
+    ///
+    /// Note this does not have the qualifier information
+    pub fn as_arrow(&self) -> &Schema {
+        self.inner.as_ref()
+    }
+
+    /// Return a reference to the inner Arrow [`SchemaRef`]
+    ///
+    /// Note this does not have the qualifier information
+    pub fn inner(&self) -> &SchemaRef {
+        &self.inner
+    }
+
     /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
     pub fn new_with_metadata(
         qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
@@ -806,6 +820,21 @@ impl From<&DFSchema> for Schema {
     }
 }
 
+/// Allow DFSchema to be converted into an Arrow `&Schema`
+impl AsRef<Schema> for DFSchema {
+    fn as_ref(&self) -> &Schema {
+        self.as_arrow()
+    }
+}
+
+/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
+/// example)
+impl AsRef<SchemaRef> for DFSchema {
+    fn as_ref(&self) -> &SchemaRef {
+        self.inner()
+    }
+}
+
 /// Create a `DFSchema` from an Arrow schema
 impl TryFrom<Schema> for DFSchema {
     type Error = DataFusionError;
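
A minimal sketch of what the new accessors and `AsRef` impls above enable:
passing a `DFSchema` wherever an Arrow `&Schema` or `SchemaRef` is expected,
as the updated `expr_api.rs` example does with
`analyze(..., df_schema.as_ref())`. The field name below is arbitrary:

    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::{DFSchema, Result};

    fn takes_arrow_schema(_schema: &Schema) {}

    fn demo() -> Result<()> {
        let df_schema =
            DFSchema::try_from(Schema::new(vec![Field::new("date", DataType::Date32, true)]))?;

        // Borrow the inner Arrow Schema (note: no qualifier information)
        takes_arrow_schema(df_schema.as_arrow());
        // AsRef<Schema> allows the same call via as_ref()
        takes_arrow_schema(df_schema.as_ref());
        // inner() returns the SchemaRef, which can be cloned cheaply
        let schema_ref: SchemaRef = df_schema.inner().clone();
        assert_eq!(schema_ref.fields().len(), 1);
        Ok(())
    }
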
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index d84983f08e..f3af31f895 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -71,13 +71,13 @@ use datafusion_common::{
     config::{ConfigExtension, TableOptions},
     exec_err, not_impl_err, plan_datafusion_err, plan_err,
     tree_node::{TreeNodeRecursion, TreeNodeVisitor},
-    SchemaReference, TableReference,
+    DFSchema, SchemaReference, TableReference,
 };
 use datafusion_execution::registry::SerializerRegistry;
 use datafusion_expr::{
     logical_plan::{DdlStatement, Statement},
     var_provider::is_system_variables,
-    Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
+    Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
 };
 use datafusion_sql::{
     parser::{CopyToSource, CopyToStatement, DFParser},
@@ -87,15 +87,20 @@ use datafusion_sql::{
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
+use datafusion_common::tree_node::TreeNode;
 use parking_lot::RwLock;
 use sqlparser::dialect::dialect_from_str;
 use url::Url;
 use uuid::Uuid;
 
+use crate::physical_expr::PhysicalExpr;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
 pub use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr_rewriter::FunctionRewrite;
+use datafusion_expr::simplify::SimplifyInfo;
+use datafusion_optimizer::simplify_expressions::ExprSimplifier;
+use datafusion_physical_expr::create_physical_expr;
 
 mod avro;
 mod csv;
@@ -523,6 +528,41 @@ impl SessionContext {
         }
     }
 
+    /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+    /// coercion and function rewrites.
+    ///
+    /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 + 2`
+    /// will not be simplified to `a = 3` as this is a more involved process.
+    /// See the [expr_api] example for how to simplify expressions.
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::{DataType, Field, Schema};
+    /// # use datafusion::prelude::*;
+    /// # use datafusion_common::DFSchema;
+    /// // a = 1 (i64)
+    /// let expr = col("a").eq(lit(1i64));
+    /// // provide type information that `a` is an Int32
+    /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+    /// let df_schema = DFSchema::try_from(schema).unwrap();
+    /// // Create a PhysicalExpr. Note DataFusion automatically coerces (casts) `1i64` to `1i32`
+    /// let physical_expr = SessionContext::new()
+    ///   .create_physical_expr(expr, &df_schema).unwrap();
+    /// ```
+    /// # See Also
+    /// * [`SessionState::create_physical_expr`] for a lower level API
+    ///
+    /// [simplified]: datafusion_optimizer::simplify_expressions
+    /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+    pub fn create_physical_expr(
+        &self,
+        expr: Expr,
+        df_schema: &DFSchema,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        self.state.read().create_physical_expr(expr, df_schema)
+    }
+
     // return an empty dataframe
     fn return_empty_dataframe(&self) -> Result<DataFrame> {
         let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -1946,13 +1986,14 @@ impl SessionState {
         }
     }
 
-    /// Creates a physical plan from a logical plan.
+    /// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`].
     ///
     /// Note: this first calls [`Self::optimize`] on the provided
     /// plan.
     ///
-    /// This function will error for [`LogicalPlan`]s such as catalog
-    /// DDL `CREATE TABLE` must be handled by another layer.
+    /// This function will error for [`LogicalPlan`]s such as catalog DDL like
+    /// `CREATE TABLE`, which do not have corresponding physical plans and must
+    /// be handled by another layer, typically [`SessionContext`].
     pub async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
@@ -1963,6 +2004,39 @@ impl SessionState {
             .await
     }
 
+    /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+    /// coercion and function rewrites.
+    ///
+    /// Note: The expression is not [simplified] or otherwise optimized: `a = 1 + 2`
+    /// will not be simplified to `a = 3` as this is a more involved process.
+    /// See the [expr_api] example for how to simplify expressions.
+    ///
+    /// # See Also
+    /// * [`SessionContext::create_physical_expr`] for a higher-level API
+    /// * [`create_physical_expr`] for a lower-level API
+    ///
+    /// [simplified]: datafusion_optimizer::simplify_expressions
+    /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+    pub fn create_physical_expr(
+        &self,
+        expr: Expr,
+        df_schema: &DFSchema,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        let simplifier =
+            ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
+        // apply type coercion here to ensure types match
+        let mut expr = simplifier.coerce(expr, df_schema)?;
+
+        // rewrite Exprs to functions if necessary
+        let config_options = self.config_options();
+        for rewrite in self.analyzer.function_rewrites() {
+            expr = expr
+                .transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
+                .data;
+        }
+        create_physical_expr(&expr, df_schema, self.execution_props())
+    }
+
     /// Return the session ID
     pub fn session_id(&self) -> &str {
         &self.session_id
@@ -2040,6 +2114,35 @@ impl SessionState {
     }
 }
 
+struct SessionSimplifyProvider<'a> {
+    state: &'a SessionState,
+    df_schema: &'a DFSchema,
+}
+
+impl<'a> SessionSimplifyProvider<'a> {
+    fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
+        Self { state, df_schema }
+    }
+}
+
+impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
+    fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
+        Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
+    }
+
+    fn nullable(&self, expr: &Expr) -> Result<bool> {
+        expr.nullable(self.df_schema)
+    }
+
+    fn execution_props(&self) -> &ExecutionProps {
+        self.state.execution_props()
+    }
+
+    fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
+        expr.get_type(self.df_schema)
+    }
+}
+
 struct SessionContextProvider<'a> {
     state: &'a SessionState,
     tables: HashMap<String, Arc<dyn TableSource>>,
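
For cases where a `SessionState` is already at hand (for example inside a
custom planner), the lower-level method can be called directly. A sketch,
assuming the existing `SessionContext::state()` accessor; the schema and
expression are illustrative:

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion_common::DFSchema;

    fn demo() -> Result<()> {
        let ctx = SessionContext::new();
        // SessionContext::create_physical_expr() delegates to this method
        let state = ctx.state();

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let df_schema = DFSchema::try_from(schema)?;

        // `1i64` is coerced to Int32 to match column `a`; the expression is
        // not simplified.
        let physical_expr = state.create_physical_expr(col("a").eq(lit(1i64)), &df_schema)?;
        println!("{physical_expr}");
        Ok(())
    }
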
diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs
index befefb1d7e..f8ad8f1554 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -24,6 +24,9 @@ mod dataframe;
 /// Run all tests that are found in the `macro_hygiene` directory
 mod macro_hygiene;
 
+/// Run all tests that are found in the `expr_api` directory
+mod expr_api;
+
 #[cfg(test)]
 #[ctor::ctor]
 fn init() {
diff --git a/datafusion/core/tests/expr_api/mod.rs b/datafusion/core/tests/expr_api/mod.rs
new file mode 100644
index 0000000000..0dde7604cc
--- /dev/null
+++ b/datafusion/core/tests/expr_api/mod.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::pretty_format_columns;
+use arrow_array::builder::{ListBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch, StringArray, StructArray};
+use arrow_schema::{DataType, Field};
+use datafusion::prelude::*;
+use datafusion_common::DFSchema;
+/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
+use std::sync::{Arc, OnceLock};
+
+#[test]
+fn test_eq() {
+    // id = '2'
+    evaluate_expr_test(
+        col("id").eq(lit("2")),
+        vec![
+            "+-------+",
+            "| expr  |",
+            "+-------+",
+            "| false |",
+            "| true  |",
+            "| false |",
+            "+-------+",
+        ],
+    );
+}
+
+#[test]
+fn test_eq_with_coercion() {
+    // id = 2 (need to coerce the 2 to '2' to evaluate)
+    evaluate_expr_test(
+        col("id").eq(lit(2i32)),
+        vec![
+            "+-------+",
+            "| expr  |",
+            "+-------+",
+            "| false |",
+            "| true  |",
+            "| false |",
+            "+-------+",
+        ],
+    );
+}
+
+#[test]
+fn test_get_field() {
+    // field access Expr::field() requires a rewrite to work
+    evaluate_expr_test(
+        col("props").field("a"),
+        vec![
+            "+------------+",
+            "| expr       |",
+            "+------------+",
+            "| 2021-02-01 |",
+            "| 2021-02-02 |",
+            "| 2021-02-03 |",
+            "+------------+",
+        ],
+    );
+}
+
+#[test]
+fn test_nested_get_field() {
+    // field access Expr::field() requires a rewrite to work, test when it is
+    // not the root expression
+    evaluate_expr_test(
+        col("props")
+            .field("a")
+            .eq(lit("2021-02-02"))
+            .or(col("id").eq(lit(1))),
+        vec![
+            "+-------+",
+            "| expr  |",
+            "+-------+",
+            "| true  |",
+            "| true  |",
+            "| false |",
+            "+-------+",
+        ],
+    );
+}
+
+#[test]
+fn test_list() {
+    // list access also requires a rewrite to work
+    evaluate_expr_test(
+        col("list").index(lit(1i64)),
+        vec![
+            "+------+", "| expr |", "+------+", "| one  |", "| two  |", "| 
five |",
+            "+------+",
+        ],
+    );
+}
+
+#[test]
+fn test_list_range() {
+    // range access also requires a rewrite to work
+    evaluate_expr_test(
+        col("list").range(lit(1i64), lit(2i64)),
+        vec![
+            "+--------------+",
+            "| expr         |",
+            "+--------------+",
+            "| [one]        |",
+            "| [two, three] |",
+            "| [five]       |",
+            "+--------------+",
+        ],
+    );
+}
+
+/// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
+/// `RecordBatch` and compares the result to the expected result.
+fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
+    let batch = test_batch();
+    let df_schema = DFSchema::try_from(batch.schema()).unwrap();
+    let physical_expr = SessionContext::new()
+        .create_physical_expr(expr, &df_schema)
+        .unwrap();
+
+    let result = physical_expr.evaluate(&batch).unwrap();
+    let array = result.into_array(1).unwrap();
+    let result = pretty_format_columns("expr", &[array]).unwrap().to_string();
+    let actual_lines = result.lines().collect::<Vec<_>>();
+
+    assert_eq!(
+        expected_lines, actual_lines,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected_lines, actual_lines
+    );
+}
+
+static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
+
+fn test_batch() -> RecordBatch {
+    TEST_BATCH
+        .get_or_init(|| {
+            let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
+
+            // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+            let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+                Arc::new(Field::new("a", DataType::Utf8, false)),
+                Arc::new(StringArray::from(vec![
+                    "2021-02-01",
+                    "2021-02-02",
+                    "2021-02-03",
+                ])) as _,
+            )]));
+
+            // ["one"] ["two", "three", "four"] ["five"]
+            let mut builder = ListBuilder::new(StringBuilder::new());
+            builder.append_value([Some("one")]);
+            builder.append_value([Some("two"), Some("three"), Some("four")]);
+            builder.append_value([Some("five")]);
+            let list_array: ArrayRef = Arc::new(builder.finish());
+
+            RecordBatch::try_from_iter(vec![
+                ("id", string_array),
+                ("props", struct_array),
+                ("list", list_array),
+            ])
+            .unwrap()
+        })
+        .clone()
+}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 1553f95266..121e46cc95 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -111,6 +111,11 @@ impl Analyzer {
         self.function_rewrites.push(rewrite);
     }
 
+    /// Return the list of function rewrites in this analyzer
+    pub fn function_rewrites(&self) -> &[Arc<dyn FunctionRewrite + Send + Sync>] {
+        &self.function_rewrites
+    }
+
     /// Analyze the logical plan by applying analyzer rules, and
     /// do necessary check and fail the invalid plans
     pub fn execute_and_check<F>(
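
The new accessor exposes the same rewrite list that
`SessionState::create_physical_expr` iterates over in the diff above. A sketch
of applying the rewrites manually to an `Expr`, assuming an `Analyzer`, a
`DFSchema`, and `ConfigOptions` are already available:

    use datafusion_common::config::ConfigOptions;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_common::{DFSchema, Result};
    use datafusion_expr::Expr;
    use datafusion_optimizer::analyzer::Analyzer;

    /// Apply every registered function rewrite bottom-up, mirroring the loop
    /// in SessionState::create_physical_expr above.
    fn apply_function_rewrites(
        analyzer: &Analyzer,
        mut expr: Expr,
        df_schema: &DFSchema,
        config_options: &ConfigOptions,
    ) -> Result<Expr> {
        for rewrite in analyzer.function_rewrites() {
            expr = expr
                .transform_up(|e| rewrite.rewrite(e, df_schema, config_options))?
                .data;
        }
        Ok(expr)
    }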


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
