This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4aaabc7 feat: async extension planner (#2713)
3f4aaabc7 is described below
commit 3f4aaabc74796fe8c538981c259d79da789b96ae
Author: Ruihang Xia <[email protected]>
AuthorDate: Mon Jun 20 01:18:35 2022 +0800
feat: async extension planner (#2713)
* feat: async extension planner
Signed-off-by: Ruihang Xia <[email protected]>
* run cargo fmt
Signed-off-by: Ruihang Xia <[email protected]>
* replace Arc with ref in plan_extension
Signed-off-by: Ruihang Xia <[email protected]>
* add Send & Sync bound to PhysicalPlanner
Signed-off-by: Ruihang Xia <[email protected]>
* add Send & Sync bound to UserDefinedLogicalNode
Signed-off-by: Ruihang Xia <[email protected]>
* remove useless parentheses
Signed-off-by: Ruihang Xia <[email protected]>
* format code
Signed-off-by: Ruihang Xia <[email protected]>
---
datafusion/core/src/physical_plan/planner.rs | 47 +++++++++++++++------------
datafusion/core/tests/user_defined_plan.rs | 5 +--
datafusion/expr/src/logical_plan/extension.rs | 4 +--
datafusion/expr/src/logical_plan/plan.rs | 2 +-
datafusion/optimizer/src/test/user_defined.rs | 2 +-
datafusion/proto/src/lib.rs | 2 +-
6 files changed, 35 insertions(+), 27 deletions(-)
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index fa112c82d..527638668 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -259,7 +259,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
#[async_trait]
-pub trait PhysicalPlanner {
+pub trait PhysicalPlanner: Send + Sync {
/// Create a physical plan from a logical plan
async fn create_physical_plan(
&self,
@@ -285,6 +285,7 @@ pub trait PhysicalPlanner {
}
/// This trait exposes the ability to plan an [`ExecutionPlan`] out of a
[`LogicalPlan`].
+#[async_trait]
pub trait ExtensionPlanner {
/// Create a physical plan for a [`UserDefinedLogicalNode`].
///
@@ -296,7 +297,7 @@ pub trait ExtensionPlanner {
/// Returns `None` when the planner does not know how to plan the
/// `node` and wants to delegate the planning to another
/// [`ExtensionPlanner`].
- fn plan_extension(
+ async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
@@ -964,22 +965,27 @@ impl DefaultPhysicalPlanner {
.try_collect::<Vec<_>>()
.await?;
- let maybe_plan = self.extension_planners.iter().try_fold(
- None,
- |maybe_plan, planner| {
- if let Some(plan) = maybe_plan {
- Ok(Some(plan))
- } else {
- planner.plan_extension(
- self,
- e.node.as_ref(),
- &e.node.inputs(),
- &physical_inputs,
- session_state,
- )
- }
- },
- )?;
+ let mut maybe_plan = None;
+ for planner in &self.extension_planners {
+ if maybe_plan.is_some() {
+ break;
+ }
+
+ let logical_input = e.node.inputs();
+ let plan = planner.plan_extension(
+ self,
+ e.node.as_ref(),
+ &logical_input,
+ &physical_inputs,
+ session_state,
+ );
+ let plan = plan.await;
+ if plan.is_err() {
+ continue;
+ }
+ maybe_plan = plan.unwrap();
+ }
+
let plan = maybe_plan.ok_or_else(||
DataFusionError::Plan(format!(
"No installed planner was able to convert the custom
node to an execution plan: {:?}", e.node
)))?;
@@ -2080,7 +2086,7 @@ mod tests {
&self,
_exprs: &[Expr],
_inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+ ) -> Arc<dyn UserDefinedLogicalNode> {
unimplemented!("NoOp");
}
}
@@ -2152,9 +2158,10 @@ mod tests {
// the logical plan node.
struct BadExtensionPlanner {}
+ #[async_trait]
impl ExtensionPlanner for BadExtensionPlanner {
/// Create a physical plan for an extension node
- fn plan_extension(
+ async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
_node: &dyn UserDefinedLogicalNode,
diff --git a/datafusion/core/tests/user_defined_plan.rs
b/datafusion/core/tests/user_defined_plan.rs
index 33cfd1f56..201beaa78 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -367,7 +367,7 @@ impl UserDefinedLogicalNode for TopKPlanNode {
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+ ) -> Arc<dyn UserDefinedLogicalNode> {
assert_eq!(inputs.len(), 1, "input size inconsistent");
assert_eq!(exprs.len(), 1, "expression size inconsistent");
Arc::new(TopKPlanNode {
@@ -381,9 +381,10 @@ impl UserDefinedLogicalNode for TopKPlanNode {
/// Physical planner for TopK nodes
struct TopKPlanner {}
+#[async_trait]
impl ExtensionPlanner for TopKPlanner {
/// Create a physical plan for an extension node
- fn plan_extension(
+ async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
diff --git a/datafusion/expr/src/logical_plan/extension.rs
b/datafusion/expr/src/logical_plan/extension.rs
index 73ca8b9f2..fd3274144 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -26,7 +26,7 @@ use std::{any::Any, collections::HashSet, fmt, sync::Arc};
/// See the example in
/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an
/// example of how to use this extension API
-pub trait UserDefinedLogicalNode: fmt::Debug {
+pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
/// Return a reference to self as Any, to support dynamic downcasting
fn as_any(&self) -> &dyn Any;
@@ -76,5 +76,5 @@ pub trait UserDefinedLogicalNode: fmt::Debug {
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync>;
+ ) -> Arc<dyn UserDefinedLogicalNode>;
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 5b21a7ef3..10b9b25a0 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1161,7 +1161,7 @@ pub struct Analyze {
#[derive(Clone)]
pub struct Extension {
/// The runtime extension operator
- pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
+ pub node: Arc<dyn UserDefinedLogicalNode>,
}
/// Produces the first `n` tuples from its input and discards the rest.
diff --git a/datafusion/optimizer/src/test/user_defined.rs
b/datafusion/optimizer/src/test/user_defined.rs
index c9993568c..92b56ee75 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -69,7 +69,7 @@ impl UserDefinedLogicalNode for TestUserDefinedPlanNode {
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+ ) -> Arc<dyn UserDefinedLogicalNode> {
assert_eq!(inputs.len(), 1, "input size inconsistent");
assert_eq!(exprs.len(), 0, "expression size inconsistent");
Arc::new(TestUserDefinedPlanNode {
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index f08a00b49..0aa00bc75 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -191,7 +191,7 @@ mod roundtrip_tests {
&self,
exprs: &[Expr],
inputs: &[LogicalPlan],
- ) -> Arc<dyn UserDefinedLogicalNode + Send + Sync> {
+ ) -> Arc<dyn UserDefinedLogicalNode> {
assert_eq!(inputs.len(), 1, "input size inconsistent");
assert_eq!(exprs.len(), 1, "expression size inconsistent");
Arc::new(TopKPlanNode {