alamb commented on code in PR #7250: URL: https://github.com/apache/arrow-datafusion/pull/7250#discussion_r1293316867
########## datafusion/core/src/physical_plan/topk/mod.rs: ########## @@ -0,0 +1,516 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TopK: Combination of Sort / LIMIT + +use arrow::{ + compute::interleave, + row::{OwnedRow, RowConverter, Rows, SortField}, +}; +use std::{cmp::Ordering, sync::Arc}; + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::{ + memory_pool::{MemoryConsumer, MemoryReservation}, + runtime_env::RuntimeEnv, +}; +use datafusion_physical_expr::PhysicalSortExpr; +use hashbrown::HashMap; + +use crate::physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; + +use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; + +/// Global TopK +/// +/// # Background +/// +/// "Top K" is a common query optimization used for queries such as +/// "find the top 3 customers by revenue". The (simplified) SQL for +/// such a query might be: +/// +/// ```sql +/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3; +/// ``` +/// +/// The simple plan would be: +/// +/// ``` +/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +/// +--------------+----------------------------------------+ +/// | plan_type | plan | +/// +--------------+----------------------------------------+ +/// | logical_plan | Limit: 3 | +/// | | Sort: revenue DESC NULLS FIRST | +/// | | Projection: customer_id, revenue | +/// | | TableScan: sales | +/// +--------------+----------------------------------------+ +/// ``` +/// +/// While this plan produces the correct answer, it will fully sorts the +/// input before discarding everything other than the top 3 elements. +/// +/// The same answer can be produced by simply keeping track of the top +/// N elements, reducing the total amount of required buffer memory. +/// +/// # Structure +/// +/// This operator tracks the top K items using a `TopKHeap`. +pub struct TopK { + /// schema of the output (and the input) + schema: SchemaRef, + /// Runtime metrics + metrics: TopKMetrics, + /// Reservation + reservation: MemoryReservation, + /// The target number of rows for output batches + batch_size: usize, + /// sort expressions + expr: Arc<[PhysicalSortExpr]>, + /// row converter, for sort keys + row_converter: RowConverter, + /// scratch space for converting rows + scratch_rows: Rows, + /// stores the top k values and their sort key values, in order + heap: TopKHeap, +} + +impl TopK { + /// Create a new [`TopK`] that stores the top `k` values, as + /// defined by the sort expressions in `expr`. + // TOOD: make a builder or some other nicer API to avoid the + // clippy warning + #[allow(clippy::too_many_arguments)] + pub fn try_new( + partition_id: usize, + schema: SchemaRef, + expr: Vec<PhysicalSortExpr>, + k: usize, + batch_size: usize, + runtime: Arc<RuntimeEnv>, + metrics: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Result<Self> { + let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) + .register(&runtime.memory_pool); + + let expr: Arc<[PhysicalSortExpr]> = expr.into(); + + let sort_fields: Vec<_> = expr + .iter() + .map(|e| { + Ok(SortField::new_with_options( + e.expr.data_type(&schema)?, + e.options, + )) + }) + .collect::<Result<_>>()?; + + let row_converter = RowConverter::new(sort_fields)?; + let scratch_rows = row_converter.empty_rows( + batch_size, + 20 * batch_size, // guestimate 20 bytes per row + ); + + Ok(Self { + schema, + metrics: TopKMetrics::new(metrics, partition), + reservation, + batch_size, + expr, + row_converter, + scratch_rows, + heap: TopKHeap::new(k), + }) + } + + /// Insert `batch`, remembering it if any of its values are among + /// the top k seen so far. + pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + // Updates on drop + let _timer = self.metrics.baseline.elapsed_compute().timer(); + + let sort_keys: Vec<ArrayRef> = self + .expr + .iter() + .map(|expr| { + let value = expr.expr.evaluate(&batch)?; + Ok(value.into_array(batch.num_rows())) + }) + .collect::<Result<Vec<_>>>()?; + + // reuse existing `Rows` to avoid reallocations + let rows = &mut self.scratch_rows; + rows.clear(); + self.row_converter.append(rows, &sort_keys)?; + + // TODO make this algorithmically better?: + // 1. only check topk values in rows + // 2. only do one update through top_k + + let mut batch_entry = self.heap.register_batch(batch); + for (index, row) in rows.iter().enumerate() { Review Comment: > However, when query planning this, should we take into account worst-case scenarios? Because that is where I've spent my time struggling with my PR. Yes I think we should -- worst case is going to be reverse sorted data (where every value is a new TopK I think For this particular PR I think with some more work this approach can do better in all cases than existing Sort w/ `fetch` partly due to how Sort w/ `fetch` is implemented. I do think there will need to be some decision of when to fall back to sort with limit as the sort with limit handles spilling to disk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
