This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c8b8c74904 Add
`SessionContext`/`SessionState::create_physical_expr()` to create
`PhysicalExpressions` from `Expr`s (#10330)
c8b8c74904 is described below
commit c8b8c74904972c57f559a167dd0ccd52c8f91076
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 7 08:43:01 2024 -0400
Add `SessionContext`/`SessionState::create_physical_expr()` to create
`PhysicalExpressions` from `Expr`s (#10330)
* Improve coerce API so it does not need DFSchema
* Add `SessionContext::create_physical_expr()` and
`SessionState::create_physical_expr()`
* Apply suggestions from code review
Co-authored-by: Weston Pace <[email protected]>
* Add note on simplification
---------
Co-authored-by: Weston Pace <[email protected]>
---
datafusion-examples/examples/expr_api.rs | 34 ++---
datafusion/common/src/dfschema.rs | 29 +++++
datafusion/core/src/execution/context/mod.rs | 113 ++++++++++++++++-
datafusion/core/tests/core_integration.rs | 3 +
datafusion/core/tests/expr_api/mod.rs | 181 +++++++++++++++++++++++++++
datafusion/optimizer/src/analyzer/mod.rs | 5 +
6 files changed, 337 insertions(+), 28 deletions(-)
diff --git a/datafusion-examples/examples/expr_api.rs
b/datafusion-examples/examples/expr_api.rs
index 2c1470a1d6..0082ed6eb9 100644
--- a/datafusion-examples/examples/expr_api.rs
+++ b/datafusion-examples/examples/expr_api.rs
@@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema,
TimeUnit};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion::physical_expr::{
- analyze, create_physical_expr, AnalysisContext, ExprBoundaries,
PhysicalExpr,
-};
+use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
@@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
// First, you make a "physical expression" from the logical `Expr`
- let physical_expr = physical_expr(&batch.schema(), expr)?;
+ let df_schema = DFSchema::try_from(batch.schema())?;
+ let physical_expr = SessionContext::new().create_physical_expr(expr,
&df_schema)?;
// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;
@@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> {
// `date < '2020-10-01' AND date > '2020-09-01'`
// As always, we need to tell DataFusion the type of column "date"
- let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
+ let schema = Arc::new(Schema::new(vec![make_field("date",
DataType::Date32)]));
// You can provide DataFusion any known boundaries on the values of `date`
// (for example, maybe you know you only have data up to `2020-09-15`), but
@@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> {
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
// Now, we invoke the analysis code to perform the range analysis
- let physical_expr = physical_expr(&schema, expr)?;
- let analysis_result =
- analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
+ let df_schema = DFSchema::try_from(schema)?;
+ let physical_expr = SessionContext::new().create_physical_expr(expr,
&df_schema)?;
+ let analysis_result = analyze(
+ &physical_expr,
+ AnalysisContext::new(boundaries),
+ df_schema.as_ref(),
+ )?;
// The result of the analysis is a range, encoded as an `Interval`, for
// each column in the schema, that must be true in order for the predicate
@@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field {
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}
-/// Build a physical expression from a logical one, after applying
simplification and type coercion
-pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn
PhysicalExpr>> {
- let df_schema = schema.clone().to_dfschema_ref()?;
-
- // Simplify
- let props = ExecutionProps::new();
- let simplifier =
-
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));
-
- // apply type coercion here to ensure types match
- let expr = simplifier.coerce(expr, &df_schema)?;
-
- create_physical_expr(&expr, df_schema.as_ref(), &props)
-}
-
/// This function shows how to use `Expr::get_type` to retrieve the DataType
/// of an expression
fn expression_type_demo() -> Result<()> {
diff --git a/datafusion/common/src/dfschema.rs
b/datafusion/common/src/dfschema.rs
index b2a3de7235..3686af90db 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -125,6 +125,20 @@ impl DFSchema {
}
}
+ /// Return a reference to the inner Arrow [`Schema`]
+ ///
+ /// Note this does not have the qualifier information
+ pub fn as_arrow(&self) -> &Schema {
+ self.inner.as_ref()
+ }
+
+ /// Return a reference to the inner Arrow [`SchemaRef`]
+ ///
+ /// Note this does not have the qualifier information
+ pub fn inner(&self) -> &SchemaRef {
+ &self.inner
+ }
+
/// Create a `DFSchema` from an Arrow schema where all the fields have a
given qualifier
pub fn new_with_metadata(
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
@@ -806,6 +820,21 @@ impl From<&DFSchema> for Schema {
}
}
+/// Allow DFSchema to be converted into an Arrow `&Schema`
+impl AsRef<Schema> for DFSchema {
+ fn as_ref(&self) -> &Schema {
+ self.as_arrow()
+ }
+}
+
+/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
+/// example)
+impl AsRef<SchemaRef> for DFSchema {
+ fn as_ref(&self) -> &SchemaRef {
+ self.inner()
+ }
+}
+
/// Create a `DFSchema` from an Arrow schema
impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index d84983f08e..f3af31f895 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -71,13 +71,13 @@ use datafusion_common::{
config::{ConfigExtension, TableOptions},
exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
- SchemaReference, TableReference,
+ DFSchema, SchemaReference, TableReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
- Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
+ Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_sql::{
parser::{CopyToSource, CopyToStatement, DFParser},
@@ -87,15 +87,20 @@ use datafusion_sql::{
use async_trait::async_trait;
use chrono::{DateTime, Utc};
+use datafusion_common::tree_node::TreeNode;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
use uuid::Uuid;
+use crate::physical_expr::PhysicalExpr;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_rewriter::FunctionRewrite;
+use datafusion_expr::simplify::SimplifyInfo;
+use datafusion_optimizer::simplify_expressions::ExprSimplifier;
+use datafusion_physical_expr::create_physical_expr;
mod avro;
mod csv;
@@ -523,6 +528,41 @@ impl SessionContext {
}
}
+ /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+ /// coercion and function rewrites.
+ ///
+ /// Note: The expression is not [simplified] or otherwise optimized: `a = 1
+ /// + 2` will not be simplified to `a = 3` as this is a more involved process.
+ /// See the [expr_api] example for how to simplify expressions.
+ ///
+ /// # Example
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow::datatypes::{DataType, Field, Schema};
+ /// # use datafusion::prelude::*;
+ /// # use datafusion_common::DFSchema;
+ /// // a = 1 (i64)
+ /// let expr = col("a").eq(lit(1i64));
+ /// // provide type information that `a` is an Int32
+ /// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+ /// let df_schema = DFSchema::try_from(schema).unwrap();
+ /// // Create a PhysicalExpr. Note DataFusion automatically coerces
(casts) `1i64` to `1i32`
+ /// let physical_expr = SessionContext::new()
+ /// .create_physical_expr(expr, &df_schema).unwrap();
+ /// ```
+ /// # See Also
+ /// * [`SessionState::create_physical_expr`] for a lower level API
+ ///
+ /// [simplified]: datafusion_optimizer::simplify_expressions
+ /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+ pub fn create_physical_expr(
+ &self,
+ expr: Expr,
+ df_schema: &DFSchema,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ self.state.read().create_physical_expr(expr, df_schema)
+ }
+
// return an empty dataframe
fn return_empty_dataframe(&self) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -1946,13 +1986,14 @@ impl SessionState {
}
}
- /// Creates a physical plan from a logical plan.
+ /// Creates a physical [`ExecutionPlan`] from a [`LogicalPlan`].
///
/// Note: this first calls [`Self::optimize`] on the provided
/// plan.
///
- /// This function will error for [`LogicalPlan`]s such as catalog
- /// DDL `CREATE TABLE` must be handled by another layer.
+ /// This function will error for [`LogicalPlan`]s such as catalog DDL like
+ /// `CREATE TABLE`, which do not have corresponding physical plans and must
+ /// be handled by another layer, typically [`SessionContext`].
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
@@ -1963,6 +2004,39 @@ impl SessionState {
.await
}
+ /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
+ /// coercion, and function rewrites.
+ ///
+ /// Note: The expression is not [simplified] or otherwise optimized: `a = 1
+ /// + 2` will not be simplified to `a = 3` as this is a more involved process.
+ /// See the [expr_api] example for how to simplify expressions.
+ ///
+ /// # See Also:
+ /// * [`SessionContext::create_physical_expr`] for a higher-level API
+ /// * [`create_physical_expr`] for a lower-level API
+ ///
+ /// [simplified]: datafusion_optimizer::simplify_expressions
+ /// [expr_api]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/expr_api.rs
+ pub fn create_physical_expr(
+ &self,
+ expr: Expr,
+ df_schema: &DFSchema,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ let simplifier =
+ ExprSimplifier::new(SessionSimplifyProvider::new(self, df_schema));
+ // apply type coercion here to ensure types match
+ let mut expr = simplifier.coerce(expr, df_schema)?;
+
+ // rewrite Exprs to functions if necessary
+ let config_options = self.config_options();
+ for rewrite in self.analyzer.function_rewrites() {
+ expr = expr
+ .transform_up(|expr| rewrite.rewrite(expr, df_schema,
config_options))?
+ .data;
+ }
+ create_physical_expr(&expr, df_schema, self.execution_props())
+ }
+
/// Return the session ID
pub fn session_id(&self) -> &str {
&self.session_id
@@ -2040,6 +2114,35 @@ impl SessionState {
}
}
+struct SessionSimplifyProvider<'a> {
+ state: &'a SessionState,
+ df_schema: &'a DFSchema,
+}
+
+impl<'a> SessionSimplifyProvider<'a> {
+ fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
+ Self { state, df_schema }
+ }
+}
+
+impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
+ fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
+ Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
+ }
+
+ fn nullable(&self, expr: &Expr) -> Result<bool> {
+ expr.nullable(self.df_schema)
+ }
+
+ fn execution_props(&self) -> &ExecutionProps {
+ self.state.execution_props()
+ }
+
+ fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
+ expr.get_type(self.df_schema)
+ }
+}
+
struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<String, Arc<dyn TableSource>>,
diff --git a/datafusion/core/tests/core_integration.rs
b/datafusion/core/tests/core_integration.rs
index befefb1d7e..f8ad8f1554 100644
--- a/datafusion/core/tests/core_integration.rs
+++ b/datafusion/core/tests/core_integration.rs
@@ -24,6 +24,9 @@ mod dataframe;
/// Run all tests that are found in the `macro_hygiene` directory
mod macro_hygiene;
+/// Run all tests that are found in the `expr_api` directory
+mod expr_api;
+
#[cfg(test)]
#[ctor::ctor]
fn init() {
diff --git a/datafusion/core/tests/expr_api/mod.rs
b/datafusion/core/tests/expr_api/mod.rs
new file mode 100644
index 0000000000..0dde7604cc
--- /dev/null
+++ b/datafusion/core/tests/expr_api/mod.rs
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::pretty_format_columns;
+use arrow_array::builder::{ListBuilder, StringBuilder};
+use arrow_array::{ArrayRef, RecordBatch, StringArray, StructArray};
+use arrow_schema::{DataType, Field};
+use datafusion::prelude::*;
+use datafusion_common::DFSchema;
+/// Tests of using and evaluating `Expr`s outside the context of a LogicalPlan
+use std::sync::{Arc, OnceLock};
+
+#[test]
+fn test_eq() {
+ // id = '2'
+ evaluate_expr_test(
+ col("id").eq(lit("2")),
+ vec![
+ "+-------+",
+ "| expr |",
+ "+-------+",
+ "| false |",
+ "| true |",
+ "| false |",
+ "+-------+",
+ ],
+ );
+}
+
+#[test]
+fn test_eq_with_coercion() {
+ // id = 2 (need to coerce the 2 to '2' to evaluate)
+ evaluate_expr_test(
+ col("id").eq(lit(2i32)),
+ vec![
+ "+-------+",
+ "| expr |",
+ "+-------+",
+ "| false |",
+ "| true |",
+ "| false |",
+ "+-------+",
+ ],
+ );
+}
+
+#[test]
+fn test_get_field() {
+ // field access Expr::field() requires a rewrite to work
+ evaluate_expr_test(
+ col("props").field("a"),
+ vec![
+ "+------------+",
+ "| expr |",
+ "+------------+",
+ "| 2021-02-01 |",
+ "| 2021-02-02 |",
+ "| 2021-02-03 |",
+ "+------------+",
+ ],
+ );
+}
+
+#[test]
+fn test_nested_get_field() {
+ // field access Expr::field() requires a rewrite to work, test when it is
+ // not the root expression
+ evaluate_expr_test(
+ col("props")
+ .field("a")
+ .eq(lit("2021-02-02"))
+ .or(col("id").eq(lit(1))),
+ vec![
+ "+-------+",
+ "| expr |",
+ "+-------+",
+ "| true |",
+ "| true |",
+ "| false |",
+ "+-------+",
+ ],
+ );
+}
+
+#[test]
+fn test_list() {
+ // list access also requires a rewrite to work
+ evaluate_expr_test(
+ col("list").index(lit(1i64)),
+ vec![
+ "+------+", "| expr |", "+------+", "| one |", "| two |", "|
five |",
+ "+------+",
+ ],
+ );
+}
+
+#[test]
+fn test_list_range() {
+ // range access also requires a rewrite to work
+ evaluate_expr_test(
+ col("list").range(lit(1i64), lit(2i64)),
+ vec![
+ "+--------------+",
+ "| expr |",
+ "+--------------+",
+ "| [one] |",
+ "| [two, three] |",
+ "| [five] |",
+ "+--------------+",
+ ],
+ );
+}
+
+/// Converts the `Expr` to a `PhysicalExpr`, evaluates it against the provided
+/// `RecordBatch` and compares the result to the expected result.
+fn evaluate_expr_test(expr: Expr, expected_lines: Vec<&str>) {
+ let batch = test_batch();
+ let df_schema = DFSchema::try_from(batch.schema()).unwrap();
+ let physical_expr = SessionContext::new()
+ .create_physical_expr(expr, &df_schema)
+ .unwrap();
+
+ let result = physical_expr.evaluate(&batch).unwrap();
+ let array = result.into_array(1).unwrap();
+ let result = pretty_format_columns("expr", &[array]).unwrap().to_string();
+ let actual_lines = result.lines().collect::<Vec<_>>();
+
+ assert_eq!(
+ expected_lines, actual_lines,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected_lines, actual_lines
+ );
+}
+
+static TEST_BATCH: OnceLock<RecordBatch> = OnceLock::new();
+
+fn test_batch() -> RecordBatch {
+ TEST_BATCH
+ .get_or_init(|| {
+ let string_array: ArrayRef = Arc::new(StringArray::from(vec!["1",
"2", "3"]));
+
+ // { a: "2021-02-01" } { a: "2021-02-02" } { a: "2021-02-03" }
+ let struct_array: ArrayRef = Arc::from(StructArray::from(vec![(
+ Arc::new(Field::new("a", DataType::Utf8, false)),
+ Arc::new(StringArray::from(vec![
+ "2021-02-01",
+ "2021-02-02",
+ "2021-02-03",
+ ])) as _,
+ )]));
+
+ // ["one"] ["two", "three", "four"] ["five"]
+ let mut builder = ListBuilder::new(StringBuilder::new());
+ builder.append_value([Some("one")]);
+ builder.append_value([Some("two"), Some("three"), Some("four")]);
+ builder.append_value([Some("five")]);
+ let list_array: ArrayRef = Arc::new(builder.finish());
+
+ RecordBatch::try_from_iter(vec![
+ ("id", string_array),
+ ("props", struct_array),
+ ("list", list_array),
+ ])
+ .unwrap()
+ })
+ .clone()
+}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs
b/datafusion/optimizer/src/analyzer/mod.rs
index 1553f95266..121e46cc95 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -111,6 +111,11 @@ impl Analyzer {
self.function_rewrites.push(rewrite);
}
+ /// return the list of function rewrites in this analyzer
+ pub fn function_rewrites(&self) -> &[Arc<dyn FunctionRewrite + Send +
Sync>] {
+ &self.function_rewrites
+ }
+
/// Analyze the logical plan by applying analyzer rules, and
/// do necessary check and fail the invalid plans
pub fn execute_and_check<F>(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]