alamb commented on a change in pull request #8097:
URL: https://github.com/apache/arrow/pull/8097#discussion_r482115402



##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to

Review comment:
       I am not a huge fan of this `OptimizerRuleSource` but it was the best I 
could come up with at this point

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext {
     }
 }
 
+/// Provides OptimizerRule instances to
+///
+/// Because OptimizerRule's themselves need
+/// to be mutable to conform to the OptimizerRuleTrait, they
+/// must be instantiated every time a plan is optimized
+/// ExecutionContext::optimize doesn't have a mutable reference to
+/// &self....
+pub trait OptimizerRuleSource {
+    /// Return the OptimizerRules to apply to a LogicalPlan. The rules
+    /// are applied in the order they are returned in the Vec.
+    fn rules(&self) -> Vec<Box<dyn OptimizerRule>>;
+}
+
+/// Supplies no additional optimizer rules
+struct DefaultOptimizerRuleSource {}
+
+impl OptimizerRuleSource for DefaultOptimizerRuleSource {
+    fn rules(&self) -> Vec<Box<dyn OptimizerRule>> {
+        vec![]
+    }
+}
+
 /// Configuration options for execution context
 #[derive(Clone)]
 pub struct ExecutionConfig {
     /// Number of concurrent threads for query execution.
     pub concurrency: usize,
     /// Default batch size when reading data sources
     pub batch_size: usize,
-    /// Optional physical planner to override the default physical planner
-    physical_planner: Option<Arc<dyn PhysicalPlanner>>,
+    /// Physical planner for converting from `LogicalPlan` to `ExecutionPlan`
+    physical_planner: Arc<dyn PhysicalPlanner>,

Review comment:
       I thought the code would be neater if there was always a 
physical_planner (which is an instance of the `DefaultPhysicalPlanner`)

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -0,0 +1,496 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains an end to end demonstration of creating
+//! a user defined operator in DataFusion.
+//!
+//! Specifically, it shows how to define a `TopKNode` that implements
+//! `ExtensionPlanNode`, add an OptimizerRule to rewrite a
+//! `LogicalPlan` to use that node a `LogicalPlan`, create an
+//! `ExecutionPlan` and finally produce results.
+//!
+//! # TopK Background:
+//!
+//! A "Top K" node is a common query optimization which is used for
+//! queries such as "find the top 3 customers by revenue". The
+//! (simplified) SQL for such a query might be:
+//!
+//! ```sql
+//! CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT)
+//!   STORED AS CSV location 'tests/customer.csv';
+//!
+//! SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3;
+//! ```
+//!
+//! And a naive plan would be:
+//!
+//! ```
+//! > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC 
limit 3;
+//! +--------------+----------------------------------------+
+//! | plan_type    | plan                                   |
+//! +--------------+----------------------------------------+
+//! | logical_plan | Limit: 3                               |
+//! |              |   Sort: #revenue DESC NULLS FIRST      |
+//! |              |     Projection: #customer_id, #revenue |
+//! |              |       TableScan: sales projection=None |
+//! +--------------+----------------------------------------+
+//! ```
+//!
+//! While this plan produces the correct answer, the careful reader
+//! will note it fully sorts the input before discarding everything
+//! other than the top 3 elements.
+//!
+//! The same answer can be produced by simply keeping track of the top
+//! N elements, reducing the total amount of required buffer memory.
+//!
+
+use arrow::{
+    array::{Int64Array, StringArray},
+    datatypes::{Schema, SchemaRef},
+    error::ArrowError,
+    record_batch::{RecordBatch, RecordBatchReader},
+    util::pretty::pretty_format_batches,
+};
+use datafusion::{
+    error::{ExecutionError, Result},
+    execution::context::{ExecutionContextState, OptimizerRuleSource},
+    logical_plan::{Expr, ExtensionPlanNode, LogicalPlan},
+    optimizer::{optimizer::OptimizerRule, utils::optimize_explain},
+    physical_plan::{
+        planner::{DefaultPhysicalPlanner, ExtensionPlanner},
+        Distribution, ExecutionPlan, Partitioning,
+    },
+    prelude::{ExecutionConfig, ExecutionContext},
+};
+use fmt::Debug;
+use std::{
+    any::Any,
+    collections::BTreeMap,
+    fmt,
+    sync::{Arc, Mutex},
+};
+
+/// Execute the specified sql and return the resulting record batches
+/// pretty printed as a String.
+fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> {
+    let df = ctx.sql(sql)?;
+    let batches = df.collect()?;
+    pretty_format_batches(&batches).map_err(|e| ExecutionError::ArrowError(e))
+}
+
+/// Create a test table.
+fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> {
+    let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue 
BIGINT) STORED AS CSV location 'tests/customer.csv'";
+
+    let expected = vec!["++", "++"];
+
+    let s = exec_sql(&mut ctx, sql)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(expected, actual, "Creating table");
+    Ok(ctx)
+}
+
+const QUERY: &str =
+    "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
+
+// Run the query using the specified execution context and compare it
+// to the known result
+fn run_and_compare_query(mut ctx: ExecutionContext, description: &str) -> 
Result<()> {
+    let expected = vec![
+        "+-------------+---------+",
+        "| customer_id | revenue |",
+        "+-------------+---------+",
+        "| paul        | 300     |",
+        "| jorge       | 200     |",
+        "| andy        | 150     |",
+        "+-------------+---------+",
+    ];
+
+    let s = exec_sql(&mut ctx, QUERY)?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(
+        expected,
+        actual,
+        "output mismatch for {}. Expectedn\n{}Actual:\n{}",
+        description,
+        expected.join("\n"),
+        s
+    );
+    Ok(())
+}
+
+#[test]
+// Run the query using default planners and optimizer
+fn normal_query() -> Result<()> {
+    let ctx = setup_table(ExecutionContext::new())?;
+    run_and_compare_query(ctx, "Default context")
+}
+
+#[test]
+// Run the query using topk optimization
+fn topk_query() -> Result<()> {
+    // Note the only difference is that the top
+    let ctx = setup_table(make_topk_context())?;
+    run_and_compare_query(ctx, "Topk context")
+}
+
+#[test]
+// Run EXPLAIN PLAN and show the plan was in fact rewritten
+fn topk_plan() -> Result<()> {
+    let mut ctx = setup_table(make_topk_context())?;
+
+    let expected = vec![
+        "| logical_plan after topk                 | TopK: k=3                 
                     |",
+        "|                                         |   Projection: 
#customer_id, #revenue           |",
+        "|                                         |     TableScan: sales 
projection=Some([0, 1])   |",
+    ].join("\n");
+
+    let explain_query = format!("EXPLAIN VERBOSE {}", QUERY);
+    let actual_output = exec_sql(&mut ctx, &explain_query)?;
+
+    assert!(actual_output.contains(&expected) , "Expected output not present 
in actual output\nExpected:\n---------\n{}\nActual:\n--------\n{}", expected, 
actual_output);
+    Ok(())
+}
+
+fn make_topk_context() -> ExecutionContext {
+    let physical_planner = 
Arc::new(DefaultPhysicalPlanner::with_extension_planner(

Review comment:
       Here is the example of how to register `OptimizerRule`s and PhysicalPlan 
rules

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -42,14 +44,30 @@ use crate::physical_plan::{AggregateExpr, ExecutionPlan, 
PhysicalExpr, PhysicalP
 use arrow::compute::SortOptions;
 use arrow::datatypes::Schema;
 
+/// This trait permits the `DefaultPhysicalPlanner` to create plans for
+/// user defined `ExtensionPlanNode`s
+pub trait ExtensionPlanner {

Review comment:
       @andygrove  -- here is my  proposed change to support physical planning 
of extension nodes in response to 
https://github.com/apache/arrow/pull/8020#discussion_r475953168. Rather than 
doing physical planning directly in the `ExtensionNode` itself, this is a 
mechanism to register planning capabilities into the `DefaultPhysicalPlanner`

##########
File path: rust/datafusion/src/logical_plan/mod.rs
##########
@@ -890,11 +955,16 @@ pub enum LogicalPlan {
         /// The output schema of the explain (2 columns of text)
         schema: Box<Schema>,
     },
+    /// Extension operator defined outside of DataFusion
+    Extension {
+        /// The runtime extension operator
+        node: Arc<dyn ExtensionPlanNode>,
+    },
 }
 
 impl LogicalPlan {
     /// Get a reference to the logical plan's schema
-    pub fn schema(&self) -> &Box<Schema> {
+    pub fn schema(&self) -> &Schema {

Review comment:
       Returning a `&Box<>` requires implementers to store their `Schema` 
objects in a `Box` -- using a `&Schema` provides additional flexibility




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to