tustvold commented on a change in pull request #1596:
URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r787539714



##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);

Review comment:
       FWIW two separate atomic increments with SeqCst are likely slower than a 
single uncontended mutex

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(

Review comment:
       I don't think this needs to be async either

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;

Review comment:
       Is there a reason we are using `std::fs::File` and spawn_blocking 
instead of tokio's async file abstraction?
   
   Edit: Oh `arrow::ipc::reader::FileReader`... Perhaps I'll go fix that 
upstream :thinking: 

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    buffered_batches: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+) -> Result<SendableRecordBatchStream> {
+    assert_ne!(buffered_batches.len(), 0);
+
+    let result = {
+        // NB timer records time taken on drop, so there are no
+        // calls to `timer.done()` below.
+        let _timer = baseline_metrics.elapsed_compute().timer();
+
+        let pre_sort = if buffered_batches.len() == 1 {
+            buffered_batches.pop()
+        } else {
+            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+            // combine all record batches into one for each column
+            common::combine_batches(&batches, schema.clone())?
+        };
+
+        pre_sort
+            .map(|batch| sort_batch(batch, schema.clone(), expressions))
+            .transpose()?
+    };
+
+    Ok(Box::pin(SizedRecordBatchStream::new(
+        schema,
+        vec![Arc::new(result.unwrap())],
+        baseline_metrics,
+    )))
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();
+    }
+    sender.send(None).await.ok();
+    let path_clone = path.clone();
+    let res =
+        task::spawn_blocking(move || write_sorted(receiver, path_clone, 
schema)).await;
+    match res {
+        Ok(r) => r,
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Error occurred while spilling {}",
+            e
+        ))),
+    }
+}
+
+async fn read_spill_as_stream(

Review comment:
       I don't think this function needs to be async

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,450 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get an total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+///
+/// let spills = vec![];
+/// let in_mem_batches = vec![];
+/// while (input.has_next()) {
+///     let batch = input.next();
+///     // no enough memory available, spill first.
+///     if exec_memory_available < size_of(batch) {
+///         let ordered_stream = 
sort_preserving_merge(in_mem_batches.drain(..));
+///         let tmp_file = spill_write(ordered_stream);
+///         spills.push(tmp_file);
+///     }
+///     // sort the batch while it's probably still in cache and buffer it.
+///     let sorted = sort_by_key(batch);
+///     in_mem_batches.push(sorted);
+/// }
+///
+/// let partial_ordered_streams = vec![];
+/// let in_mem_stream = sort_preserving_merge(in_mem_batches.drain(..));
+/// partial_ordered_streams.push(in_mem_stream);
+/// partial_ordered_streams.extend(spills.drain(..).map(read_as_stream));
+/// let result = sort_preserving_merge(partial_ordered_streams);
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    inner_metrics: BaselineMetrics,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
+
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        let inner_metrics = metrics.new_intermediate_baseline(partition_id);
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            inner_metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            // sort each batch as it's inserted, more probably to be 
cache-resident
+            let elapsed_compute = self.inner_metrics.elapsed_compute().clone();
+            let timer = elapsed_compute.timer();
+            let sorted_batch = sort_batch(input, self.schema.clone(), 
&*self.expr)?;
+            timer.done();
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(sorted_batch);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            let in_mem_stream = in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+                self.runtime.clone(),
+            )
+            .await?;
+            streams.push(SortedStream::new(in_mem_stream, self.used()));
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(

Review comment:
       Nice :+1:

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    buffered_batches: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+) -> Result<SendableRecordBatchStream> {
+    assert_ne!(buffered_batches.len(), 0);
+
+    let result = {
+        // NB timer records time taken on drop, so there are no
+        // calls to `timer.done()` below.
+        let _timer = baseline_metrics.elapsed_compute().timer();
+
+        let pre_sort = if buffered_batches.len() == 1 {
+            buffered_batches.pop()
+        } else {
+            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+            // combine all record batches into one for each column
+            common::combine_batches(&batches, schema.clone())?
+        };
+
+        pre_sort
+            .map(|batch| sort_batch(batch, schema.clone(), expressions))
+            .transpose()?
+    };
+
+    Ok(Box::pin(SizedRecordBatchStream::new(
+        schema,
+        vec![Arc::new(result.unwrap())],
+        baseline_metrics,
+    )))
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();
+    }
+    sender.send(None).await.ok();

Review comment:
       I believe the tokio stream already has "hangup" detection...

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;

