Dandandan commented on code in PR #7721: URL: https://github.com/apache/arrow-datafusion/pull/7721#discussion_r1344548482
########## datafusion/physical-plan/src/topk/mod.rs: ########## @@ -0,0 +1,647 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TopK: Combination of Sort / LIMIT + +use arrow::{ + compute::interleave, + row::{RowConverter, Rows, SortField}, +}; +use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::{ + memory_pool::{MemoryConsumer, MemoryReservation}, + runtime_env::RuntimeEnv, +}; +use datafusion_physical_expr::PhysicalSortExpr; +use hashbrown::HashMap; + +use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; + +use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; + +/// Global TopK +/// +/// # Background +/// +/// "Top K" is a common query optimization used for queries such as +/// "find the top 3 customers by revenue". 
The (simplified) SQL for +/// such a query might be: +/// +/// ```sql +/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3; +/// ``` +/// +/// The simple plan would be: +/// +/// ```sql +/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +/// +--------------+----------------------------------------+ +/// | plan_type | plan | +/// +--------------+----------------------------------------+ +/// | logical_plan | Limit: 3 | +/// | | Sort: revenue DESC NULLS FIRST | +/// | | Projection: customer_id, revenue | +/// | | TableScan: sales | +/// +--------------+----------------------------------------+ +/// ``` +/// +/// While this plan produces the correct answer, it fully sorts the +/// input before discarding everything other than the top 3 elements. +/// +/// The same answer can be produced by simply keeping track of the top +/// N elements, reducing the total amount of required buffer memory. +/// +/// # Structure +/// +/// This operator tracks the top K items using a `TopKHeap`. +pub struct TopK { + /// schema of the output (and the input) + schema: SchemaRef, + /// Runtime metrics + metrics: TopKMetrics, + /// Reservation + reservation: MemoryReservation, + /// The target number of rows for output batches + batch_size: usize, + /// sort expressions + expr: Arc<[PhysicalSortExpr]>, + /// row converter, for sort keys + row_converter: RowConverter, + /// scratch space for converting rows + scratch_rows: Rows, + /// stores the top k values and their sort key values, in order + heap: TopKHeap, +} + +impl TopK { + /// Create a new [`TopK`] that stores the top `k` values, as + /// defined by the sort expressions in `expr`. 
+ // TODO: make a builder or some other nicer API to avoid the + // clippy warning + #[allow(clippy::too_many_arguments)] + pub fn try_new( + partition_id: usize, + schema: SchemaRef, + expr: Vec<PhysicalSortExpr>, + k: usize, + batch_size: usize, + runtime: Arc<RuntimeEnv>, + metrics: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Result<Self> { + let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) + .register(&runtime.memory_pool); + + let expr: Arc<[PhysicalSortExpr]> = expr.into(); + + let sort_fields: Vec<_> = expr + .iter() + .map(|e| { + Ok(SortField::new_with_options( + e.expr.data_type(&schema)?, + e.options, + )) + }) + .collect::<Result<_>>()?; + + let row_converter = RowConverter::new(sort_fields)?; + let scratch_rows = row_converter.empty_rows( + batch_size, + 20 * batch_size, // guesstimate 20 bytes per row + ); + + Ok(Self { + schema: schema.clone(), + metrics: TopKMetrics::new(metrics, partition), + reservation, + batch_size, + expr, + row_converter, + scratch_rows, + heap: TopKHeap::new(k, batch_size, schema), + }) + } + + /// Insert `batch`, remembering it if any of its values are among + /// the top k seen so far. + pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + // Updates on drop + let _timer = self.metrics.baseline.elapsed_compute().timer(); + + let sort_keys: Vec<ArrayRef> = self + .expr + .iter() + .map(|expr| { + let value = expr.expr.evaluate(&batch)?; + Ok(value.into_array(batch.num_rows())) + }) + .collect::<Result<Vec<_>>>()?; + + // reuse existing `Rows` to avoid reallocations + let rows = &mut self.scratch_rows; + rows.clear(); + self.row_converter.append(rows, &sort_keys)?; + + // TODO make this algorithmically better?: + // 1. only check topk values in rows + // 2. 
only do one update through top_k + + let mut batch_entry = self.heap.register_batch(batch); + for (index, row) in rows.iter().enumerate() { + match self.heap.max() { + // heap has k items, and the new row is greater than the + // current max in the heap ==> it is not a new topk + Some(max_row) if row.as_ref() >= max_row.row() => {} + // don't yet have k items or new item is lower than the currently k low values + None | Some(_) => { + self.heap.add(&mut batch_entry, row, index); + self.metrics.row_replacements.add(1); + } + } + } + self.heap.insert_batch_entry(batch_entry); + + // conserve memory + self.heap.maybe_compact()?; + + // update memory reservation + self.reservation.try_resize(self.size())?; + Ok(()) + } + + /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap + pub fn emit(self) -> Result<SendableRecordBatchStream> { + let Self { + schema, + metrics, + reservation: _, + batch_size, + expr: _, + row_converter: _, + scratch_rows: _, + mut heap, + } = self; + let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop + + let mut batch = heap.emit()?; + metrics.baseline.output_rows().add(batch.num_rows()); + + // break into record batches as needed + let mut batches = vec![]; + loop { + if batch.num_rows() < batch_size { + batches.push(Ok(batch)); + break; + } else { + batches.push(Ok(batch.slice(0, batch_size))); + let remaining_length = batch.num_rows() - batch_size; + batch = batch.slice(batch_size, remaining_length); + } + } + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::iter(batches), + ))) + } + + /// return the size of memory used by this operator, in bytes + fn size(&self) -> usize { + std::mem::size_of::<Self>() + + self.row_converter.size() + + self.scratch_rows.size() + + self.heap.size() + } +} + +struct TopKMetrics { + /// metrics + pub baseline: BaselineMetrics, + + /// count of how many rows were replaced in the heap + pub row_replacements: Count, +} + 
+impl TopKMetrics { + fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + baseline: BaselineMetrics::new(metrics, partition), + row_replacements: MetricBuilder::new(metrics) + .counter("row_replacements", partition), + } + } +} + +/// This structure keeps at most the *smallest* k items, using the +/// [arrow::row] format for sort keys. While it is called "topK" for +/// values like `1, 2, 3, 4, 5` the "top 3" really means the +/// *smallest* 3, `1, 2, 3`, not the *largest* 3 `3, 4, 5`. +/// +/// Using the `Row` format handles things such as ascending vs +/// descending and nulls first vs nulls last. +/// +/// It doesn't use `BinaryHeap` in the Rust standard library because +/// it is important to check the current minimum value in the heap +/// prior to creating a new value to insert. +struct TopKHeap { + /// The maximum number of elements to store in this heap. + k: usize, + /// The target number of rows for output batches + batch_size: usize, + /// Storage for at most `k` items using a BinaryHeap. Reversed + /// so that the smallest k so far is on the top + inner: BinaryHeap<TopKRow>, + /// Storage for the original row values (TopKRow only has the sort key) + store: RecordBatchStore, + /// The size of all owned data held by this heap + owned_bytes: usize, +} + +impl TopKHeap { + fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { + assert!(k > 0); + Self { + k, + batch_size, + inner: BinaryHeap::new(), + store: RecordBatchStore::new(schema), + owned_bytes: 0, + } + } + + /// Register a [`RecordBatch`] with the heap, returning the + /// appropriate entry + pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry { + self.store.register(batch) + } + + /// Insert a [`RecordBatchEntry`] created by a previous call to + /// [`Self::register_batch`] into storage. 
+ pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) { + self.store.insert(entry) + } + + /// Returns the largest value stored by the heap if there are k + /// items, otherwise returns None. Remember this structure is + /// keeping the "smallest" k values + fn max(&self) -> Option<&TopKRow> { + if self.inner.len() < self.k { + None + } else { + self.inner.peek() + } + } + + /// Adds `row` to this heap. If inserting this new item would + /// increase the size past `k`, removes the previously largest + /// item. + fn add( + &mut self, + batch_entry: &mut RecordBatchEntry, + row: impl AsRef<[u8]>, + index: usize, + ) { + let batch_id = batch_entry.id; + batch_entry.uses += 1; + + assert!(self.inner.len() <= self.k); + let row = row.as_ref(); + + // Reuse storage for evicted item if possible + let new_top_k = if self.inner.len() == self.k { + let prev_min = self.inner.pop().unwrap(); + + // Update batch use + if prev_min.batch_id == batch_entry.id { + batch_entry.uses -= 1; + } else { + self.store.unuse(prev_min.batch_id); + } + + // update memory accounting + self.owned_bytes -= prev_min.owned_size(); + prev_min.with_new_row(row, batch_id, index) + } else { + TopKRow::new(row, batch_id, index) + }; + + self.owned_bytes += new_top_k.owned_size(); + + // put the new row into the heap + self.inner.push(new_top_k) + } + + /// Returns the values stored in this heap, from values low to + /// high, as a single [`RecordBatch`], resetting the inner heap + pub fn emit(&mut self) -> Result<RecordBatch> { + Ok(self.emit_with_state()?.0) + } + + /// Returns the values stored in this heap, from values low to + /// high, as a single [`RecordBatch`], and a sorted vec of the + /// current heap's contents + pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec<TopKRow>)> { + let schema = self.store.schema().clone(); + + // generate sorted rows + let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec(); + + if self.store.is_empty() { + return 
Ok((RecordBatch::new_empty(schema), topk_rows)); + } + + // Indices for each row within its respective RecordBatch + let indices: Vec<_> = topk_rows + .iter() + .enumerate() + .map(|(i, k)| (i, k.index)) + .collect(); + + let num_columns = schema.fields().len(); + + // build the output columns one at time, using the + // `interleave` kernel to pick rows from different arrays + let output_columns: Vec<_> = (0..num_columns) + .map(|col| { + let input_arrays: Vec<_> = topk_rows + .iter() + .map(|k| { + let entry = + self.store.get(k.batch_id).expect("invalid stored batch id"); + entry.batch.column(col) as &dyn Array + }) + .collect(); + + // at this point `indices` contains indexes within the + // rows and `input_arrays` contains a reference to the + // relevant Array for that index. `interleave` pulls + // them together into a single new array + Ok(interleave(&input_arrays, &indices)?) + }) + .collect::<Result<_>>()?; + + let new_batch = RecordBatch::try_new(schema, output_columns)?; + Ok((new_batch, topk_rows)) + } + + /// Compact this heap, rewriting all stored batches into a single + /// input batch + pub fn maybe_compact(&mut self) -> Result<()> { + // we compact if the number of "unused" rows in the store is Review Comment: I'll take a look :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
