alamb commented on code in PR #22871: URL: https://github.com/apache/datafusion/pull/22871#discussion_r3390842703
########## docs/source/library-user-guide/extending-operators.md: ########## @@ -19,11 +19,409 @@ # Extending Operators -DataFusion supports extending operators by transforming [`LogicalPlan`] and [`ExecutionPlan`] through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. +DataFusion lets you add operators that are not part of the built-in set. You do this by introducing a custom node into the [`LogicalPlan`], teaching the optimizer to produce it through an [`OptimizerRule`](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html), and providing a matching [`ExecutionPlan`] so the node can run. + +This page walks through a complete example -- a `TopK` operator -- and then points to the µWheel project as a larger, real-world case study. [`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html [`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +## A Worked Example: TopK + +"Top K" is a common query shape: return the N rows with the largest (or smallest) value of some column. For example, "find the top 3 customers by revenue": + +```sql +CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) + STORED AS CSV LOCATION 'tests/data/customer.csv'; + +SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; +``` + +Out of the box, DataFusion plans this as a `Sort` feeding a `Limit`: + +```text +> EXPLAIN SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; Review Comment: This is out of date as DataFusion now has a special TopK mode in its sort operator. I recommend we just point this out and then move on with the example ```sql > EXPLAIN SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; +---------------+-------------------------------+ | plan_type | plan | +---------------+-------------------------------+ | physical_plan | ┌───────────────────────────┐ | | | │ SortExec(TopK) │ | | | │ -------------------- │ | | | │ limit: 3 │ | | | │ │ | | | │ revenue@1 DESC │ | | | └─────────────┬─────────────┘ | | | ┌─────────────┴─────────────┐ | | | │ EmptyExec │ | | | └───────────────────────────┘ | | | | +---------------+-------------------------------+ 1 row(s) fetched. Elapsed 0.020 seconds. > EXPLAIN FORMAT INDENT SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; +---------------+-------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-------------------------------------------------------------------------------+ | logical_plan | Sort: sales.revenue DESC NULLS FIRST, fetch=3 | | | TableScan: sales projection=[customer_id, revenue] | | physical_plan | SortExec: TopK(fetch=3), expr=[revenue@1 DESC], preserve_partitioning=[false] | | | EmptyExec | | | | +---------------+-------------------------------------------------------------------------------+ 2 row(s) fetched. Elapsed 0.004 seconds. ``` ########## docs/source/library-user-guide/extending-operators.md: ########## @@ -19,11 +19,409 @@ # Extending Operators -DataFusion supports extending operators by transforming [`LogicalPlan`] and [`ExecutionPlan`] through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. +DataFusion lets you add operators that are not part of the built-in set. You do this by introducing a custom node into the [`LogicalPlan`], teaching the optimizer to produce it through an [`OptimizerRule`](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html), and providing a matching [`ExecutionPlan`] so the node can run. + +This page walks through a complete example -- a `TopK` operator -- and then points to the µWheel project as a larger, real-world case study. [`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html [`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +## A Worked Example: TopK + +"Top K" is a common query shape: return the N rows with the largest (or smallest) value of some column. For example, "find the top 3 customers by revenue": + +```sql +CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) + STORED AS CSV LOCATION 'tests/data/customer.csv'; + +SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; +``` + +Out of the box, DataFusion plans this as a `Sort` feeding a `Limit`: Review Comment: I think we should also note that DataFusion has a much more sophisticated topk implementation built in, but this is just for an example Maybe we can update the example so it disables the limit pushdown optimizer pass 🤔 ```suggestion Out of the box, DataFusion already contains an optimized TopK implementation and our example here is just for demonstration purposes. If we disable the LimitPushdown optimization, we see the original plan is a `Sort` feeding a `Limit`: ``` ########## docs/source/library-user-guide/extending-operators.md: ########## @@ -19,11 +19,409 @@ # Extending Operators -DataFusion supports extending operators by transforming [`LogicalPlan`] and [`ExecutionPlan`] through customized [optimizer rules](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html). This section will use the µWheel project to illustrate such capabilities. +DataFusion lets you add operators that are not part of the built-in set. You do this by introducing a custom node into the [`LogicalPlan`], teaching the optimizer to produce it through an [`OptimizerRule`](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html), and providing a matching [`ExecutionPlan`] so the node can run. + +This page walks through a complete example -- a `TopK` operator -- and then points to the µWheel project as a larger, real-world case study. [`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html [`executionplan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +## A Worked Example: TopK + +"Top K" is a common query shape: return the N rows with the largest (or smallest) value of some column. For example, "find the top 3 customers by revenue": + +```sql +CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) + STORED AS CSV LOCATION 'tests/data/customer.csv'; + +SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; +``` + +Out of the box, DataFusion plans this as a `Sort` feeding a `Limit`: + +```text +> EXPLAIN SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3; ++--------------+--------------------------------------+ +| plan_type | plan | ++--------------+--------------------------------------+ +| logical_plan | Limit: 3 | +| | Sort: revenue DESC NULLS FIRST | +| | Projection: customer_id, revenue | +| | TableScan: sales | ++--------------+--------------------------------------+ +``` + +That plan is correct, but it fully sorts the input before throwing away everything except the first three rows. The same answer can be produced by scanning the input once and keeping only the largest `k` values seen so far, which bounds the working memory to `k` rows regardless of input size. + +The rest of this section builds a `TopK` operator that recognizes the `Limit` + `Sort` pattern and replaces it with a single, specialized node. A full, runnable version of this example lives in [`user_defined_plan.rs`] in the DataFusion test suite. + +[`user_defined_plan.rs`]: https://github.com/apache/datafusion/blob/main/datafusion/core/tests/user_defined/user_defined_plan.rs + +There are four pieces to implement, plus the wiring that registers them with a `SessionContext`: + +1. A **logical node** (`TopKPlanNode`) that represents the operator in a `LogicalPlan`. +2. An **optimizer rule** (`TopKOptimizerRule`) that rewrites the `Limit` + `Sort` pattern into that node. +3. A **physical operator** (`TopKExec`) that actually computes the result at execution time. +4. An **extension planner** (`TopKPlanner`) that turns the logical node into the physical operator. + +### The Logical Node + +A custom logical operator implements [`UserDefinedLogicalNodeCore`](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.UserDefinedLogicalNodeCore.html). The node carries the parameters the operator needs -- here, the fetch count `k`, the single input plan, and the sort expression to rank by: + +```rust,ignore +#[derive(PartialEq, Eq, PartialOrd, Hash)] +struct TopKPlanNode { + k: usize, + input: LogicalPlan, + /// The sort expression (this example only supports a single sort expr) + expr: SortExpr, +} + +impl Debug for TopKPlanNode { + /// Use the explain representation as the Debug format. + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + UserDefinedLogicalNodeCore::fmt_for_explain(self, f) + } +} + +impl UserDefinedLogicalNodeCore for TopKPlanNode { + fn name(&self) -> &str { + "TopK" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![&self.input] + } + + /// The output schema of TopK is the same as its input. + fn schema(&self) -> &DFSchemaRef { + self.input.schema() + } + + fn expressions(&self) -> Vec<Expr> { + vec![self.expr.expr.clone()] + } + + /// How the node renders in `EXPLAIN` output, e.g. `TopK: k=10`. + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "TopK: k={}", self.k) + } + + /// Rebuild the node after the optimizer has rewritten its + /// expressions or inputs. + fn with_exprs_and_inputs( + &self, + mut exprs: Vec<Expr>, + mut inputs: Vec<LogicalPlan>, + ) -> Result<Self> { + assert_eq!(inputs.len(), 1, "input size inconsistent"); + assert_eq!(exprs.len(), 1, "expression size inconsistent"); + Ok(Self { + k: self.k, + input: inputs.swap_remove(0), + expr: self.expr.with_expr(exprs.swap_remove(0)), + }) + } + + fn supports_limit_pushdown(&self) -> bool { + false // Disallow limit push-down by default + } +} +``` + +The framework relies on `inputs`, `expressions`, and `with_exprs_and_inputs` to treat the node uniformly during plan traversal and rewriting: it reads the children and expressions out, optimizes them, and asks the node to rebuild itself with the results. + +### The Optimizer Rule + +The optimizer rule is what actually introduces the node. It implements [`OptimizerRule`](https://docs.rs/datafusion/latest/datafusion/optimizer/trait.OptimizerRule.html) and looks for a `Limit` whose input is a single-column `Sort`, replacing the pair with a `TopKPlanNode` wrapped in a [`LogicalPlan::Extension`](https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html): + +```rust,ignore +#[derive(Default, Debug)] +struct TopKOptimizerRule {} + +impl OptimizerRule for TopKOptimizerRule { + fn name(&self) -> &str { + "topk" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>, DataFusionError> { + // Note: this rule only looks for a Limit directly above a Sort and + // replaces it with a TopK node. It does not handle many edge cases + // (multiple sort columns, ASC vs DESC, etc.). + let LogicalPlan::Limit(ref limit) = plan else { + return Ok(Transformed::no(plan)); + }; + let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else { + return Ok(Transformed::no(plan)); + }; + + if let LogicalPlan::Sort(Sort { expr, input, .. }) = limit.input.as_ref() + && expr.len() == 1 + { + // found a sort with a single sort expr -- replace it with a TopK + return Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(TopKPlanNode { + k: fetch, + input: input.as_ref().clone(), + expr: expr[0].clone(), + }), + }))); + } + + Ok(Transformed::no(plan)) + } +} +``` + +Returning `Transformed::yes` tells the optimizer the plan changed, so it knows to keep iterating; `Transformed::no` returns the plan untouched. Setting `apply_order` to `ApplyOrder::TopDown` lets the framework walk the plan tree for you and call `rewrite` on each node. + +### The Physical Operator + +The logical node only describes *what* to compute. The actual work happens in an [`ExecutionPlan`]. `TopKExec` wraps its input and remembers `k`; its `execute` method returns a stream that scans the input once and keeps a running set of the largest `k` rows: + +```rust,ignore +/// Physical operator that implements TopK. This implementation is +/// specialized for the example schema and is meant as an illustration only. +struct TopKExec { + input: Arc<dyn ExecutionPlan>, + /// The maximum number of output rows + k: usize, + cache: Arc<PlanProperties>, +} + +impl TopKExec { + fn new(input: Arc<dyn ExecutionPlan>, k: usize) -> Self { + let cache = Self::compute_properties(input.schema()); + Self { input, k, cache: Arc::new(cache) } + } + + /// Build the cached plan properties: schema, equivalence properties, + /// output partitioning, emission type, and boundedness. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for TopKExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "TopKExec: k={}", self.k) + } + DisplayFormatType::TreeRender => write!(f, ""), + } + } +} + +impl ExecutionPlan for TopKExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn properties(&self) -> &Arc<PlanProperties> { + &self.cache + } + + /// Require all input on a single partition so the reader sees every row. + fn required_input_distribution(&self) -> Vec<Distribution> { + vec![Distribution::SinglePartition] + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(TopKExec::new(children[0].clone(), self.k))) + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + assert_eq_or_internal_err!(partition, 0, "TopKExec invalid partition {partition}"); + + Ok(Box::pin(TopKReader { + input: self.input.execute(partition, context)?, + k: self.k, + done: false, + state: BTreeMap::new(), + })) + } +} +``` + +`required_input_distribution` returns `Distribution::SinglePartition`, which tells the planner to insert a repartition if needed so a single `TopKReader` observes all input rows. + +The stream itself accumulates rows from each incoming batch into a `BTreeMap` keyed by the sort value, discarding the smallest entry whenever the map grows past `k`. When the input is exhausted it emits a single batch with the surviving rows: + +```rust,ignore +/// A very specialized TopK reader. The map keeps at most `k` entries. +struct TopKReader { + input: SendableRecordBatchStream, + k: usize, + done: bool, + /// revenue -> customer_id, ordered by revenue + state: BTreeMap<i64, String>, +} + +impl Stream for TopKReader { + type Item = Result<RecordBatch>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + if self.done { + return Poll::Ready(None); + } + let k = self.k; + let schema = self.schema(); + match self.input.poll_next_unpin(cx) { + // Fold each input batch into the running top-k set, emitting an + // empty batch to keep the stream making progress. + Poll::Ready(Some(Ok(batch))) => { + self.state = accumulate_batch(&batch, self.state.clone(), &k); + Poll::Ready(Some(Ok(RecordBatch::new_empty(schema)))) + } + // Input is drained: materialize the surviving rows, highest first. + Poll::Ready(None) => { + self.done = true; + let (revenue, customer): (Vec<i64>, Vec<&String>) = + self.state.iter().rev().unzip(); + let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect(); + Poll::Ready(Some( + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringViewArray::from(customer)) as ArrayRef, + Arc::new(Int64Array::from(revenue)), + ], + ) + .map_err(Into::into), + )) + } + other => other, + } + } +} + +impl RecordBatchStream for TopKReader { + fn schema(&self) -> SchemaRef { + self.input.schema() + } +} +``` + +### Connecting Logical and Physical Plans + +Two pieces bridge the logical node and the physical operator. An [`ExtensionPlanner`](https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.ExtensionPlanner.html) knows how to build a `TopKExec` from a `TopKPlanNode`: + +```rust,ignore +/// Physical planner for TopK nodes +struct TopKPlanner {} + +#[async_trait] +impl ExtensionPlanner for TopKPlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc<dyn ExecutionPlan>], + _session_state: &SessionState, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + Ok( + if let Some(topk_node) = node.as_any().downcast_ref::<TopKPlanNode>() { + assert_eq!(logical_inputs.len(), 1, "Inconsistent number of inputs"); + assert_eq!(physical_inputs.len(), 1, "Inconsistent number of inputs"); + Some(Arc::new(TopKExec::new(physical_inputs[0].clone(), topk_node.k))) + } else { + // Not our node: let other planners try. + None + }, + ) + } +} +``` + +Returning `None` for an unrecognized node lets DataFusion try other extension planners, so several custom operators can coexist. A [`QueryPlanner`](https://docs.rs/datafusion/latest/datafusion/execution/context/trait.QueryPlanner.html) then registers that extension planner with the default physical planner: + +```rust,ignore +#[derive(Debug)] +struct TopKQueryPlanner {} + +#[async_trait] +impl QueryPlanner for TopKQueryPlanner { + async fn create_physical_plan( + &self, + logical_plan: &LogicalPlan, + session_state: &SessionState, + ) -> Result<Arc<dyn ExecutionPlan>> { + // Teach the default physical planner how to plan TopK nodes, and + // delegate the rest of physical planning to it. + let physical_planner = + DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(TopKPlanner {})]); + physical_planner + .create_physical_plan(logical_plan, session_state) + .await + } +} +``` + +### Putting It Together + +Finally, register the query planner and the optimizer rule on a [`SessionState`](https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html) and run a query. The optimizer rewrites the `Limit` + `Sort` into a `TopK` node, and the query planner turns it into a `TopKExec`: + +```rust,ignore +let config = SessionConfig::new().with_target_partitions(48); +let runtime = Arc::new(RuntimeEnv::default()); +let state = SessionStateBuilder::new() + .with_config(config) + .with_runtime_env(runtime) + .with_default_features() + .with_query_planner(Arc::new(TopKQueryPlanner {})) + .with_optimizer_rule(Arc::new(TopKOptimizerRule {})) + .build(); +let ctx = SessionContext::new_with_state(state); + +// ... register the `sales` table, then: +let df = ctx + .sql("SELECT customer_id, revenue FROM sales ORDER BY revenue DESC LIMIT 3") + .await?; +df.show().await?; Review Comment: it woudl be great here to update the example to use `assert_batches_eq!` so that the actual output is captured in the example too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
