alamb commented on a change in pull request #1596:
URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r787133083



##########
File path: datafusion/src/physical_plan/explain.rs
##########
@@ -146,9 +147,13 @@ impl ExecutionPlan for ExplainExec {
             ],
         )?;
 
+        let metrics = ExecutionPlanMetricsSet::new();

Review comment:
       👍 

##########
File path: datafusion/tests/sql/joins.rs
##########
@@ -419,32 +419,32 @@ async fn cross_join_unbalanced() {
 
     // the order of the values is not determinisitic, so we need to sort to check the values
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name";
+        "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, t1_name, t2_name";

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/sorts/mod.rs
##########
@@ -281,6 +287,9 @@ impl Stream for StreamWrapper {
                     Poll::Pending => Poll::Pending,
                 }
             }
+            StreamWrapper::SingleBatch(ref mut ob) => {

Review comment:
       I am not enough of a `rust` futures wizard to know if there is a better 
way to do this, but it looks reasonable to me

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];
+/// let in_mem_batches = vec![];
+/// while (input.has_next()) {
+///     let batch = input.next();
+///     // no enough memory available, spill first.
+///     if exec_memory_available < size_of(batch) {
+///         let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..));
+///         let tmp_file = spill_write(ordered_stream);
+///         spills.push(tmp_file);
+///     }
+///     // sort the batch while it's probably still in cache and buffer it.
+///     let sorted = sort_by_key(batch);
+///     in_mem_batches.push(sorted);
+/// }
+///
+/// let partial_ordered_streams = vec![];
+/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..));
+/// partial_ordered_streams.push(in_mem_stream);
+/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream));
+/// let result = sort_preserving_merge(partial_ordered_streams);
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
+
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            inner_metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            // sort each batch as it's inserted, more probably to be cache-resident
+            let elapsed_compute = self.inner_metrics.elapsed_compute().clone();
+            let timer = elapsed_compute.timer();
+            let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?;
+            timer.done();
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(sorted_batch);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            let in_mem_stream = in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+                self.runtime.clone(),
+            )
+            .await?;
+            streams.push(SortedStream::new(in_mem_stream, self.used()));
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+                self.runtime.clone(),
+            )
+            .await
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+            self.runtime.clone(),
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    sorted_bathes: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+    runtime: Arc<RuntimeEnv>,
+) -> Result<SendableRecordBatchStream> {
+    if sorted_bathes.len() == 1 {
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(sorted_bathes.pop().unwrap())],
+            baseline_metrics,
+        )))
+    } else {
+        let batches = sorted_bathes.drain(..).collect();
+        assert_eq!(sorted_bathes.len(), 0);
+        Ok(Box::pin(
+            SortPreservingMergeStream::new_from_batches(
+                batches,
+                schema,
+                expressions,
+                baseline_metrics,
+                runtime,
+            )
+            .await,
+        ))
+    }
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();
+    }
+    sender.send(None).await.ok();
+    let path_clone = path.clone();
+    let res =
+        task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await;
+    match res {
+        Ok(r) => r,
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Error occurred while spilling {}",
+            e
+        ))),
+    }
+}
 
-/// Sort execution plan
+async fn read_spill_as_stream(
+    path: String,
+    schema: SchemaRef,
+) -> Result<SendableRecordBatchStream> {
+    let (sender, receiver): (
+        TKSender<ArrowResult<RecordBatch>>,
+        TKReceiver<ArrowResult<RecordBatch>>,
+    ) = tokio::sync::mpsc::channel(2);
+    let path_clone = path.clone();
+    let join_handle = task::spawn_blocking(move || {
+        if let Err(e) = read_spill(sender, path_clone) {
+            error!("Failure while reading spill file: {}. Error: {}", path, e);
+        }
+    });
+    Ok(RecordBatchReceiverStream::create(
+        &schema,
+        receiver,
+        join_handle,
+    ))
+}
+
+fn write_sorted(
+    mut receiver: TKReceiver<Option<ArrowResult<RecordBatch>>>,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    while let Some(Some(batch)) = receiver.blocking_recv() {
+        writer.write(&batch?)?;
+    }
+    writer.finish()?;
+    info!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches, writer.num_rows, writer.num_bytes
+    );
+    Ok(writer.num_bytes as usize)
+}
+
+fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: String) -> Result<()> {
+    let file = BufReader::new(File::open(&path)?);
+    let reader = FileReader::try_new(file)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch)
+            .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
+    }
+    Ok(())
+}
+
+/// External Sort execution plan
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
-    /// Execution metrics
-    metrics: ExecutionPlanMetricsSet,
+    /// Containing all metrics set created for sort, such as all sets for `sort_merge_join`s

Review comment:
       I am not sure what `sort_merge_join` is referring to here

##########
File path: datafusion/src/physical_plan/sorts/mod.rs
##########
@@ -32,11 +33,10 @@ use std::borrow::BorrowMut;
 use std::cmp::Ordering;
 use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
 use std::sync::{Arc, RwLock};
 use std::task::{Context, Poll};
 
-pub mod external_sort;

Review comment:
       While you removed the `mod` declaration, I think the file is not yet 
removed in this PR; I suggest
   
   ```shell
   git rm datafusion/src/physical_plan/sorts/external_sort.rs
   ```

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];
+/// let in_mem_batches = vec![];
+/// while (input.has_next()) {
+///     let batch = input.next();
+///     // no enough memory available, spill first.
+///     if exec_memory_available < size_of(batch) {
+///         let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..));
+///         let tmp_file = spill_write(ordered_stream);
+///         spills.push(tmp_file);
+///     }
+///     // sort the batch while it's probably still in cache and buffer it.
+///     let sorted = sort_by_key(batch);
+///     in_mem_batches.push(sorted);
+/// }
+///
+/// let partial_ordered_streams = vec![];
+/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..));
+/// partial_ordered_streams.push(in_mem_stream);
+/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream));
+/// let result = sort_preserving_merge(partial_ordered_streams);
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
+
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            inner_metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),

Review comment:
       it would be neat to add the spilled bytes to the metrics reported by `ExternalSorter` so that they appear in `EXPLAIN ANALYZE`, for example. Perhaps as a follow-on task
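
   For illustration, a rough sketch of that follow-on, assuming the counters would be registered through DataFusion's `MetricBuilder`; the metric names and the helper below are illustrative, not part of this PR:

   ```rust
   use datafusion::physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

   /// Hypothetical helper: register spill counters on the sorter's metrics set so
   /// they show up next to the baseline metrics in `EXPLAIN ANALYZE`.
   fn spill_metrics(metrics: &ExecutionPlanMetricsSet, partition: usize) -> (Count, Count) {
       let spilled_bytes = MetricBuilder::new(metrics).counter("spilled_bytes", partition);
       let spill_count = MetricBuilder::new(metrics).counter("spill_count", partition);
       (spilled_bytes, spill_count)
   }
   ```

   `ExternalSorter::spill` could then call `spilled_bytes.add(total_size)` in addition to updating its `AtomicUsize` fields.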

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -227,115 +636,56 @@ pub(crate) fn sort_batch(
     )
 }
 
-pin_project! {
-    /// stream for sort plan
-    struct SortStream {
-        #[pin]
-        output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
-        finished: bool,
-        schema: SchemaRef,
-        drop_helper: AbortOnDropSingle<()>,
-    }
-}
-
-impl SortStream {
-    fn new(
-        input: SendableRecordBatchStream,
-        expr: Vec<PhysicalSortExpr>,
-        baseline_metrics: BaselineMetrics,
-    ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema = input.schema();
-        let join_handle = tokio::spawn(async move {
-            let schema = input.schema();
-            let sorted_batch = common::collect(input)
-                .await
-                .map_err(DataFusionError::into_arrow_external_error)
-                .and_then(move |batches| {
-                    let timer = baseline_metrics.elapsed_compute().timer();
-                    // combine all record batches into one for each column
-                    let combined = common::combine_batches(&batches, schema.clone())?;
-                    // sort combined record batch
-                    let result = combined
-                        .map(|batch| sort_batch(batch, schema, &expr))
-                        .transpose()?
-                        .record_output(&baseline_metrics);
-                    timer.done();
-                    Ok(result)
-                });
-
-            // failing here is OK, the receiver is gone and does not care about the result
-            tx.send(sorted_batch).ok();
-        });
-
-        Self {
-            output: rx,
-            finished: false,
-            schema,
-            drop_helper: AbortOnDropSingle::new(join_handle),
-        }
+async fn external_sort(

Review comment:
       I recommend calling this something other than `external_sort` as it may 
or may not actually be an external sort
   
   Perhaps something like `do_sort` or `execute_sort`?

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -381,6 +383,45 @@ impl SortPreservingMergeStream {
             aborted: false,
             in_progress: vec![],
             next_batch_index: 0,
+            min_heap: BinaryHeap::with_capacity(stream_count),
+            runtime,
+        }
+    }
+
+    pub(crate) async fn new_from_batches(
+        batches: Vec<RecordBatch>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        baseline_metrics: BaselineMetrics,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let batch_count = batches.len();
+        let wrappers = batches

Review comment:
       👍 

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];

Review comment:
       thank you for this comment. 👍 

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -414,47 +455,38 @@ impl SortPreservingMergeStream {
                 return Poll::Ready(Err(e));
             }
             Some(Ok(batch)) => {
-                let cursor = match SortKeyCursor::new(
-                    self.next_batch_index, // assign this batch an ID
-                    Arc::new(batch),
-                    &self.column_expressions,
-                    self.sort_options.clone(),
-                ) {
-                    Ok(cursor) => cursor,
-                    Err(e) => {
-                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
-                    }
-                };
+                let cursor = Arc::new(
+                    match SortKeyCursor::new(
+                        idx,
+                        self.next_batch_index, // assign this batch an ID
+                        Arc::new(batch),
+                        &self.column_expressions,
+                        self.sort_options.clone(),
+                    ) {
+                        Ok(cursor) => cursor,
+                        Err(e) => {
+                            return Poll::Ready(Err(ArrowError::ExternalError(
+                                Box::new(e),
+                            )));
+                        }
+                    },
+                );
                 self.next_batch_index += 1;
+                self.min_heap.push(cursor.clone());
                 self.cursors[idx].push_back(cursor)
             }
         }
 
         Poll::Ready(Ok(()))
     }
 
-    /// Returns the index of the next stream to pull a row from, or None
+    /// Returns the cursor of the next stream to pull a row from, or None
     /// if all cursors for all streams are exhausted
-    fn next_stream_idx(&mut self) -> Result<Option<usize>> {
-        let mut min_cursor: Option<(usize, &mut SortKeyCursor)> = None;
-        for (idx, candidate) in self.cursors.iter_mut().enumerate() {
-            if let Some(candidate) = candidate.back_mut() {
-                if candidate.is_finished() {
-                    continue;
-                }
-
-                match min_cursor {
-                    None => min_cursor = Some((idx, candidate)),
-                    Some((_, ref mut min)) => {
-                        if min.compare(candidate)? == Ordering::Greater {
-                            min_cursor = Some((idx, candidate))
-                        }
-                    }
-                }
-            }
+    fn next_cursor(&mut self) -> Result<Option<Arc<SortKeyCursor>>> {
+        match self.min_heap.pop() {

Review comment:
       isn't this the same as `Ok(self.min_heap.pop())`?

##########
File path: datafusion/src/physical_plan/sorts/mod.rs
##########
@@ -32,11 +33,10 @@ use std::borrow::BorrowMut;
 use std::cmp::Ordering;
 use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;

Review comment:
       for anyone else following along, the switch to using `AtomicUsize` I 
think is driven by needing to share the SortKeyCursors in a `BinaryHeap` as 
well as in a `Vec` of cursors (see comment below)
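
   A self-contained illustration of that constraint (toy type, not the real `SortKeyCursor`): once a cursor is handed out behind an `Arc`, exclusive `&mut` access is gone, so any state it advances has to become interior-mutable, e.g. an `AtomicUsize`:

   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::Arc;

   // Toy stand-in for a shared cursor: the current row position is an AtomicUsize
   // because the same cursor is reachable from the heap and the per-stream vec.
   struct Cursor {
       cur_row: AtomicUsize,
   }

   fn main() {
       let cursor = Arc::new(Cursor { cur_row: AtomicUsize::new(0) });
       let in_heap = Arc::clone(&cursor); // one handle lives in the BinaryHeap
       let in_vec = Arc::clone(&cursor); // another lives in the Vec of cursors

       // advancing through either handle is visible through the other
       in_heap.cur_row.fetch_add(1, Ordering::SeqCst);
       assert_eq!(in_vec.cur_row.load(Ordering::SeqCst), 1);
   }
   ```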

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];
+/// let in_mem_batches = vec![];
+/// while (input.has_next()) {
+///     let batch = input.next();
+///     // no enough memory available, spill first.
+///     if exec_memory_available < size_of(batch) {
+///         let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..));
+///         let tmp_file = spill_write(ordered_stream);
+///         spills.push(tmp_file);
+///     }
+///     // sort the batch while it's probably still in cache and buffer it.
+///     let sorted = sort_by_key(batch);
+///     in_mem_batches.push(sorted);
+/// }
+///
+/// let partial_ordered_streams = vec![];
+/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..));
+/// partial_ordered_streams.push(in_mem_stream);
+/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream));
+/// let result = sort_preserving_merge(partial_ordered_streams);
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
+
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            inner_metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            // sort each batch as it's inserted, more probably to be cache-resident
+            let elapsed_compute = self.inner_metrics.elapsed_compute().clone();
+            let timer = elapsed_compute.timer();
+            let sorted_batch = sort_batch(input, self.schema.clone(), &*self.expr)?;
+            timer.done();
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(sorted_batch);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            let in_mem_stream = in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+                self.runtime.clone(),
+            )
+            .await?;
+            streams.push(SortedStream::new(in_mem_stream, self.used()));
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(

Review comment:
       FYI @tustvold 

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -159,14 +561,25 @@ impl ExecutionPlan for SortExec {
             }
         }
 
-        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        let input = self.input.execute(partition, runtime).await?;
+        let input = self.input.execute(partition, runtime.clone()).await?;
 
-        Ok(Box::pin(SortStream::new(
+        external_sort(
             input,
+            partition,
             self.expr.clone(),
-            baseline_metrics,
-        )))
+            self.all_metrics.clone(),
+            runtime,
+        )
+        .await
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        let metrics = ExecutionPlanMetricsSet::new();
+        let baseline = BaselineMetrics::new(&metrics, 0);

Review comment:
       I don't really understand this code -- it appears to be merging new (zeroed) metrics into `self.all_metrics`? Why not just return `self.all_metrics`?
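
   For reference, the simpler alternative being asked about would look roughly like the sketch below. It assumes `AggregatedMetricsSet` (introduced in this PR) can hand back the metrics it has already accumulated; the `aggregate_all` name is hypothetical:

   ```rust
   fn metrics(&self) -> Option<MetricsSet> {
       // hypothetical accessor on AggregatedMetricsSet: return what was actually
       // recorded instead of merging into a fresh, zeroed ExecutionPlanMetricsSet
       Some(self.all_metrics.aggregate_all())
   }
   ```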

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -414,47 +455,38 @@ impl SortPreservingMergeStream {
                 return Poll::Ready(Err(e));
             }
             Some(Ok(batch)) => {
-                let cursor = match SortKeyCursor::new(
-                    self.next_batch_index, // assign this batch an ID
-                    Arc::new(batch),
-                    &self.column_expressions,
-                    self.sort_options.clone(),
-                ) {
-                    Ok(cursor) => cursor,
-                    Err(e) => {
-                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
-                    }
-                };
+                let cursor = Arc::new(
+                    match SortKeyCursor::new(
+                        idx,
+                        self.next_batch_index, // assign this batch an ID
+                        Arc::new(batch),
+                        &self.column_expressions,
+                        self.sort_options.clone(),
+                    ) {
+                        Ok(cursor) => cursor,
+                        Err(e) => {
+                            return Poll::Ready(Err(ArrowError::ExternalError(
+                                Box::new(e),
+                            )));
+                        }
+                    },
+                );
                 self.next_batch_index += 1;
+                self.min_heap.push(cursor.clone());
                 self.cursors[idx].push_back(cursor)
             }
         }
 
         Poll::Ready(Ok(()))
     }
 
-    /// Returns the index of the next stream to pull a row from, or None
+    /// Returns the cursor of the next stream to pull a row from, or None
     /// if all cursors for all streams are exhausted
-    fn next_stream_idx(&mut self) -> Result<Option<usize>> {
-        let mut min_cursor: Option<(usize, &mut SortKeyCursor)> = None;
-        for (idx, candidate) in self.cursors.iter_mut().enumerate() {
-            if let Some(candidate) = candidate.back_mut() {
-                if candidate.is_finished() {
-                    continue;
-                }
-
-                match min_cursor {
-                    None => min_cursor = Some((idx, candidate)),
-                    Some((_, ref mut min)) => {
-                        if min.compare(candidate)? == Ordering::Greater {
-                            min_cursor = Some((idx, candidate))
-                        }
-                    }
-                }
-            }
+    fn next_cursor(&mut self) -> Result<Option<Arc<SortKeyCursor>>> {
+        match self.min_heap.pop() {

Review comment:
       And since it is infallible, perhaps this could just return 
`Option<Arc<SortKeyCursor>>`
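
   A minimal, self-contained sketch of that simplification (a toy `Cursor` stands in for `SortKeyCursor`, which defines its own ordering in the PR):

   ```rust
   use std::collections::BinaryHeap;
   use std::sync::Arc;

   #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
   struct Cursor(u64);

   struct Merger {
       min_heap: BinaryHeap<Arc<Cursor>>,
   }

   impl Merger {
       // BinaryHeap::pop cannot fail, so no Result wrapper is needed
       fn next_cursor(&mut self) -> Option<Arc<Cursor>> {
           self.min_heap.pop()
       }
   }

   fn main() {
       let mut merger = Merger { min_heap: BinaryHeap::new() };
       merger.min_heap.push(Arc::new(Cursor(2)));
       merger.min_heap.push(Arc::new(Cursor(1)));
       // std's BinaryHeap is a max-heap by default, so this toy pops 2 first
       assert_eq!(merger.next_cursor(), Some(Arc::new(Cursor(2))));
       assert_eq!(merger.next_cursor(), Some(Arc::new(Cursor(1))));
       assert_eq!(merger.next_cursor(), None);
   }
   ```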

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];
+/// let in_mem_batches = vec![];
+/// while (input.has_next()) {
+///     let batch = input.next();
+///     // no enough memory available, spill first.
+///     if exec_memory_available < size_of(batch) {
+///         let ordered_stream = sort_preserving_merge(in_mem_batches.drain(..));
+///         let tmp_file = spill_write(ordered_stream);
+///         spills.push(tmp_file);
+///     }
+///     // sort the batch while it's probably still in cache and buffer it.
+///     let sorted = sort_by_key(batch);
+///     in_mem_batches.push(sorted);
+/// }
+///
+/// let partial_ordered_streams = vec![];
+/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..));
+/// partial_ordered_streams.push(in_mem_stream);
+/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream));
+/// let result = sort_preserving_merge(partial_ordered_streams);
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
+
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            inner_metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            // sort each batch as it's inserted, more probably to be cache-resident

Review comment:
       by doing this, however, it may also prevent the next batch from being computed. I think it might be worth considering sorting this batch as another task, but that will definitely make the code more complicated

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -304,6 +303,9 @@ pub(crate) struct SortPreservingMergeStream {
     /// An index to uniquely identify the input stream batch
     next_batch_index: usize,
 
+    /// min heap for record comparison
+    min_heap: BinaryHeap<Arc<SortKeyCursor>>,

Review comment:
       I think the reason for `Arc<SortKeyCursor>` is so the cursors can be in the `BinaryHeap` as well as in `cursors`, though it seems like it causes a bunch of complication elsewhere (e.g. having to use atomics for updates). 
   
   I wonder if it would be possible to remove the `cursors` structure entirely 
and simply use the `BinaryHeap` to store the `SortKeyCursors` directly and 
avoid the sharing. Perhaps @tustvold  has some thoughts
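
   A small sketch of what owning the cursors in the heap alone could look like (toy types again; whether this works for the real `SortKeyCursor` depends on how the per-stream state is used elsewhere):

   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;

   // Toy cursor: (current sort key, stream index). If the heap is the only owner,
   // no Arc is needed and the popped cursor can be mutated directly -- which is
   // exactly what the shared Arc<SortKeyCursor> + atomics setup has to work around.
   #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
   struct Cursor {
       key: u64,
       stream_idx: usize,
   }

   fn main() {
       // Reverse turns std's max-heap into a min-heap over the sort key
       let mut min_heap = BinaryHeap::new();
       min_heap.push(Reverse(Cursor { key: 7, stream_idx: 0 }));
       min_heap.push(Reverse(Cursor { key: 3, stream_idx: 1 }));

       // the smallest key wins; after advancing, the owner can push it back
       if let Some(Reverse(mut cursor)) = min_heap.pop() {
           assert_eq!(cursor.stream_idx, 1);
           cursor.key = 9; // advance to this stream's next row: plain &mut, no atomics
           min_heap.push(Reverse(cursor));
       }
   }
   ```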




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

