This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 03cfcb2 Update API for extension planning to include logical plan (#643)
03cfcb2 is described below
commit 03cfcb26ad6a566dc6fabe6f93e4f3b3d416432d
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jul 1 13:29:00 2021 -0400
Update API for extension planning to include logical plan (#643)
* Update API for extension planning to include logical plan
* Review comments
---
datafusion/src/execution/context.rs | 10 ++++
datafusion/src/physical_plan/mod.rs | 14 +-----
datafusion/src/physical_plan/planner.rs | 85 +++++++++++++++++++++++++++++----
datafusion/tests/user_defined_plan.rs | 9 ++--
4 files changed, 93 insertions(+), 25 deletions(-)
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 436bce5..d5a8486 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -3376,6 +3376,16 @@ mod tests {
"query not supported".to_string(),
))
}
+
+ fn create_physical_expr(
+ &self,
+ _expr: &Expr,
+ _input_dfschema: &crate::logical_plan::DFSchema,
+ _input_schema: &Schema,
+ _ctx_state: &ExecutionContextState,
+ ) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
+ unimplemented!()
+ }
}
struct MyQueryPlanner {}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 2122751..307fff6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -20,8 +20,6 @@
use self::{
coalesce_partitions::CoalescePartitionsExec,
display::DisplayableExecutionPlan,
};
-use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::LogicalPlan;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{
error::{DataFusionError, Result},
@@ -122,16 +120,8 @@ impl SQLMetric {
}
}
-/// Physical query planner that converts a `LogicalPlan` to an
-/// `ExecutionPlan` suitable for execution.
-pub trait PhysicalPlanner {
- /// Create a physical plan from a logical plan
- fn create_physical_plan(
- &self,
- logical_plan: &LogicalPlan,
- ctx_state: &ExecutionContextState,
- ) -> Result<Arc<dyn ExecutionPlan>>;
-}
+/// Physical planner interface
+pub use self::planner::PhysicalPlanner;
/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
///
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 75f1565..5b43ec1 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{hash_utils, Partitioning};
-use crate::physical_plan::{
- AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr,
-};
+use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr,
WindowExpr};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
@@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
}
}
+/// Physical query planner that converts a `LogicalPlan` to an
+/// `ExecutionPlan` suitable for execution.
+pub trait PhysicalPlanner {
+ /// Create a physical plan from a logical plan
+ fn create_physical_plan(
+ &self,
+ logical_plan: &LogicalPlan,
+ ctx_state: &ExecutionContextState,
+ ) -> Result<Arc<dyn ExecutionPlan>>;
+
+ /// Create a physical expression from a logical expression
+ /// suitable for evaluation
+ ///
+ /// `expr`: the expression to convert
+ ///
+ /// `input_dfschema`: the logical plan schema for evaluating `expr`
+ ///
+ /// `input_schema`: the physical schema for evaluating `expr`
+ fn create_physical_expr(
+ &self,
+ expr: &Expr,
+ input_dfschema: &DFSchema,
+ input_schema: &Schema,
+ ctx_state: &ExecutionContextState,
+ ) -> Result<Arc<dyn PhysicalExpr>>;
+}
+
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
pub trait ExtensionPlanner {
/// Create a physical plan for a [`UserDefinedLogicalNode`].
- /// This errors when the planner knows how to plan the concrete implementation of `node`
- /// but errors while doing so, and `None` when the planner does not know how to plan the `node`
- /// and wants to delegate the planning to another [`ExtensionPlanner`].
+ ///
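+ /// `planner`: the calling [`PhysicalPlanner`] (e.g. for `create_physical_expr`); `logical_inputs`: this node's input logical plans; `physical_inputs`: the physical plans created from them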
+ ///
+ /// Returns an error when the planner knows how to plan the concrete
+ /// implementation of `node` but errors while doing so.
+ ///
+ /// Returns `None` when the planner does not know how to plan the
+ /// `node` and wants to delegate the planning to another
+ /// [`ExtensionPlanner`].
fn plan_extension(
&self,
+ planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
- inputs: &[Arc<dyn ExecutionPlan>],
+ logical_inputs: &[&LogicalPlan],
+ physical_inputs: &[Arc<dyn ExecutionPlan>],
ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>>;
}
@@ -210,6 +243,30 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
let plan = self.create_initial_plan(logical_plan, ctx_state)?;
self.optimize_plan(plan, ctx_state)
}
+
+ /// Create a physical expression from a logical expression
+ /// suitable for evaluation
+ ///
+ /// `expr`: the expression to convert
+ ///
+ /// `input_dfschema`: the logical plan schema for evaluating `expr`
+ ///
+ /// `input_schema`: the physical schema for evaluating `expr`
+ fn create_physical_expr(
+ &self,
+ expr: &Expr,
+ input_dfschema: &DFSchema,
+ input_schema: &Schema,
+ ctx_state: &ExecutionContextState,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ DefaultPhysicalPlanner::create_physical_expr(
+ self,
+ expr,
+ input_dfschema,
+ input_schema,
+ ctx_state,
+ )
+ }
}
impl DefaultPhysicalPlanner {
@@ -721,7 +778,7 @@ impl DefaultPhysicalPlanner {
)))
}
LogicalPlan::Extension { node } => {
- let inputs = node
+ let physical_inputs = node
.inputs()
.into_iter()
.map(|input_plan| self.create_initial_plan(input_plan,
ctx_state))
@@ -733,7 +790,13 @@ impl DefaultPhysicalPlanner {
if let Some(plan) = maybe_plan {
Ok(Some(plan))
} else {
- planner.plan_extension(node.as_ref(), &inputs,
ctx_state)
+ planner.plan_extension(
+ self,
+ node.as_ref(),
+ &node.inputs(),
+ &physical_inputs,
+ ctx_state,
+ )
}
},
)?;
@@ -1644,8 +1707,10 @@ mod tests {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
+ _planner: &dyn PhysicalPlanner,
_node: &dyn UserDefinedLogicalNode,
- _inputs: &[Arc<dyn ExecutionPlan>],
+ _logical_inputs: &[&LogicalPlan],
+ _physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(Some(Arc::new(NoOpExecutionPlan {
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 22ebec8..21b4963 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner {
/// Create a physical plan for an extension node
fn plan_extension(
&self,
+ _planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
- inputs: &[Arc<dyn ExecutionPlan>],
+ logical_inputs: &[&LogicalPlan],
+ physical_inputs: &[Arc<dyn ExecutionPlan>],
_ctx_state: &ExecutionContextState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
Ok(
if let Some(topk_node) =
node.as_any().downcast_ref::<TopKPlanNode>() {
- assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
+ assert_eq!(logical_inputs.len(), 1, "Inconsistent number of
inputs");
+ assert_eq!(physical_inputs.len(), 1, "Inconsistent number of
inputs");
// figure out input name
Some(Arc::new(TopKExec {
- input: inputs[0].clone(),
+ input: physical_inputs[0].clone(),
k: topk_node.k,
}))
} else {