This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 03cfcb2  Update API for extension planning to include logical plan (#643)
03cfcb2 is described below

commit 03cfcb26ad6a566dc6fabe6f93e4f3b3d416432d
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jul 1 13:29:00 2021 -0400

    Update API for extension planning to include logical plan (#643)
    
    * Update API for extension planning to include logical plan
    
    * Review comments
---
 datafusion/src/execution/context.rs     | 10 ++++
 datafusion/src/physical_plan/mod.rs     | 14 +-----
 datafusion/src/physical_plan/planner.rs | 85 +++++++++++++++++++++++++++++----
 datafusion/tests/user_defined_plan.rs   |  9 ++--
 4 files changed, 93 insertions(+), 25 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 436bce5..d5a8486 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -3376,6 +3376,16 @@ mod tests {
                 "query not supported".to_string(),
             ))
         }
+
+        fn create_physical_expr(
+            &self,
+            _expr: &Expr,
+            _input_dfschema: &crate::logical_plan::DFSchema,
+            _input_schema: &Schema,
+            _ctx_state: &ExecutionContextState,
+        ) -> Result<Arc<dyn crate::physical_plan::PhysicalExpr>> {
+            unimplemented!()
+        }
     }
 
     struct MyQueryPlanner {}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 2122751..307fff6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -20,8 +20,6 @@
 use self::{
     coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
 };
-use crate::execution::context::ExecutionContextState;
-use crate::logical_plan::LogicalPlan;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::{
     error::{DataFusionError, Result},
@@ -122,16 +120,8 @@ impl SQLMetric {
     }
 }
 
-/// Physical query planner that converts a `LogicalPlan` to an
-/// `ExecutionPlan` suitable for execution.
-pub trait PhysicalPlanner {
-    /// Create a physical plan from a logical plan
-    fn create_physical_plan(
-        &self,
-        logical_plan: &LogicalPlan,
-        ctx_state: &ExecutionContextState,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
-}
+/// Physical planner interface
+pub use self::planner::PhysicalPlanner;
 
 /// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
 ///
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 75f1565..5b43ec1 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -39,9 +39,7 @@ use crate::physical_plan::sort::SortExec;
 use crate::physical_plan::udf;
 use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::{hash_utils, Partitioning};
-use crate::physical_plan::{
-    AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner, WindowExpr,
-};
+use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
 use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
@@ -172,16 +170,51 @@ fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
     }
 }
 
+/// Physical query planner that converts a `LogicalPlan` to an
+/// `ExecutionPlan` suitable for execution.
+pub trait PhysicalPlanner {
+    /// Create a physical plan from a logical plan
+    fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Create a physical expression from a logical expression
+    /// suitable for evaluation
+    ///
+    /// `expr`: the expression to convert
+    ///
+    /// `input_dfschema`: the logical plan schema for evaluating `expr`
+    ///
+    /// `input_schema`: the physical schema for evaluating `expr`
+    fn create_physical_expr(
+        &self,
+        expr: &Expr,
+        input_dfschema: &DFSchema,
+        input_schema: &Schema,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn PhysicalExpr>>;
+}
+
 /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a [`LogicalPlan`].
 pub trait ExtensionPlanner {
     /// Create a physical plan for a [`UserDefinedLogicalNode`].
-    /// This errors when the planner knows how to plan the concrete implementation of `node`
-    /// but errors while doing so, and `None` when the planner does not know how to plan the `node`
-    /// and wants to delegate the planning to another [`ExtensionPlanner`].
+    ///
+    /// `logical_inputs`: the logical plans that are the inputs to this node
+    ///
+    /// Returns an error when the planner knows how to plan the concrete
+    /// implementation of `node` but errors while doing so.
+    ///
+    /// Returns `None` when the planner does not know how to plan the
+    /// `node` and wants to delegate the planning to another
+    /// [`ExtensionPlanner`].
     fn plan_extension(
         &self,
+        planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
-        inputs: &[Arc<dyn ExecutionPlan>],
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
         ctx_state: &ExecutionContextState,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
 }
@@ -210,6 +243,30 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
         let plan = self.create_initial_plan(logical_plan, ctx_state)?;
         self.optimize_plan(plan, ctx_state)
     }
+
+    /// Create a physical expression from a logical expression
+    /// suitable for evaluation
+    ///
+    /// `expr`: the expression to convert
+    ///
+    /// `input_dfschema`: the logical plan schema for evaluating `expr`
+    ///
+    /// `input_schema`: the physical schema for evaluating `expr`
+    fn create_physical_expr(
+        &self,
+        expr: &Expr,
+        input_dfschema: &DFSchema,
+        input_schema: &Schema,
+        ctx_state: &ExecutionContextState,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        DefaultPhysicalPlanner::create_physical_expr(
+            self,
+            expr,
+            input_dfschema,
+            input_schema,
+            ctx_state,
+        )
+    }
 }
 
 impl DefaultPhysicalPlanner {
@@ -721,7 +778,7 @@ impl DefaultPhysicalPlanner {
                 )))
             }
             LogicalPlan::Extension { node } => {
-                let inputs = node
+                let physical_inputs = node
                     .inputs()
                     .into_iter()
                     .map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
@@ -733,7 +790,13 @@ impl DefaultPhysicalPlanner {
                         if let Some(plan) = maybe_plan {
                             Ok(Some(plan))
                         } else {
-                            planner.plan_extension(node.as_ref(), &inputs, ctx_state)
+                            planner.plan_extension(
+                                self,
+                                node.as_ref(),
+                                &node.inputs(),
+                                &physical_inputs,
+                                ctx_state,
+                            )
                         }
                     },
                 )?;
@@ -1644,8 +1707,10 @@ mod tests {
         /// Create a physical plan for an extension node
         fn plan_extension(
             &self,
+            _planner: &dyn PhysicalPlanner,
             _node: &dyn UserDefinedLogicalNode,
-            _inputs: &[Arc<dyn ExecutionPlan>],
+            _logical_inputs: &[&LogicalPlan],
+            _physical_inputs: &[Arc<dyn ExecutionPlan>],
             _ctx_state: &ExecutionContextState,
         ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
             Ok(Some(Arc::new(NoOpExecutionPlan {
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 22ebec8..21b4963 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -321,16 +321,19 @@ impl ExtensionPlanner for TopKPlanner {
     /// Create a physical plan for an extension node
     fn plan_extension(
         &self,
+        _planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
-        inputs: &[Arc<dyn ExecutionPlan>],
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
         _ctx_state: &ExecutionContextState,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(
             if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() {
-                assert_eq!(inputs.len(), 1, "Inconsistent number of inputs");
+                assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs");
+                assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs");
                 // figure out input name
                 Some(Arc::new(TopKExec {
-                    input: inputs[0].clone(),
+                    input: physical_inputs[0].clone(),
                     k: topk_node.k,
                 }))
             } else {

Reply via email to