andygrove commented on a change in pull request #8020:
URL: https://github.com/apache/arrow/pull/8020#discussion_r475953168



##########
File path: rust/datafusion/src/lp_limit.rs
##########
@@ -0,0 +1,99 @@
+//! Example of how a "User Defined logical plan node would work. Use
+//! the LogicalPlanNode trait for LimitNode
+
+use crate::error::Result;
+use crate::{
+    execution::{
+        context::ExecutionContextState,
+        physical_plan::{limit::GlobalLimitExec, ExecutionPlan},
+    },
+    logicalplan::{Expr, LogicalPlan},
+    lp::LogicalPlanNode,
+};
+use arrow::datatypes::Schema;
+use core::fmt;
+use std::sync::{Arc, Mutex};
+
+/// Produces the first `n` tuples from its input and discards the rest.
+#[derive(Clone)]
+pub struct LimitNode {
+    /// The limit
+    n: usize,
+    /// The input plan -- always exactly a single entry, but stored in
+    /// a Vector to implement `LogicalPlanNode::inputs`
+    inputs: Vec<Arc<LogicalPlan>>,
+}
+
+impl LimitNode {
+    pub fn new(n: usize, input: LogicalPlan) -> Self {
+        LimitNode {
+            n,
+            inputs: vec![Arc::new(input)],
+        }
+    }
+
+    // a limit node has a single input
+    pub fn input(&self) -> Arc<LogicalPlan> {
+        self.inputs[0].clone()
+    }
+}
+
+impl LogicalPlanNode for LimitNode {
+    // returns a reference to the inputs of this node
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        self.inputs.iter().map(|arc| arc.as_ref()).collect()
+    }
+
+    fn schema(&self) -> &Schema {
+        self.inputs[0].schema()
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        Vec::new()
+    }
+
+    /// Write a single line human readable string to `f` for use in explain 
plan
+    fn format_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Limit: {}", self.n)
+    }
+
+    fn dyn_clone(&self) -> Box<dyn LogicalPlanNode> {
+        Box::new(self.clone())
+    }
+
+    fn clone_from_template(
+        &self,
+        _exprs: &Vec<Expr>,
+        inputs: &Vec<LogicalPlan>,
+    ) -> Box<dyn LogicalPlanNode> {
+        let inputs = inputs.iter().map(|lp| Arc::new(lp.clone())).collect();
+
+        Box::new(LimitNode { n: self.n, inputs })
+    }
+
+    fn create_physical_plan(

Review comment:
       This is the only part of the PR that is problematic for me because it 
ties the logical plan to a specific physical plan. I would prefer to keep the 
query planning logic in a separate (pluggable) planner, so that users of the 
crate can provide their own implementations.
   
   I think that the query planner could still do pattern matching on the 
logical nodes by attempting to downcast to specific known operators. For 
example:
   
   ```rust
   
   fn create_physical_plan(node: &LogicalPlanNode) -> Result<Arc<dyn 
ExecutionPlan>> {
     if let Some(limit) = node.as_any().downcast_ref::<LimitNode>() {
       // create physical operator for limit
     } else if .... {
     }
   }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to