mustafasrepo commented on code in PR #9125:
URL: https://github.com/apache/arrow-datafusion/pull/9125#discussion_r1478106835


##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -0,0 +1,1005 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partial Sort deals with cases where the input data partially
+//! satisfies the required sort order and the input dataset has
+//! segments where the sorted column/columns already have the required
+//! information for lexicographic sorting. Then these segments
+//! can be sorted without loading the entire dataset.
+//!
+//! Consider the input with ordering a ASC, b ASC
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 0 | 3 |
+//! | 0 | 0 | 2 |
+//! | 0 | 1 | 1 |
+//! | 0 | 2 | 0 |
+//!```
+//! and required ordering is a ASC, b ASC, d ASC.
+//! The first 3 rows (segment) can be sorted as the segment already has the
+//! required information for the sort, but the last row requires further
+//! information, as the input can continue with a batch starting with a row
+//! where a and b do not change, as below
+//! ```text
+//! +---+---+---+
+//! | a | b | d |
+//! +---+---+---+
+//! | 0 | 2 | 4 |
+//!```
+//! The plan concats these last segments with incoming data and continues
+//! incremental sorting of segments.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute;
+use arrow::compute::{lexsort_to_indices, take};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+
+use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::Result;
+use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::EquivalenceProperties;
+
+use crate::expressions::PhysicalSortExpr;
+use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use crate::{
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
+};
+
+use futures::{ready, Stream, StreamExt};
+use log::trace;
+
+/// Partial Sort execution plan.
+///
+/// Holds the sort expressions together with `common_prefix_length`, the
+/// number of leading sort columns whose ordering the input already satisfies.
+#[derive(Debug, Clone)]
+pub struct PartialSortExec {
+    /// Input plan
+    pub(crate) input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// Length of continuous matching columns of input that satisfy
+    /// the required ordering for the sort
+    common_prefix_length: usize,
+    /// Containing all metrics set created during sort
+    metrics_set: ExecutionPlanMetricsSet,
+    /// Whether the output keeps the input's partitioning (TODO: check usage)
+    preserve_partitioning: bool,
+    /// Fetch highest/lowest n results
+    fetch: Option<usize>,
+}
+
+impl PartialSortExec {
+    /// Create a new partial sort execution plan
+    ///
+    /// The plan starts with `common_prefix_length` 0, `preserve_partitioning`
+    /// false, `fetch` `None` and a fresh metrics set. `execute` asserts that
+    /// `common_prefix_length > 0`, so set it with
+    /// `with_common_prefix_length` before executing the plan.
+    pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> 
Self {
+        Self {
+            input,
+            expr,
+            common_prefix_length: 0,
+            metrics_set: ExecutionPlanMetricsSet::new(),
+            preserve_partitioning: false,
+            fetch: None,
+        }
+    }
+
+    /// Size of first set of continuous common columns satisfied by input and 
required by the plan
+    pub fn with_common_prefix_length(mut self, common_prefix_length: usize) -> 
Self {
+        self.common_prefix_length = common_prefix_length;
+        self
+    }
+
+    /// Whether this `PartialSortExec` preserves partitioning of the children
+    pub fn preserve_partitioning(&self) -> bool {
+        self.preserve_partitioning
+    }
+
+    /// Specify the partitioning behavior of this partial sort exec
+    ///
+    /// If `preserve_partitioning` is true, sorts each partition
+    /// individually, producing one sorted stream for each input partition.
+    ///
+    /// If `preserve_partitioning` is false, sorts and merges all
+    /// input partitions producing a single, sorted partition.
+    pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) 
-> Self {
+        self.preserve_partitioning = preserve_partitioning;
+        self
+    }
+
+    /// Modify how many rows to include in the result
+    ///
+    /// If None, then all rows will be returned, in sorted order.
+    /// If Some, then only the top `fetch` rows will be returned.
+    /// This can reduce the memory pressure required by the sort
+    /// operation since rows that are not going to be included
+    /// can be dropped.
+    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
+        self.fetch = fetch;
+        self
+    }
+
+    /// Input plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+
+    /// If `Some(fetch)`, limits output to only the first "fetch" items
+    pub fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+}
+
+impl DisplayAs for PartialSortExec {
+    /// Writes a one-line description of the plan: the sort expressions and
+    /// `common_prefix_length`, preceded by `TopK(fetch=..)` when `fetch` is set.
+    /// The `Default` and `Verbose` formats produce the same text.
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                let expr = PhysicalSortExpr::format_list(&self.expr);
+                let common_prefix_length = self.common_prefix_length;
+                match self.fetch {
+                    Some(fetch) => {
+                        write!(f, "PartialSortExec: TopK(fetch={fetch}), 
expr=[{expr}], common_prefix_length=[{common_prefix_length}]", )
+                    }
+                    None => write!(f, "PartialSortExec: expr=[{expr}], 
common_prefix_length=[{common_prefix_length}]"),
+                }
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for PartialSortExec {
+    /// Returns `self` as `&dyn Any` so callers can downcast to this plan type.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// The output schema is the input's schema.
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    ///
+    /// Mirrors the input's partitioning when `preserve_partitioning` is set,
+    /// otherwise reports `Partitioning::UnknownPartitioning(1)`.
+    fn output_partitioning(&self) -> Partitioning {
+        if self.preserve_partitioning {
+            self.input.output_partitioning()
+        } else {
+            Partitioning::UnknownPartitioning(1)
+        }
+    }
+
+    /// Specifies whether this plan generates an infinite stream of records.
+    /// The answer is the unboundedness of the first (only) child, unchanged.
+    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
+        Ok(children[0])
+    }
+
+    /// The output is ordered by this plan's sort expressions.
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        Some(&self.expr)
+    }
+
+    /// Requires a single input partition unless `preserve_partitioning` is set.
+    // TODO: check usages
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        if self.preserve_partitioning {
+            vec![Distribution::UnspecifiedDistribution]
+        } else {
+            // global sort
+            // TODO: support RangePartition and OrderedDistribution
+            vec![Distribution::SinglePartition]
+        }
+    }
+
+    /// Reports `false` for the single input.
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    /// Input equivalence properties, reordered by this plan's sort expressions.
+    fn equivalence_properties(&self) -> EquivalenceProperties {
+        // Reset the ordering equivalence class with the new ordering:
+        self.input
+            .equivalence_properties()
+            .with_reorder(self.expr.to_vec())
+    }
+
+    /// The single child is the input plan.
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Rebuilds this plan on top of `children[0]`.
+    ///
+    /// `common_prefix_length`, `fetch` and `preserve_partitioning` are carried
+    /// over from `self`; the metrics set is a fresh one created by `new`.
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // `new` resets `common_prefix_length` to 0, which `execute` rejects with
+        // an assertion, so the configured prefix length must be re-applied here.
+        let new_sort = PartialSortExec::new(self.expr.clone(), children[0].clone())
+            .with_common_prefix_length(self.common_prefix_length)
+            .with_fetch(self.fetch)
+            .with_preserve_partitioning(self.preserve_partitioning);
+
+        Ok(Arc::new(new_sort))
+    }
+
+    /// Executes the input for `partition` and wraps the resulting stream in a
+    /// `PartialSortStream` that starts with an empty buffer.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `common_prefix_length` is 0.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!("Start PartialSortExec::execute for partition {} of context 
session_id {} and task_id {:?}", partition, context.session_id(), 
context.task_id());
+
+        let input = self.input.execute(partition, context.clone())?;
+
+        trace!(
+            "End PartialSortExec's input.execute for partition: {}",
+            partition
+        );
+
+        // Make sure common prefix length is larger than 0
+        // Otherwise, we should use SortExec.
+        assert!(self.common_prefix_length > 0);
+
+        Ok(Box::pin(PartialSortStream {
+            input,
+            expr: self.expr.clone(),
+            common_prefix_length: self.common_prefix_length,
+            input_batch: RecordBatch::new_empty(self.schema().clone()),
+            fetch: self.fetch,
+            is_closed: false,
+            baseline_metrics: BaselineMetrics::new(&self.metrics_set, 
partition),
+        }))
+    }
+
+    /// Snapshot of the metrics collected so far.
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics_set.clone_inner())
+    }
+
+    /// Forwards the input's statistics unchanged.
+    fn statistics(&self) -> Result<Statistics> {
+        self.input.statistics()
+    }
+}
+
+/// Stream built by `PartialSortExec::execute`: wraps the input stream and
+/// keeps not-yet-emitted rows in `input_batch`.
+struct PartialSortStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// Length of the common sort prefix, copied from `PartialSortExec::common_prefix_length`
+    common_prefix_length: usize,
+    /// Used as a buffer for part of the input not ready for sort
+    input_batch: RecordBatch,
+    /// Fetch top N results
+    fetch: Option<usize>,
+    /// Whether the stream has finished returning all of its data or not
+    is_closed: bool,
+    /// Execution metrics
+    baseline_metrics: BaselineMetrics,
+}
+
+impl Stream for PartialSortStream {
+    type Item = Result<RecordBatch>;
+
+    /// Delegates to `poll_next_inner` and passes the poll result through
+    /// `baseline_metrics.record_poll`.
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let poll = self.poll_next_inner(cx);
+        self.baseline_metrics.record_poll(poll)
+    }
+
+    /// Returns the input stream's size hint unchanged.
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        // we can't predict the size of incoming batches so re-use the size 
hint from the input
+        self.input.size_hint()
+    }
+}
+
+impl RecordBatchStream for PartialSortStream {
+    /// Schema of the produced batches: the input stream's schema.
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+}
+
+impl PartialSortStream {
+    /// Polls the input once and produces the next output item.
+    ///
+    /// - Stream already closed: returns `Poll::Ready(None)`.
+    /// - Input yields a batch: it is concatenated onto `input_batch`, then
+    ///   `get_slice_point` is consulted. If the slice point equals the number
+    ///   of buffered rows an empty batch is returned, otherwise the result of
+    ///   `emit(slice_point)`.
+    /// - Input yields an error: the error is forwarded as the item.
+    /// - Input is exhausted: the stream is marked closed and `emit` is called
+    ///   with the full buffered row count.
+    ///
+    /// A pending input returns `Poll::Pending` via `ready!`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the stream is closed while `input_batch` still holds rows.
+    fn poll_next_inner(
+        self: &mut Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        if self.is_closed {
+            assert_eq!(0, self.input_batch.num_rows());
+            return Poll::Ready(None);
+        }
+        let result = match ready!(self.input.poll_next_unpin(cx)) {
+            Some(Ok(batch)) => {
+                // Append the new batch to whatever is still buffered.
+                self.input_batch = compute::concat_batches(
+                    &self.schema(),
+                    &[self.input_batch.clone(), batch],
+                )?;
+                let slice_point =
+                    self.get_slice_point(self.common_prefix_length, 
&self.input_batch)?;
+                // NOTE(review): presumably a slice point equal to the row count
+                // means no segment boundary has been seen yet, so everything
+                // stays buffered -- confirm against `get_slice_point`.
+                if slice_point == self.input_batch.num_rows() {
+                    Ok(RecordBatch::new_empty(self.schema()))
+                } else {
+                    self.emit(slice_point)
+                }
+            }
+            Some(Err(e)) => Err(e),
+            None => {
+                self.is_closed = true;
+                // once input is consumed, sort the rest of the inserted 
batches in input_batch
+                self.emit(self.input_batch.num_rows())
+            }
+        };
+
+        Poll::Ready(Some(result))
+    }
+
+    fn emit(self: &mut Pin<&mut Self>, slice_point: usize) -> 
Result<RecordBatch> {

Review Comment:
   This method can have a docstring also



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to