Review comment:
       I think it should be possible to de-async the stream constructor to 
allow using parking_lot which is both lighter weight, and avoids the absolute 
brain melt that are async locks (they're also a monumental pain to debug)

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {

Review comment:
       I understand that as ExternalSorter implements MemoryConsumer directly 
we need to wrap it in an Arc, but just an observation that the interface and 
implementation of this component would be simpler if it took mutable 
references... Can `insert_batch` be called after `sort`, if so what happens? 
What about concurrently? 
   
   Maybe something to think about, the borrow checker can only help you if you 
don't go behind its back with `Mutex`, `RefCell` and similar :laughing: 

##########
File path: datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
##########
@@ -345,28 +347,28 @@ impl SortPreservingMergeStream {
             aborted: false,
             in_progress: vec![],
             next_batch_index: 0,
+            min_heap: BinaryHeap::with_capacity(stream_count),
             runtime,
         }
     }
 
-    pub(crate) async fn new_from_stream(
+    pub(crate) async fn new_from_streams(

Review comment:
       I know this PR didn't create this, but I'm not sure why constructors are 
async

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -227,115 +618,56 @@ pub(crate) fn sort_batch(
     )
 }
 
-pin_project! {
-    /// stream for sort plan
-    struct SortStream {
-        #[pin]
-        output: 
futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
-        finished: bool,
-        schema: SchemaRef,
-        drop_helper: AbortOnDropSingle<()>,
-    }
-}
-
-impl SortStream {
-    fn new(
-        input: SendableRecordBatchStream,
-        expr: Vec<PhysicalSortExpr>,
-        baseline_metrics: BaselineMetrics,
-    ) -> Self {
-        let (tx, rx) = futures::channel::oneshot::channel();
-        let schema = input.schema();
-        let join_handle = tokio::spawn(async move {
-            let schema = input.schema();
-            let sorted_batch = common::collect(input)
-                .await
-                .map_err(DataFusionError::into_arrow_external_error)
-                .and_then(move |batches| {
-                    let timer = baseline_metrics.elapsed_compute().timer();
-                    // combine all record batches into one for each column
-                    let combined = common::combine_batches(&batches, 
schema.clone())?;
-                    // sort combined record batch
-                    let result = combined
-                        .map(|batch| sort_batch(batch, schema, &expr))
-                        .transpose()?
-                        .record_output(&baseline_metrics);
-                    timer.done();
-                    Ok(result)
-                });
-
-            // failing here is OK, the receiver is gone and does not care 
about the result
-            tx.send(sorted_batch).ok();
-        });
-
-        Self {
-            output: rx,
-            finished: false,
-            schema,
-            drop_helper: AbortOnDropSingle::new(join_handle),
-        }
-    }
-}
-
-impl Stream for SortStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
-        if self.finished {
-            return Poll::Ready(None);
-        }
-
-        // is the output ready?
-        let this = self.project();
-        let output_poll = this.output.poll(cx);
-
-        match output_poll {
-            Poll::Ready(result) => {
-                *this.finished = true;
-
-                // check for error in receiving channel and unwrap actual 
result
-                let result = match result {
-                    Err(e) => 
Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
-                    Ok(result) => result.transpose(),
-                };
-
-                Poll::Ready(result)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+async fn do_sort(
+    mut input: SendableRecordBatchStream,
+    partition_id: usize,
+    expr: Vec<PhysicalSortExpr>,
+    metrics: AggregatedMetricsSet,
+    runtime: Arc<RuntimeEnv>,
+) -> Result<SendableRecordBatchStream> {
+    let schema = input.schema();
+    let sorter = Arc::new(ExternalSorter::new(
+        partition_id,
+        schema.clone(),
+        expr,
+        metrics,
+        runtime.clone(),
+    ));
+    runtime.register_consumer(&(sorter.clone() as Arc<dyn MemoryConsumer>));

Review comment:
       Is the eventual plan that the runtime can request a memory consumer 
spill its data, and this just currently isn't implemented. Just trying to work 
out what purpose this serves :sweat_smile: 

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    buffered_batches: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+) -> Result<SendableRecordBatchStream> {
+    assert_ne!(buffered_batches.len(), 0);
+
+    let result = {
+        // NB timer records time taken on drop, so there are no
+        // calls to `timer.done()` below.
+        let _timer = baseline_metrics.elapsed_compute().timer();
+
+        let pre_sort = if buffered_batches.len() == 1 {
+            buffered_batches.pop()
+        } else {
+            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+            // combine all record batches into one for each column
+            common::combine_batches(&batches, schema.clone())?
+        };
+
+        pre_sort
+            .map(|batch| sort_batch(batch, schema.clone(), expressions))
+            .transpose()?
+    };
+
+    Ok(Box::pin(SizedRecordBatchStream::new(
+        schema,
+        vec![Arc::new(result.unwrap())],
+        baseline_metrics,
+    )))
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();
+    }
+    sender.send(None).await.ok();
+    let path_clone = path.clone();
+    let res =
+        task::spawn_blocking(move || write_sorted(receiver, path_clone, 
schema)).await;
+    match res {
+        Ok(r) => r,
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Error occurred while spilling {}",
+            e
+        ))),
+    }
+}
+
+async fn read_spill_as_stream(
+    path: String,
+    schema: SchemaRef,
+) -> Result<SendableRecordBatchStream> {
+    let (sender, receiver): (
+        TKSender<ArrowResult<RecordBatch>>,
+        TKReceiver<ArrowResult<RecordBatch>>,
+    ) = tokio::sync::mpsc::channel(2);
+    let path_clone = path.clone();
+    let join_handle = task::spawn_blocking(move || {
+        if let Err(e) = read_spill(sender, path_clone) {
+            error!("Failure while reading spill file: {}. Error: {}", path, e);
+        }
+    });
+    Ok(RecordBatchReceiverStream::create(
+        &schema,
+        receiver,
+        join_handle,
+    ))
+}
+
+fn write_sorted(
+    mut receiver: TKReceiver<Option<ArrowResult<RecordBatch>>>,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+    while let Some(Some(batch)) = receiver.blocking_recv() {
+        writer.write(&batch?)?;
+    }
+    writer.finish()?;
+    info!(
+        "Spilled {} batches of total {} rows to disk, memory released {}",
+        writer.num_batches, writer.num_rows, writer.num_bytes
+    );
+    Ok(writer.num_bytes as usize)
+}
+
+fn read_spill(sender: TKSender<ArrowResult<RecordBatch>>, path: String) -> 
Result<()> {
+    let file = BufReader::new(File::open(&path)?);
+    let reader = FileReader::try_new(file)?;
+    for batch in reader {
+        sender
+            .blocking_send(batch)
+            .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
+    }
+    Ok(())
+}
+
+/// External Sort execution plan
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: Vec<PhysicalSortExpr>,
-    /// Execution metrics
-    metrics: ExecutionPlanMetricsSet,
+    /// Containing all metrics set created during sort
+    all_metrics: AggregatedMetricsSet,
     /// Preserve partitions of input plan
     preserve_partitioning: bool,
 }
 
