alamb commented on code in PR #9125:
URL: https://github.com/apache/arrow-datafusion/pull/9125#discussion_r1479702363
##########
datafusion/core/src/physical_optimizer/enforce_sorting.rs:
##########
@@ -197,6 +202,41 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}
+fn replace_with_partial_sort(
+ plan: Arc<dyn ExecutionPlan>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let plan_any = plan.as_any();
+ if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
+ let child = sort_plan.children()[0].clone();
+ if !unbounded_output(&child) {
Review Comment:
Can you add a comment here about why this operator is only used with
unbounded output?
I think it is more generally applicable than just the unbounded case (it
would make any plan more streaming and would also require less memory).
We don't have to do it as part of this PR, but I think we should file a
follow-on ticket to use this operator more generally.
##########
datafusion/sqllogictest/test_files/group_by.slt:
##########
@@ -2244,6 +2244,42 @@ SELECT a, b, LAST_VALUE(c) as last_c
1 2 74
1 3 99
+query TT
+EXPLAIN SELECT *
+FROM annotated_data_infinite2
+ORDER BY a, b, d;
+----
+logical_plan
+Sort: annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b
ASC NULLS LAST, annotated_data_infinite2.d ASC NULLS LAST
+--TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d]
+physical_plan
+PartialSortExec: expr=[a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,d@4 ASC NULLS
LAST], common_prefix_length=[2]
+--StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d],
infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST,
c@3 ASC NULLS LAST]
+
Review Comment:
Could you also add a test that actually runs the query?
```sql
SELECT *
FROM annotated_data_infinite2
ORDER BY a, b, d;
```
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows (segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b do not change, as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input plan
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input plan
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream {
+ /// The input plan
+ input: SendableRecordBatchStream,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Used as a buffer for part of the input not ready for sort
+ input_batch: RecordBatch,
+ /// Fetch top N results
+ fetch: Option<usize>,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size
hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl RecordBatchStream for PartialSortStream {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+}
+
+impl PartialSortStream {
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ if self.is_closed {
+ assert_eq!(0, self.input_batch.num_rows());
+ return Poll::Ready(None);
+ }
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.input_batch =
+ concat_batches(&self.schema(), [&self.input_batch,
&batch])?;
+ if let Some(slice_point) =
+ self.get_slice_point(self.common_prefix_length,
&self.input_batch)?
+ {
+ self.emit(slice_point)
+ } else {
+ Ok(RecordBatch::new_empty(self.schema()))
+ }
+ }
+ Some(Err(e)) => Err(e),
+ None => {
+ self.is_closed = true;
+ // once input is consumed, sort the rest of the inserted
batches in input_batch
+ self.emit(self.input_batch.num_rows())
+ }
+ };
+
+ Poll::Ready(Some(result))
+ }
+
+ /// Sort input_batch up to the slice_point and emit the sorted batch
+ ///
+ /// If fetch is specified for PartialSortStream `emit` will limit
+ /// the last RecordBatch returned and will mark the stream as closed
+ fn emit(self: &mut Pin<&mut Self>, slice_point: usize) ->
Result<RecordBatch> {
+ let original_num_rows = self.input_batch.num_rows();
+ let segment = self.input_batch.slice(0, slice_point);
+ self.input_batch = self
+ .input_batch
+ .slice(slice_point, self.input_batch.num_rows() - slice_point);
+
+ let result = sort_batch(&segment, &self.expr, self.fetch)?;
+ if let Some(remaining_fetch) = self.fetch {
+ self.fetch = Some(remaining_fetch - result.num_rows());
+ if remaining_fetch == result.num_rows() {
+ self.input_batch = RecordBatch::new_empty(self.schema());
+ self.is_closed = true;
+ }
+ } else {
+ assert_eq!(
+ original_num_rows,
+ result.num_rows() + self.input_batch.num_rows()
+ );
+ }
+ Ok(result)
+ }
+
+ /// Return the end index of the second last partition if the batch
+ /// can be partitioned based on its already sorted columns
+ ///
+ /// Return None if the batch cannot be partitioned, which means the
+ /// batch does not have the information for a safe sort
+ fn get_slice_point(
+ &self,
+ common_prefix_len: usize,
+ batch: &RecordBatch,
+ ) -> Result<Option<usize>> {
+ let common_prefix_sort_keys = (0..common_prefix_len)
+ .map(|idx| self.expr[idx].evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<_>>>()?;
+ let partition_points =
+ evaluate_partition_ranges(batch.num_rows(),
&common_prefix_sort_keys)?;
+ // If partition points are [0..100], [100..200], [200..300]
+ // we should return 200, which is the safest and furthest partition
boundary
+ // Please note that we shouldn't return 300 (which is number of rows
in the batch),
+ // because this boundary may change with new data.
+ if partition_points.len() >= 2 {
+ Ok(Some(partition_points[partition_points.len() - 2].end))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use arrow::array::*;
+ use arrow::compute::SortOptions;
+ use arrow::datatypes::*;
+ use futures::FutureExt;
+ use itertools::Itertools;
+
+ use datafusion_common::assert_batches_eq;
+
+ use crate::collect;
+ use crate::expressions::col;
+ use crate::memory::MemoryExec;
+ use crate::sorts::sort::SortExec;
+ use crate::test;
+ use crate::test::assert_is_pending;
+ use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_partial_sort() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 0, 1, 1, 1]),
+ ("b", &vec![1, 1, 2, 2, 3, 3]),
+ ("c", &vec![1, 0, 5, 4, 3, 2]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source.clone(),
+ )
+ .with_common_prefix_length(2),
+ ) as Arc<dyn ExecutionPlan>;
+
+ let result = collect(partial_sort_exec, task_ctx.clone()).await?;
+
+ let expected_after_sort = [
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 0 | 1 | 0 |",
+ "| 0 | 1 | 1 |",
+ "| 0 | 2 | 5 |",
+ "| 1 | 2 | 4 |",
+ "| 1 | 3 | 2 |",
+ "| 1 | 3 | 3 |",
+ "+---+---+---+",
+ ];
+ assert_eq!(2, result.len());
+ assert_batches_eq!(expected_after_sort, &result);
+ assert_eq!(
+ task_ctx.runtime_env().memory_pool.reserved(),
+ 0,
+ "The sort should have returned all memory used back to the memory
manager"
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_partial_sort_with_fetch() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 1, 1, 1]),
+ ("b", &vec![1, 2, 2, 3, 3]),
+ ("c", &vec![0, 1, 2, 3, 4]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source.clone(),
+ )
+ .with_common_prefix_length(2)
+ .with_fetch(Some(4)),
+ ) as Arc<dyn ExecutionPlan>;
+
+ let result = collect(partial_sort_exec, task_ctx.clone()).await?;
+
+ let expected_after_sort = [
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 0 | 1 | 0 |",
+ "| 0 | 2 | 1 |",
+ "| 1 | 2 | 2 |",
+ "| 1 | 3 | 3 |",
+ "+---+---+---+",
+ ];
+ assert_eq!(2, result.len());
+ assert_batches_eq!(expected_after_sort, &result);
+ assert_eq!(
+ task_ctx.runtime_env().memory_pool.reserved(),
+ 0,
+ "The sort should have returned all memory used back to the memory
manager"
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_partial_sort2() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 0, 0, 1, 1, 1, 1]),
+ ("b", &vec![1, 1, 3, 3, 4, 4, 2, 2]),
+ ("c", &vec![0, 1, 2, 3, 4, 5, 6, 7]),
Review Comment:
I recommend that column `c` not start out already sorted, so that the test
actually verifies the sort on `c`.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows (segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b do not change, as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input plan
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input plan
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream {
+ /// The input plan
+ input: SendableRecordBatchStream,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Used as a buffer for part of the input not ready for sort
+ input_batch: RecordBatch,
+ /// Fetch top N results
+ fetch: Option<usize>,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size
hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl RecordBatchStream for PartialSortStream {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+}
+
+impl PartialSortStream {
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ if self.is_closed {
+ assert_eq!(0, self.input_batch.num_rows());
+ return Poll::Ready(None);
+ }
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.input_batch =
+ concat_batches(&self.schema(), [&self.input_batch,
&batch])?;
+ if let Some(slice_point) =
+ self.get_slice_point(self.common_prefix_length,
&self.input_batch)?
+ {
+ self.emit(slice_point)
+ } else {
+ Ok(RecordBatch::new_empty(self.schema()))
+ }
+ }
+ Some(Err(e)) => Err(e),
+ None => {
+ self.is_closed = true;
+ // once input is consumed, sort the rest of the inserted
batches in input_batch
+ self.emit(self.input_batch.num_rows())
+ }
+ };
+
+ Poll::Ready(Some(result))
+ }
+
+ /// Sort input_batch up to the slice_point and emit the sorted batch
+ ///
+ /// If fetch is specified for PartialSortStream `emit` will limit
+ /// the last RecordBatch returned and will mark the stream as closed
+ fn emit(self: &mut Pin<&mut Self>, slice_point: usize) ->
Result<RecordBatch> {
+ let original_num_rows = self.input_batch.num_rows();
+ let segment = self.input_batch.slice(0, slice_point);
+ self.input_batch = self
+ .input_batch
+ .slice(slice_point, self.input_batch.num_rows() - slice_point);
+
+ let result = sort_batch(&segment, &self.expr, self.fetch)?;
+ if let Some(remaining_fetch) = self.fetch {
+ self.fetch = Some(remaining_fetch - result.num_rows());
+ if remaining_fetch == result.num_rows() {
+ self.input_batch = RecordBatch::new_empty(self.schema());
+ self.is_closed = true;
+ }
+ } else {
+ assert_eq!(
+ original_num_rows,
+ result.num_rows() + self.input_batch.num_rows()
+ );
+ }
+ Ok(result)
+ }
+
+ /// Return the end index of the second last partition if the batch
+ /// can be partitioned based on its already sorted columns
+ ///
+ /// Return None if the batch cannot be partitioned, which means the
+ /// batch does not have the information for a safe sort
+ fn get_slice_point(
+ &self,
+ common_prefix_len: usize,
+ batch: &RecordBatch,
+ ) -> Result<Option<usize>> {
+ let common_prefix_sort_keys = (0..common_prefix_len)
+ .map(|idx| self.expr[idx].evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<_>>>()?;
+ let partition_points =
+ evaluate_partition_ranges(batch.num_rows(),
&common_prefix_sort_keys)?;
+ // If partition points are [0..100], [100..200], [200..300]
+ // we should return 200, which is the safest and furthest partition
boundary
+ // Please note that we shouldn't return 300 (which is number of rows
in the batch),
+ // because this boundary may change with new data.
+ if partition_points.len() >= 2 {
+ Ok(Some(partition_points[partition_points.len() - 2].end))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use arrow::array::*;
+ use arrow::compute::SortOptions;
+ use arrow::datatypes::*;
+ use futures::FutureExt;
+ use itertools::Itertools;
+
+ use datafusion_common::assert_batches_eq;
+
+ use crate::collect;
+ use crate::expressions::col;
+ use crate::memory::MemoryExec;
+ use crate::sorts::sort::SortExec;
+ use crate::test;
+ use crate::test::assert_is_pending;
+ use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_partial_sort() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 0, 1, 1, 1]),
+ ("b", &vec![1, 1, 2, 2, 3, 3]),
+ ("c", &vec![1, 0, 5, 4, 3, 2]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source.clone(),
+ )
+ .with_common_prefix_length(2),
+ ) as Arc<dyn ExecutionPlan>;
+
+ let result = collect(partial_sort_exec, task_ctx.clone()).await?;
+
+ let expected_after_sort = [
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 0 | 1 | 0 |",
+ "| 0 | 1 | 1 |",
+ "| 0 | 2 | 5 |",
+ "| 1 | 2 | 4 |",
+ "| 1 | 3 | 2 |",
+ "| 1 | 3 | 3 |",
+ "+---+---+---+",
+ ];
+ assert_eq!(2, result.len());
+ assert_batches_eq!(expected_after_sort, &result);
+ assert_eq!(
+ task_ctx.runtime_env().memory_pool.reserved(),
+ 0,
+ "The sort should have returned all memory used back to the memory
manager"
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_partial_sort_with_fetch() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 1, 1, 1]),
+ ("b", &vec![1, 2, 2, 3, 3]),
+ ("c", &vec![0, 1, 2, 3, 4]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source.clone(),
+ )
+ .with_common_prefix_length(2)
+ .with_fetch(Some(4)),
+ ) as Arc<dyn ExecutionPlan>;
+
+ let result = collect(partial_sort_exec, task_ctx.clone()).await?;
+
+ let expected_after_sort = [
+ "+---+---+---+",
+ "| a | b | c |",
+ "+---+---+---+",
+ "| 0 | 1 | 0 |",
+ "| 0 | 2 | 1 |",
+ "| 1 | 2 | 2 |",
+ "| 1 | 3 | 3 |",
+ "+---+---+---+",
+ ];
+ assert_eq!(2, result.len());
+ assert_batches_eq!(expected_after_sort, &result);
+ assert_eq!(
+ task_ctx.runtime_env().memory_pool.reserved(),
+ 0,
+ "The sort should have returned all memory used back to the memory
manager"
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_partial_sort2() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 0, 0, 1, 1, 1, 1]),
+ ("b", &vec![1, 1, 3, 3, 4, 4, 2, 2]),
+ ("c", &vec![0, 1, 2, 3, 4, 5, 6, 7]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source,
+ )
+ .with_common_prefix_length(1),
Review Comment:
Likewise, here it would be nice to also check with a common prefix length of 2.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows(segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b does not change as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input schema
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
Review Comment:
Should we assert here that `common_prefix_length` is greater than zero?
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows(segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b does not change as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input schema
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input schema
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream {
+ /// The input plan
+ input: SendableRecordBatchStream,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of
+ common_prefix_length: usize,
+ /// Used as a buffer for part of the input not ready for sort
+ input_batch: RecordBatch,
+ /// Fetch top N results
+ fetch: Option<usize>,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size
hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl RecordBatchStream for PartialSortStream {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+}
+
+impl PartialSortStream {
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ if self.is_closed {
+ assert_eq!(0, self.input_batch.num_rows());
+ return Poll::Ready(None);
+ }
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.input_batch =
+ concat_batches(&self.schema(), [&self.input_batch,
&batch])?;
+ if let Some(slice_point) =
+ self.get_slice_point(self.common_prefix_length,
&self.input_batch)?
+ {
+ self.emit(slice_point)
+ } else {
+ Ok(RecordBatch::new_empty(self.schema()))
+ }
+ }
+ Some(Err(e)) => Err(e),
+ None => {
+ self.is_closed = true;
+ // once input is consumed, sort the rest of the inserted
batches in input_batch
+ self.emit(self.input_batch.num_rows())
+ }
+ };
+
+ Poll::Ready(Some(result))
+ }
+
+ /// Sort input_batch upto the slice_point and emit the sorted batch
+ ///
+ /// If fetch is specified for PartialSortStream `emit` will limit
+ /// the last RecordBatch returned and will mark the stream as closed
+ fn emit(self: &mut Pin<&mut Self>, slice_point: usize) ->
Result<RecordBatch> {
+ let original_num_rows = self.input_batch.num_rows();
+ let segment = self.input_batch.slice(0, slice_point);
+ self.input_batch = self
+ .input_batch
+ .slice(slice_point, self.input_batch.num_rows() - slice_point);
+
+ let result = sort_batch(&segment, &self.expr, self.fetch)?;
Review Comment:
I was thinking you could potentially make the sorting more efficient here by
using only the last `len - prefix` keys (so there are fewer required
comparisons), but that will only work when there is a single partition. Perhaps
that could be a follow-on optimization / special case as well.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows(segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b does not change as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input schema
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input schema
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream {
+ /// The input plan
+ input: SendableRecordBatchStream,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of
Review Comment:
This comment appears to be cut off.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such an input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows(segment) can be sorted as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information as the input can continue with a
+//! batch with a starting row where a and b does not change as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concats incoming data with such last rows of previous input
+//! and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input schema
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of continuous matching columns of input that satisfy
+ /// the required ordering for the sort
+ common_prefix_length: usize,
+ /// Containing all metrics set created during sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// Fetch highest/lowest n results
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input schema
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch {
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// If the plan does not support pipelining, but its input(s) are
+ /// infinite, returns an error to indicate this.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream {
+ /// The input plan
+ input: SendableRecordBatchStream,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ /// Length of
+ common_prefix_length: usize,
+ /// Used as a buffer for part of the input not ready for sort
+ input_batch: RecordBatch,
+ /// Fetch top N results
+ fetch: Option<usize>,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx);
+ self.baseline_metrics.record_poll(poll)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size
hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl RecordBatchStream for PartialSortStream {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+}
+
+impl PartialSortStream {
+ fn poll_next_inner(
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ if self.is_closed {
+ assert_eq!(0, self.input_batch.num_rows());
+ return Poll::Ready(None);
+ }
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.input_batch =
+ concat_batches(&self.schema(), [&self.input_batch,
&batch])?;
Review Comment:
This will result in copying the first input batch N times (if it takes N
input batches to produce the output).
You could potentially also buffer the batches in something like
`Vec<RecordBatch>` and do the `concat` during `Self::emit`.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,992 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with input data that partially
+//! satisfies the required sort order. Such input data can be
+//! partitioned into segments where each segment already has the
+//! required information for lexicographic sorting, so sorting
+//! can be done without loading the entire dataset.
+//!
+//! Consider a sort plan having an input with ordering `a ASC, b ASC`
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//! +---+---+---+
+//!```
+//!
+//! and required ordering for the plan is `a ASC, b ASC, d ASC`.
+//! The first 3 rows (a segment) can be sorted, as the segment already
+//! has the required information for the sort, but the last row
+//! requires further information, as the input can continue with a
+//! batch whose starting row has the same `a` and `b` values, as below
+//!
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//! +---+---+---+
+//!```
+//!
+//! The plan concatenates incoming data with such last rows of the previous
+//! input and continues partial sorting of the segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::concat_batches;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::sorts::sort::sort_batch;
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ SendableRecordBatchStream, Statistics,
+};
+
+/// Partial Sort execution plan: sorts an input whose existing ordering already covers a prefix of the sort expressions.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+ /// Input plan that produces the rows to sort
+ pub(crate) input: Arc<dyn ExecutionPlan>,
+ /// Sort expressions describing the required output ordering
+ expr: Vec<PhysicalSortExpr>,
+ /// Number of leading sort expressions in `expr` whose ordering the input
+ /// already satisfies; `execute` asserts that this is greater than 0
+ common_prefix_length: usize,
+ /// Metrics set holding all metrics created during the sort
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Preserve partitions of input plan. If false, the input partitions
+ /// will be sorted and merged into a single output partition.
+ preserve_partitioning: bool,
+ /// If `Some(n)`, return only the first n rows of the sorted output
+ fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+ /// Create a new partial sort execution plan with no fetch limit, `preserve_partitioning` false and a common prefix length of 0
+ pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) ->
Self {
+ Self {
+ input,
+ expr,
+ common_prefix_length: 0, // placeholder: `execute` asserts a value > 0, so callers must set it via `with_common_prefix_length`
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ preserve_partitioning: false,
+ fetch: None,
+ }
+ }
+
+ /// Size of first set of continuous common columns satisfied by input and
required by the plan
+ pub fn with_common_prefix_length(mut self, common_prefix_length: usize) ->
Self {
+ self.common_prefix_length = common_prefix_length;
+ self
+ }
+
+ /// Whether this `PartialSortExec` preserves partitioning of the children
+ pub fn preserve_partitioning(&self) -> bool {
+ self.preserve_partitioning
+ }
+
+ /// Specify the partitioning behavior of this partial sort exec
+ ///
+ /// If `preserve_partitioning` is true, sorts each partition
+ /// individually, producing one sorted stream for each input partition.
+ ///
+ /// If `preserve_partitioning` is false, sorts and merges all
+ /// input partitions producing a single, sorted partition.
+ pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool)
-> Self {
+ self.preserve_partitioning = preserve_partitioning;
+ self
+ }
+
+ /// Modify how many rows to include in the result
+ ///
+ /// If None, then all rows will be returned, in sorted order.
+ /// If Some, then only the top `fetch` rows will be returned.
+ /// This can reduce the memory pressure required by the sort
+ /// operation since rows that are not going to be included
+ /// can be dropped.
+ pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+ self.fetch = fetch;
+ self
+ }
+
+ /// Input plan that produces the rows to sort
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Sort expressions
+ pub fn expr(&self) -> &[PhysicalSortExpr] {
+ &self.expr
+ }
+
+ /// If `Some(fetch)`, limits output to only the first "fetch" items
+ pub fn fetch(&self) -> Option<usize> {
+ self.fetch
+ }
+}
+
+impl DisplayAs for PartialSortExec { // Renders the one-line plan description, e.g. `PartialSortExec: expr=[...], common_prefix_length=[2]`
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => { // both display formats share the same rendering
+ let expr = PhysicalSortExpr::format_list(&self.expr);
+ let common_prefix_length = self.common_prefix_length;
+ match self.fetch { // a fetch limit is rendered as an extra `TopK(fetch=N)` part
+ Some(fetch) => {
+ write!(f, "PartialSortExec: TopK(fetch={fetch}),
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+ }
+ None => write!(f, "PartialSortExec: expr=[{expr}],
common_prefix_length=[{common_prefix_length}]"),
+ }
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartialSortExec {
+ /// Return this plan as `Any` so callers can downcast it
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// The output schema is the input's schema; sorting only reorders rows
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ /// Get the output partitioning of this plan: the input's partitioning
+ /// when partitions are preserved, otherwise a single partition
+ fn output_partitioning(&self) -> Partitioning {
+ if self.preserve_partitioning {
+ self.input.output_partitioning()
+ } else {
+ Partitioning::UnknownPartitioning(1)
+ }
+ }
+
+ /// Specifies whether this plan generates an infinite stream of records.
+ /// This implementation never returns an error: the plan is reported as
+ /// unbounded exactly when its single child is unbounded.
+ fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+ Ok(children[0])
+ }
+
+ /// The output is ordered by the full list of sort expressions
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ Some(&self.expr)
+ }
+
+ /// Requires a single input partition unless partitions are preserved
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ if self.preserve_partitioning {
+ vec![Distribution::UnspecifiedDistribution]
+ } else {
+ vec![Distribution::SinglePartition]
+ }
+ }
+
+ /// Reports `false` for the single input
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ /// Equivalence properties of the input, re-ordered by this plan's sort expressions
+ fn equivalence_properties(&self) -> EquivalenceProperties {
+ // Reset the ordering equivalence class with the new ordering:
+ self.input
+ .equivalence_properties()
+ .with_reorder(self.expr.to_vec())
+ }
+
+ /// The single child is the input plan
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ /// Rebuild this plan on top of `children[0]`, keeping every setting of
+ /// the current node (sort expressions, common prefix length, fetch and
+ /// partitioning behavior)
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // `PartialSortExec::new` starts with a common prefix length of 0, so it
+ // has to be carried over explicitly; otherwise the rebuilt plan would
+ // fail the `common_prefix_length > 0` assertion in `execute`.
+ let new_partial_sort =
+ PartialSortExec::new(self.expr.clone(), children[0].clone())
+ .with_common_prefix_length(self.common_prefix_length)
+ .with_fetch(self.fetch)
+ .with_preserve_partitioning(self.preserve_partitioning);
+
+ Ok(Arc::new(new_partial_sort))
+ }
+
+ /// Execute the input for `partition` and wrap it in a `PartialSortStream`
+ ///
+ /// # Panics
+ ///
+ /// Panics if `common_prefix_length` is 0.
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ trace!("Start PartialSortExec::execute for partition {} of context
session_id {} and task_id {:?}", partition, context.session_id(),
context.task_id());
+
+ let input = self.input.execute(partition, context.clone())?;
+
+ trace!(
+ "End PartialSortExec's input.execute for partition: {}",
+ partition
+ );
+
+ // Make sure common prefix length is larger than 0
+ // Otherwise, we should use SortExec.
+ assert!(self.common_prefix_length > 0);
+
+ // The stream starts with an empty buffer and the full fetch budget
+ Ok(Box::pin(PartialSortStream {
+ input,
+ expr: self.expr.clone(),
+ common_prefix_length: self.common_prefix_length,
+ input_batch: RecordBatch::new_empty(self.schema().clone()),
+ fetch: self.fetch,
+ is_closed: false,
+ baseline_metrics: BaselineMetrics::new(&self.metrics_set,
partition),
+ }))
+ }
+
+ /// Snapshot of the metrics recorded for this plan
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics_set.clone_inner())
+ }
+
+ /// Statistics are forwarded unchanged from the input
+ fn statistics(&self) -> Result<Statistics> {
+ self.input.statistics()
+ }
+}
+
+struct PartialSortStream { // Per-partition stream state created by `PartialSortExec::execute`: buffers input rows and emits them sorted, one safe segment at a time
+ /// The input stream whose batches are being sorted
+ input: SendableRecordBatchStream,
+ /// Sort expressions describing the required output ordering
+ expr: Vec<PhysicalSortExpr>,
+ /// Number of leading sort expressions (`expr[..common_prefix_length]`) used to locate segment boundaries
+ common_prefix_length: usize,
+ /// Buffer of rows received so far that are not yet ready to be sorted and emitted
+ input_batch: RecordBatch,
+ /// Remaining number of rows to return (top N); `None` means no limit
+ fetch: Option<usize>,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
+ /// Execution metrics
+ baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream { // Yields sorted record batches (or errors) produced by `poll_next_inner`
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ let poll = self.poll_next_inner(cx); // the actual buffering/sorting logic lives in `poll_next_inner`
+ self.baseline_metrics.record_poll(poll) // hand the poll result to the baseline metrics and return what it gives back
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ // we can't predict the size of incoming batches so re-use the size
hint from the input
+ self.input.size_hint()
+ }
+}
+
+impl RecordBatchStream for PartialSortStream {
+ fn schema(&self) -> SchemaRef { // the stream reports the schema of its input stream
+ self.input.schema()
+ }
+}
+
+impl PartialSortStream {
+ fn poll_next_inner( // Pull one batch from the input, append it to the buffer and emit whatever prefix of the buffer is safe to sort
+ self: &mut Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<RecordBatch>>> {
+ if self.is_closed {
+ assert_eq!(0, self.input_batch.num_rows()); // a closed stream must not have buffered rows left
+ return Poll::Ready(None);
+ }
+ let result = match ready!(self.input.poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ self.input_batch = // NOTE(review): the whole buffer is re-concatenated for every incoming batch; the reviewer suggests buffering batches and concatenating in `emit`
+ concat_batches(&self.schema(), [&self.input_batch,
&batch])?;
+ if let Some(slice_point) =
+ self.get_slice_point(self.common_prefix_length,
&self.input_batch)?
+ {
+ self.emit(slice_point)
+ } else {
+ Ok(RecordBatch::new_empty(self.schema())) // no safe boundary yet: keep buffering and yield an empty batch
+ }
+ }
+ Some(Err(e)) => Err(e),
+ None => {
+ self.is_closed = true;
+ // once input is consumed, sort the rest of the inserted
batches in input_batch
+ self.emit(self.input_batch.num_rows())
+ }
+ };
+
+ Poll::Ready(Some(result))
+ }
+
+ /// Sort input_batch up to the slice_point and emit the sorted batch
+ ///
+ /// If fetch is specified for PartialSortStream `emit` will limit
+ /// the last RecordBatch returned and will mark the stream as closed
+ fn emit(self: &mut Pin<&mut Self>, slice_point: usize) ->
Result<RecordBatch> {
+ let original_num_rows = self.input_batch.num_rows();
+ let segment = self.input_batch.slice(0, slice_point); // rows [0, slice_point) are sorted and emitted now
+ self.input_batch = self // rows from slice_point onwards stay buffered
+ .input_batch
+ .slice(slice_point, self.input_batch.num_rows() - slice_point);
+
+ let result = sort_batch(&segment, &self.expr, self.fetch)?;
+ if let Some(remaining_fetch) = self.fetch {
+ self.fetch = Some(remaining_fetch - result.num_rows()); // assumes `sort_batch` returns at most `fetch` rows, otherwise this would underflow; TODO confirm
+ if remaining_fetch == result.num_rows() { // fetch budget used up: drop the remaining buffer and close the stream
+ self.input_batch = RecordBatch::new_empty(self.schema());
+ self.is_closed = true;
+ }
+ } else {
+ assert_eq!( // without a fetch limit every row is either emitted or still buffered
+ original_num_rows,
+ result.num_rows() + self.input_batch.num_rows()
+ );
+ }
+ Ok(result)
+ }
+
+ /// Return the end index of the second last partition if the batch
+ /// can be partitioned based on its already sorted columns
+ ///
+ /// Return None if the batch cannot be partitioned, which means the
+ /// batch does not have the information for a safe sort
+ fn get_slice_point(
+ &self,
+ common_prefix_len: usize,
+ batch: &RecordBatch,
+ ) -> Result<Option<usize>> {
+ let common_prefix_sort_keys = (0..common_prefix_len) // evaluate only the first `common_prefix_len` sort expressions against the batch
+ .map(|idx| self.expr[idx].evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<_>>>()?;
+ let partition_points =
+ evaluate_partition_ranges(batch.num_rows(),
&common_prefix_sort_keys)?;
+ // If partition points are [0..100], [100..200], [200..300]
+ // we should return 200, which is the safest and furthest partition
boundary
+ // Please note that we shouldn't return 300 (which is number of rows
in the batch),
+ // because this boundary may change with new data.
+ if partition_points.len() >= 2 {
+ Ok(Some(partition_points[partition_points.len() - 2].end))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use arrow::array::*;
+ use arrow::compute::SortOptions;
+ use arrow::datatypes::*;
+ use futures::FutureExt;
+ use itertools::Itertools;
+
+ use datafusion_common::assert_batches_eq;
+
+ use crate::collect;
+ use crate::expressions::col;
+ use crate::memory::MemoryExec;
+ use crate::sorts::sort::SortExec;
+ use crate::test;
+ use crate::test::assert_is_pending;
+ use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_partial_sort() -> Result<()> {
+ let task_ctx = Arc::new(TaskContext::default());
+ let source = test::build_table_scan_i32(
+ ("a", &vec![0, 0, 0, 1, 1, 1]),
+ ("b", &vec![1, 1, 2, 2, 3, 3]),
+ ("c", &vec![1, 0, 5, 4, 3, 2]),
+ );
+ let schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Int32, false),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let option_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ let partial_sort_exec = Arc::new(
+ PartialSortExec::new(
+ vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: option_asc,
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: option_asc,
+ },
+ ],
+ source.clone(),
+ )
+ .with_common_prefix_length(2),
Review Comment:
I recommend extending this test to also use a common prefix length of 1
(which should still produce the same answer)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]