tustvold commented on a change in pull request #1596:
URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r787635919



##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
 
 use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+    ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
 use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
-    BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
 };
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
 use crate::physical_plan::{
-    common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+    common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
+    Partitioning, SendableRecordBatchStream, Statistics,
 };
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
 use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
 use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
 use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
+struct ExternalSorter {
+    id: MemoryConsumerId,
+    schema: SchemaRef,
+    in_mem_batches: Mutex<Vec<RecordBatch>>,
+    spills: Mutex<Vec<String>>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    runtime: Arc<RuntimeEnv>,
+    metrics: AggregatedMetricsSet,
+    used: AtomicUsize,
+    spilled_bytes: AtomicUsize,
+    spilled_count: AtomicUsize,
+}
 
-/// Sort execution plan
+impl ExternalSorter {
+    pub fn new(
+        partition_id: usize,
+        schema: SchemaRef,
+        expr: Vec<PhysicalSortExpr>,
+        metrics: AggregatedMetricsSet,
+        runtime: Arc<RuntimeEnv>,
+    ) -> Self {
+        Self {
+            id: MemoryConsumerId::new(partition_id),
+            schema,
+            in_mem_batches: Mutex::new(vec![]),
+            spills: Mutex::new(vec![]),
+            expr,
+            runtime,
+            metrics,
+            used: AtomicUsize::new(0),
+            spilled_bytes: AtomicUsize::new(0),
+            spilled_count: AtomicUsize::new(0),
+        }
+    }
+
+    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+        if input.num_rows() > 0 {
+            let size = batch_byte_size(&input);
+            self.try_grow(size).await?;
+            self.used.fetch_add(size, Ordering::SeqCst);
+            let mut in_mem_batches = self.in_mem_batches.lock().await;
+            in_mem_batches.push(input);
+        }
+        Ok(())
+    }
+
+    async fn spilled_before(&self) -> bool {
+        let spills = self.spills.lock().await;
+        !spills.is_empty()
+    }
+
+    /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
+    async fn sort(&self) -> Result<SendableRecordBatchStream> {
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+        if self.spilled_before().await {
+            let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+            let mut streams: Vec<SortedStream> = vec![];
+            if in_mem_batches.len() > 0 {
+                let in_mem_stream = in_mem_partial_sort(
+                    &mut *in_mem_batches,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                )
+                .await?;
+                streams.push(SortedStream::new(in_mem_stream, self.used()));
+            }
+
+            let mut spills = self.spills.lock().await;
+
+            for spill in spills.drain(..) {
+                let stream = read_spill_as_stream(spill, self.schema.clone()).await?;
+                streams.push(SortedStream::new(stream, 0));
+            }
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            Ok(Box::pin(
+                SortPreservingMergeStream::new_from_streams(
+                    streams,
+                    self.schema.clone(),
+                    &self.expr,
+                    baseline_metrics,
+                    partition,
+                    self.runtime.clone(),
+                )
+                .await,
+            ))
+        } else if in_mem_batches.len() > 0 {
+            let baseline_metrics = self.metrics.new_final_baseline(partition);
+            in_mem_partial_sort(
+                &mut *in_mem_batches,
+                self.schema.clone(),
+                &self.expr,
+                baseline_metrics,
+            )
+            .await
+        } else {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        }
+    }
+
+    fn used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+
+    fn spilled_bytes(&self) -> usize {
+        self.spilled_bytes.load(Ordering::SeqCst)
+    }
+
+    fn spilled_count(&self) -> usize {
+        self.spilled_count.load(Ordering::SeqCst)
+    }
+}
+
+impl Debug for ExternalSorter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        f.debug_struct("ExternalSorter")
+            .field("id", &self.id())
+            .field("memory_used", &self.used())
+            .field("spilled_bytes", &self.spilled_bytes())
+            .field("spilled_count", &self.spilled_count())
+            .finish()
+    }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+    fn name(&self) -> String {
+        "ExternalSorter".to_owned()
+    }
+
+    fn id(&self) -> &MemoryConsumerId {
+        &self.id
+    }
+
+    fn memory_manager(&self) -> Arc<MemoryManager> {
+        self.runtime.memory_manager.clone()
+    }
+
+    fn type_(&self) -> &ConsumerType {
+        &ConsumerType::Requesting
+    }
+
+    async fn spill(&self) -> Result<usize> {
+        info!(
+            "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)",
+            self.name(),
+            self.id(),
+            self.used(),
+            self.spilled_count()
+        );
+
+        let partition = self.partition_id();
+        let mut in_mem_batches = self.in_mem_batches.lock().await;
+        // we could always get a chance to free some memory as long as we are holding some
+        if in_mem_batches.len() == 0 {
+            return Ok(0);
+        }
+
+        let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+
+        let path = self.runtime.disk_manager.create_tmp_file()?;
+        let stream = in_mem_partial_sort(
+            &mut *in_mem_batches,
+            self.schema.clone(),
+            &*self.expr,
+            baseline_metrics,
+        )
+        .await;
+
+        let total_size =
+            spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone())
+                .await?;
+
+        let mut spills = self.spills.lock().await;
+        let used = self.used.swap(0, Ordering::SeqCst);
+        self.spilled_count.fetch_add(1, Ordering::SeqCst);
+        self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+        spills.push(path);
+        Ok(used)
+    }
+
+    fn mem_used(&self) -> usize {
+        self.used.load(Ordering::SeqCst)
+    }
+}
+
+/// consume the non-empty `sorted_bathes` and do in_mem_sort
+async fn in_mem_partial_sort(
+    buffered_batches: &mut Vec<RecordBatch>,
+    schema: SchemaRef,
+    expressions: &[PhysicalSortExpr],
+    baseline_metrics: BaselineMetrics,
+) -> Result<SendableRecordBatchStream> {
+    assert_ne!(buffered_batches.len(), 0);
+
+    let result = {
+        // NB timer records time taken on drop, so there are no
+        // calls to `timer.done()` below.
+        let _timer = baseline_metrics.elapsed_compute().timer();
+
+        let pre_sort = if buffered_batches.len() == 1 {
+            buffered_batches.pop()
+        } else {
+            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+            // combine all record batches into one for each column
+            common::combine_batches(&batches, schema.clone())?
+        };
+
+        pre_sort
+            .map(|batch| sort_batch(batch, schema.clone(), expressions))
+            .transpose()?
+    };
+
+    Ok(Box::pin(SizedRecordBatchStream::new(
+        schema,
+        vec![Arc::new(result.unwrap())],
+        baseline_metrics,
+    )))
+}
+
+async fn spill_partial_sorted_stream(
+    in_mem_stream: &mut SendableRecordBatchStream,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let (sender, receiver) = tokio::sync::mpsc::channel(2);
+    while let Some(item) = in_mem_stream.next().await {
+        sender.send(Some(item)).await.ok();
+    }
+    sender.send(None).await.ok();
+    let path_clone = path.clone();
+    let res =
+        task::spawn_blocking(move || write_sorted(receiver, path_clone, schema)).await;
+    match res {
+        Ok(r) => r,
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Error occurred while spilling {}",
+            e
+        ))),
+    }
+}
+
+async fn read_spill_as_stream(
+    path: String,
+    schema: SchemaRef,
+) -> Result<SendableRecordBatchStream> {
+    let (sender, receiver): (
+        TKSender<ArrowResult<RecordBatch>>,
+        TKReceiver<ArrowResult<RecordBatch>>,
+    ) = tokio::sync::mpsc::channel(2);
+    let path_clone = path.clone();
+    let join_handle = task::spawn_blocking(move || {
+        if let Err(e) = read_spill(sender, path_clone) {
+            error!("Failure while reading spill file: {}. Error: {}", path, e);
+        }
+    });
+    Ok(RecordBatchReceiverStream::create(
+        &schema,
+        receiver,
+        join_handle,
+    ))
+}
+
+fn write_sorted(
+    mut receiver: TKReceiver<Option<ArrowResult<RecordBatch>>>,
+    path: String,
+    schema: SchemaRef,
+) -> Result<usize> {
+    let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;

Review comment:
       Mostly an FYI for @alamb: while poking around here I noticed that
`IPCWriter` uses dictionary IDs and will error if batches with different
dictionary IDs are written. This will likely cause problems with the way
arrow-rs and IOx currently handle dictionaries. I've created
https://github.com/apache/arrow-rs/issues/1206 to clarify what is going on here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to