+#[derive(Debug, Clone)]
+struct AggregatedMetricsSet {

Review comment:
       I think some doc comments might help explain what this is for, what the 
methods do, etc...

##########
File path: datafusion/tests/sql/joins.rs
##########
@@ -419,32 +419,32 @@ async fn cross_join_unbalanced() {
 
     // the order of the values is not determinisitic, so we need to sort to 
check the values
     let sql =
-        "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, 
t1_name";
+        "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id, 
t1_name, t2_name";

Review comment:
       Does this mean that the merge is no longer stable, i.e. always takes 
from the lowest partition in the event of equivalent keys? This would be a 
non-trivial behaviour change, that FWIW would break IOx

##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, 
SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, 
ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, 
Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, 
self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} 
time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are 
holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = 
self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), 
self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    buffered_batches: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+) -> Result<SendableRecordBatchStream> {
+    assert_ne!(buffered_batches.len(), 0);
+
+    let result = {
+        // NB timer records time taken on drop, so there are no
+        // calls to `timer.done()` below.
+        let _timer = baseline_metrics.elapsed_compute().timer();
+
+        let pre_sort = if buffered_batches.len() == 1 {
+            buffered_batches.pop()
+        } else {
+            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+            // combine all record batches into one for each column
+            common::combine_batches(&batches, schema.clone())?
+        };
+
+        pre_sort
+            .map(|batch| sort_batch(batch, schema.clone(), expressions))
+            .transpose()?
+    };
+
+    Ok(Box::pin(SizedRecordBatchStream::new(
+        schema,
+        vec![Arc::new(result.unwrap())],
+        baseline_metrics,
+    )))
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();

Review comment:
       Won't this deadlock if it sends more than two batches as the receiver 
isn't yet consuming?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to