This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f4aaabc7 feat: async extension planner (#2713)
3f4aaabc7 is described below

commit 3f4aaabc74796fe8c538981c259d79da789b96ae
Author: Ruihang Xia <[email protected]>
AuthorDate: Mon Jun 20 01:18:35 2022 +0800

    feat: async extension planner (#2713)
    
    * feat: async extension planner
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * run cargo fmt
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * replace Arc with ref in plan_extension
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * add Send & Sync bound to PhysicalPlanner
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * add Send & Sync bound to UserDefinedLogicalNode
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * remove useless parentheses
    
    Signed-off-by: Ruihang Xia <[email protected]>
    
    * format code
    
    Signed-off-by: Ruihang Xia <[email protected]>
---
 datafusion/core/src/physical_plan/planner.rs  | 47 +++++++++++++++------------
 datafusion/core/tests/user_defined_plan.rs    |  5 +--
 datafusion/expr/src/logical_plan/extension.rs |  4 +--
 datafusion/expr/src/logical_plan/plan.rs      |  2 +-
 datafusion/optimizer/src/test/user_defined.rs |  2 +-
 datafusion/proto/src/lib.rs                   |  2 +-
 6 files changed, 35 insertions(+), 27 deletions(-)

diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index fa112c82d..527638668 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -259,7 +259,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> 
Result<String> {
 /// Physical query planner that converts a `LogicalPlan` to an
 /// `ExecutionPlan` suitable for execution.
 #[async_trait]
-pub trait PhysicalPlanner {
+pub trait PhysicalPlanner: Send + Sync {
     /// Create a physical plan from a logical plan
     async fn create_physical_plan(
         &self,
@@ -285,6 +285,7 @@ pub trait PhysicalPlanner {
 }
 
 /// This trait exposes the ability to plan an [`ExecutionPlan`] out of a 
[`LogicalPlan`].
+#[async_trait]
 pub trait ExtensionPlanner {
     /// Create a physical plan for a [`UserDefinedLogicalNode`].
     ///
@@ -296,7 +297,7 @@ pub trait ExtensionPlanner {
     /// Returns `None` when the planner does not know how to plan the
     /// `node` and wants to delegate the planning to another
     /// [`ExtensionPlanner`].
-    fn plan_extension(
+    async fn plan_extension(
         &self,
         planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
@@ -964,22 +965,27 @@ impl DefaultPhysicalPlanner {
                         .try_collect::<Vec<_>>()
                         .await?;
 
-                    let maybe_plan = self.extension_planners.iter().try_fold(
-                        None,
-                        |maybe_plan, planner| {
-                            if let Some(plan) = maybe_plan {
-                                Ok(Some(plan))
-                            } else {
-                                planner.plan_extension(
-                                    self,
-                                    e.node.as_ref(),
-                                    &e.node.inputs(),
-                                    &physical_inputs,
-                                    session_state,
-                                )
-                            }
-                        },
-                    )?;
+                    let logical_input = e.node.inputs();
+                    let mut maybe_plan = None;
+                    for planner in &self.extension_planners {
+                        if maybe_plan.is_some() {
+                            break;
+                        }
+
+                        // Propagate planner errors with `?` (as the previous
+                        // `try_fold` did) rather than skipping to the next
+                        // planner, which would mask the real failure.
+                        maybe_plan = planner
+                            .plan_extension(
+                                self,
+                                e.node.as_ref(),
+                                &logical_input,
+                                &physical_inputs,
+                                session_state,
+                            )
+                            .await?;
+                    }
+
                     let plan = maybe_plan.ok_or_else(|| 
DataFusionError::Plan(format!(
                         "No installed planner was able to convert the custom 
node to an execution plan: {:?}", e.node
                     )))?;
@@ -2080,7 +2086,7 @@ mod tests {
             &self,
             _exprs: &[Expr],
             _inputs: &[LogicalPlan],
-        ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+        ) -> Arc<dyn UserDefinedLogicalNode> {
             unimplemented!("NoOp");
         }
     }
@@ -2152,9 +2158,10 @@ mod tests {
     //  the logical plan node.
     struct BadExtensionPlanner {}
 
+    #[async_trait]
     impl ExtensionPlanner for BadExtensionPlanner {
         /// Create a physical plan for an extension node
-        fn plan_extension(
+        async fn plan_extension(
             &self,
             _planner: &dyn PhysicalPlanner,
             _node: &dyn UserDefinedLogicalNode,
diff --git a/datafusion/core/tests/user_defined_plan.rs 
b/datafusion/core/tests/user_defined_plan.rs
index 33cfd1f56..201beaa78 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -367,7 +367,7 @@ impl UserDefinedLogicalNode for TopKPlanNode {
         &self,
         exprs: &[Expr],
         inputs: &[LogicalPlan],
-    ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+    ) -> Arc<dyn UserDefinedLogicalNode> {
         assert_eq!(inputs.len(), 1, "input size inconsistent");
         assert_eq!(exprs.len(), 1, "expression size inconsistent");
         Arc::new(TopKPlanNode {
@@ -381,9 +381,10 @@ impl UserDefinedLogicalNode for TopKPlanNode {
 /// Physical planner for TopK nodes
 struct TopKPlanner {}
 
+#[async_trait]
 impl ExtensionPlanner for TopKPlanner {
     /// Create a physical plan for an extension node
-    fn plan_extension(
+    async fn plan_extension(
         &self,
         _planner: &dyn PhysicalPlanner,
         node: &dyn UserDefinedLogicalNode,
diff --git a/datafusion/expr/src/logical_plan/extension.rs 
b/datafusion/expr/src/logical_plan/extension.rs
index 73ca8b9f2..fd3274144 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -26,7 +26,7 @@ use std::{any::Any, collections::HashSet, fmt, sync::Arc};
 /// See the example in
 /// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
 /// example of how to use this extension API
-pub trait UserDefinedLogicalNode: fmt::Debug {
+pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
     /// Return a reference to self as Any, to support dynamic downcasting
     fn as_any(&self) -> &dyn Any;
 
@@ -76,5 +76,5 @@ pub trait UserDefinedLogicalNode: fmt::Debug {
         &self,
         exprs: &[Expr],
         inputs: &[LogicalPlan],
-    ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync>;
+    ) -> Arc<dyn UserDefinedLogicalNode>;
 }
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 5b21a7ef3..10b9b25a0 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1161,7 +1161,7 @@ pub struct Analyze {
 #[derive(Clone)]
 pub struct Extension {
     /// The runtime extension operator
-    pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
+    pub node: Arc<dyn UserDefinedLogicalNode>,
 }
 
 /// Produces the first `n` tuples from its input and discards the rest.
diff --git a/datafusion/optimizer/src/test/user_defined.rs 
b/datafusion/optimizer/src/test/user_defined.rs
index c9993568c..92b56ee75 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -69,7 +69,7 @@ impl UserDefinedLogicalNode for TestUserDefinedPlanNode {
         &self,
         exprs: &[Expr],
         inputs: &[LogicalPlan],
-    ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+    ) -> Arc<dyn UserDefinedLogicalNode> {
         assert_eq!(inputs.len(), 1, "input size inconsistent");
         assert_eq!(exprs.len(), 0, "expression size inconsistent");
         Arc::new(TestUserDefinedPlanNode {
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index f08a00b49..0aa00bc75 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -191,7 +191,7 @@ mod roundtrip_tests {
             &self,
             exprs: &[Expr],
             inputs: &[LogicalPlan],
-        ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+        ) -> Arc<dyn UserDefinedLogicalNode> {
             assert_eq!(inputs.len(), 1, "input size inconsistent");
             assert_eq!(exprs.len(), 1, "expression size inconsistent");
             Arc::new(TopKPlanNode {

Reply via email to