yjshen commented on a change in pull request #1596:
URL: https://github.com/apache/arrow-datafusion/pull/1596#discussion_r787593754
##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+ ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
- BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
};
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{
- common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
Review comment:
I think parking_lot is not used in DataFusion; I once used it but removed it later:
https://github.com/yjshen/arrow-datafusion/pull/3/commits/66796280ae51f045c6f6f10fa152968e18c71b8f.
Do you think we can add this dependency?
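For context, here is a minimal sketch (the `Buffer` struct is hypothetical, not from this PR) of the two locking styles in question: `futures::lock::Mutex` must be awaited and forces the caller to be `async`, while `parking_lot::Mutex` locks synchronously and, unlike `std::sync::Mutex`, has no lock poisoning to unwrap.
```
struct Buffer {
    batches: Vec<u32>,
}

// With futures::lock::Mutex, acquiring the lock is itself a future,
// so the surrounding function must be async:
async fn push_async(buf: &futures::lock::Mutex<Buffer>, v: u32) {
    let mut guard = buf.lock().await; // yields to the executor if contended
    guard.batches.push(v);
}

// With parking_lot::Mutex, locking is synchronous and infallible
// (no poisoned-lock Result to handle):
fn push_sync(buf: &parking_lot::Mutex<Buffer>, v: u32) {
    let mut guard = buf.lock(); // blocks the current thread if contended
    guard.batches.push(v);
}
```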
##########
File path: datafusion/tests/sql/joins.rs
##########
@@ -419,32 +419,32 @@ async fn cross_join_unbalanced() {
// the order of the values is not deterministic, so we need to sort to check the values
let sql =
- "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id,
t1_name";
+ "SELECT t1_id, t1_name, t2_name FROM t1 CROSS JOIN t2 ORDER BY t1_id,
t1_name, t2_name";
Review comment:
Previously, since I was using SPMS and heap sort to do in_mem_sort, I found the sort to be non-stable. In the current situation, where I have changed back to the combine-then-sort method, the behavior is unchanged.
However, I think testing for stability here is out of scope for the SQL sort operator?
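To illustrate the stability point with plain Rust (this example is mine, not from the PR): an unstable sort gives no guarantee about the relative order of rows that compare equal on the sort key, which is exactly why the test now orders by every projected column.
```
fn main() {
    // (t1_id, t2_name) pairs with a duplicate id
    let mut rows = vec![(1, "b"), (1, "a"), (2, "c")];

    // Sorting only by id leaves the order of the two id=1 rows unspecified
    // under an unstable sort, so asserting on the full output can be flaky.
    rows.sort_unstable_by_key(|r| r.0);

    // Sorting by the full tuple makes the output deterministic, which is
    // what adding t2_name to the ORDER BY achieves in the test.
    rows.sort_unstable_by_key(|r| (r.0, r.1));
    assert_eq!(rows, vec![(1, "a"), (1, "b"), (2, "c")]);
}
```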
##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -15,47 +15,432 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the SORT plan
+//! Sort that deals with an arbitrary size of the input.
+//! It will do in-memory sorting if it has enough memory budget
+//! but spills to disk if needed.
use crate::error::{DataFusionError, Result};
+use crate::execution::memory_manager::{
+ ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
+};
use crate::execution::runtime_env::RuntimeEnv;
-use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
- BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricsSet, Time,
};
+use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
+use crate::physical_plan::sorts::SortedStream;
+use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{
- common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
+ common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
+ Partitioning, SendableRecordBatchStream, Statistics,
};
-use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream, Statistics};
+use arrow::array::ArrayRef;
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
+use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, error::ArrowError};
use async_trait::async_trait;
-use futures::stream::Stream;
-use futures::Future;
-use pin_project_lite::pin_project;
+use futures::lock::Mutex;
+use futures::StreamExt;
+use log::{error, info};
use std::any::Any;
-use std::pin::Pin;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::time::Duration;
+use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
+use tokio::task;
+
+/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available).
+///
+/// The basic architecture of the algorithm:
+/// 1. get a non-empty new batch from input
+/// 2. check with the memory manager if we could buffer the batch in memory
+/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
+/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file.
+///     buffer the batch in memory, go to 1.
+/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
+struct ExternalSorter {
+ id: MemoryConsumerId,
+ schema: SchemaRef,
+ in_mem_batches: Mutex<Vec<RecordBatch>>,
+ spills: Mutex<Vec<String>>,
+ /// Sort expressions
+ expr: Vec<PhysicalSortExpr>,
+ runtime: Arc<RuntimeEnv>,
+ metrics: AggregatedMetricsSet,
+ used: AtomicUsize,
+ spilled_bytes: AtomicUsize,
+ spilled_count: AtomicUsize,
+}
-/// Sort execution plan
+impl ExternalSorter {
+ pub fn new(
+ partition_id: usize,
+ schema: SchemaRef,
+ expr: Vec<PhysicalSortExpr>,
+ metrics: AggregatedMetricsSet,
+ runtime: Arc<RuntimeEnv>,
+ ) -> Self {
+ Self {
+ id: MemoryConsumerId::new(partition_id),
+ schema,
+ in_mem_batches: Mutex::new(vec![]),
+ spills: Mutex::new(vec![]),
+ expr,
+ runtime,
+ metrics,
+ used: AtomicUsize::new(0),
+ spilled_bytes: AtomicUsize::new(0),
+ spilled_count: AtomicUsize::new(0),
+ }
+ }
+
+ async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+ if input.num_rows() > 0 {
+ let size = batch_byte_size(&input);
+ self.try_grow(size).await?;
+ self.used.fetch_add(size, Ordering::SeqCst);
+ let mut in_mem_batches = self.in_mem_batches.lock().await;
+ in_mem_batches.push(input);
+ }
+ Ok(())
+ }
+
+ async fn spilled_before(&self) -> bool {
+ let spills = self.spills.lock().await;
+ !spills.is_empty()
+ }
+
+ /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
+ async fn sort(&self) -> Result<SendableRecordBatchStream> {
+ let partition = self.partition_id();
+ let mut in_mem_batches = self.in_mem_batches.lock().await;
+
+ if self.spilled_before().await {
+ let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+ let mut streams: Vec<SortedStream> = vec![];
+ if in_mem_batches.len() > 0 {
+ let in_mem_stream = in_mem_partial_sort(
+ &mut *in_mem_batches,
+ self.schema.clone(),
+ &self.expr,
+ baseline_metrics,
+ )
+ .await?;
+ streams.push(SortedStream::new(in_mem_stream, self.used()));
+ }
+
+ let mut spills = self.spills.lock().await;
+
+ for spill in spills.drain(..) {
+ let stream = read_spill_as_stream(spill, self.schema.clone()).await?;
+ streams.push(SortedStream::new(stream, 0));
+ }
+ let baseline_metrics = self.metrics.new_final_baseline(partition);
+ Ok(Box::pin(
+ SortPreservingMergeStream::new_from_streams(
+ streams,
+ self.schema.clone(),
+ &self.expr,
+ baseline_metrics,
+ partition,
+ self.runtime.clone(),
+ )
+ .await,
+ ))
+ } else if in_mem_batches.len() > 0 {
+ let baseline_metrics = self.metrics.new_final_baseline(partition);
+ in_mem_partial_sort(
+ &mut *in_mem_batches,
+ self.schema.clone(),
+ &self.expr,
+ baseline_metrics,
+ )
+ .await
+ } else {
+ Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+ }
+ }
+
+ fn used(&self) -> usize {
+ self.used.load(Ordering::SeqCst)
+ }
+
+ fn spilled_bytes(&self) -> usize {
+ self.spilled_bytes.load(Ordering::SeqCst)
+ }
+
+ fn spilled_count(&self) -> usize {
+ self.spilled_count.load(Ordering::SeqCst)
+ }
+}
+
+impl Debug for ExternalSorter {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ f.debug_struct("ExternalSorter")
+ .field("id", &self.id())
+ .field("memory_used", &self.used())
+ .field("spilled_bytes", &self.spilled_bytes())
+ .field("spilled_count", &self.spilled_count())
+ .finish()
+ }
+}
+
+#[async_trait]
+impl MemoryConsumer for ExternalSorter {
+ fn name(&self) -> String {
+ "ExternalSorter".to_owned()
+ }
+
+ fn id(&self) -> &MemoryConsumerId {
+ &self.id
+ }
+
+ fn memory_manager(&self) -> Arc<MemoryManager> {
+ self.runtime.memory_manager.clone()
+ }
+
+ fn type_(&self) -> &ConsumerType {
+ &ConsumerType::Requesting
+ }
+
+ async fn spill(&self) -> Result<usize> {
+ info!(
+ "{}[{}] spilling sort data of {} to disk while inserting ({}
time(s) so far)",
+ self.name(),
+ self.id(),
+ self.used(),
+ self.spilled_count()
+ );
+
+ let partition = self.partition_id();
+ let mut in_mem_batches = self.in_mem_batches.lock().await;
+ // we could always get a chance to free some memory as long as we are holding some
+ if in_mem_batches.len() == 0 {
+ return Ok(0);
+ }
+
+ let baseline_metrics = self.metrics.new_intermediate_baseline(partition);
+
+ let path = self.runtime.disk_manager.create_tmp_file()?;
+ let stream = in_mem_partial_sort(
+ &mut *in_mem_batches,
+ self.schema.clone(),
+ &*self.expr,
+ baseline_metrics,
+ )
+ .await;
+
+ let total_size =
+ spill_partial_sorted_stream(&mut stream?, path.clone(), self.schema.clone())
+ .await?;
+
+ let mut spills = self.spills.lock().await;
+ let used = self.used.swap(0, Ordering::SeqCst);
+ self.spilled_count.fetch_add(1, Ordering::SeqCst);
+ self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
+ spills.push(path);
+ Ok(used)
+ }
+
+ fn mem_used(&self) -> usize {
+ self.used.load(Ordering::SeqCst)
+ }
+}
+
+/// consume the non-empty `sorted_batches` and do in_mem_sort
+async fn in_mem_partial_sort(
Review comment:
Thanks! I think this is async (as is the SPMS constructor) because `register consumer` / `drop consumer` were both async when I introduced it. It is synchronous now because of the use of `std::sync::Mutex` instead of `futures::lock::Mutex`.
Thanks again for the reminder; I'll remove this.
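A small sketch of the simplification being described (the `PartialSorter` type here is hypothetical, not the PR's actual code): once the shared state is guarded by `std::sync::Mutex`, nothing in the function awaits, so the `async` keyword can be dropped.
```
use std::sync::Mutex;

struct PartialSorter {
    // stand-in for the buffered Vec<RecordBatch>
    batches: Mutex<Vec<u64>>,
}

impl PartialSorter {
    // std::sync::Mutex is locked synchronously, so this function has
    // nothing to await and no longer needs to be `async`.
    fn take_batches(&self) -> Vec<u64> {
        let mut guard = self.batches.lock().unwrap();
        std::mem::take(&mut *guard)
    }
}
```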
##########
File path: datafusion/src/physical_plan/sorts/sort.rs
##########
@@ -227,115 +618,56 @@ pub(crate) fn sort_batch(
)
}
-pin_project! {
- /// stream for sort plan
- struct SortStream {
- #[pin]
- output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
- finished: bool,
- schema: SchemaRef,
- drop_helper: AbortOnDropSingle<()>,
- }
-}
-
-impl SortStream {
- fn new(
- input: SendableRecordBatchStream,
- expr: Vec<PhysicalSortExpr>,
- baseline_metrics: BaselineMetrics,
- ) -> Self {
- let (tx, rx) = futures::channel::oneshot::channel();
- let schema = input.schema();
- let join_handle = tokio::spawn(async move {
- let schema = input.schema();
- let sorted_batch = common::collect(input)
- .await
- .map_err(DataFusionError::into_arrow_external_error)
- .and_then(move |batches| {
- let timer = baseline_metrics.elapsed_compute().timer();
- // combine all record batches into one for each column
- let combined = common::combine_batches(&batches, schema.clone())?;
- // sort combined record batch
- let result = combined
- .map(|batch| sort_batch(batch, schema, &expr))
- .transpose()?
- .record_output(&baseline_metrics);
- timer.done();
- Ok(result)
- });
-
- // failing here is OK, the receiver is gone and does not care about the result
- tx.send(sorted_batch).ok();
- });
-
- Self {
- output: rx,
- finished: false,
- schema,
- drop_helper: AbortOnDropSingle::new(join_handle),
- }
- }
-}
-
-impl Stream for SortStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- if self.finished {
- return Poll::Ready(None);
- }
-
- // is the output ready?
- let this = self.project();
- let output_poll = this.output.poll(cx);
-
- match output_poll {
- Poll::Ready(result) => {
- *this.finished = true;
-
- // check for error in receiving channel and unwrap actual result
- let result = match result {
- Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
- Ok(result) => result.transpose(),
- };
-
- Poll::Ready(result)
- }
- Poll::Pending => Poll::Pending,
- }
+async fn do_sort(
+ mut input: SendableRecordBatchStream,
+ partition_id: usize,
+ expr: Vec<PhysicalSortExpr>,
+ metrics: AggregatedMetricsSet,
+ runtime: Arc<RuntimeEnv>,
+) -> Result<SendableRecordBatchStream> {
+ let schema = input.schema();
+ let sorter = Arc::new(ExternalSorter::new(
+ partition_id,
+ schema.clone(),
+ expr,
+ metrics,
+ runtime.clone(),
+ ));
+ runtime.register_consumer(&(sorter.clone() as Arc<dyn MemoryConsumer>));
Review comment:
```
/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available).
///
/// The basic architecture of the algorithm:
/// 1. get a non-empty new batch from input
/// 2. check with the memory manager if we could buffer the batch in memory
/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file.
///     buffer the batch in memory, go to 1.
/// 3. when input is exhausted, merge all in memory batches and spills to get a total order.
```
Each time a consumer tries to acquire more memory, it asks the memory manager.
The memory manager checks the number of active consumers and grants at most 1/n of the available memory to each consumer, so if a consumer already holds more than 1/n of memory, it calls spill to free memory first.
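A simplified sketch of that policy (hypothetical types and numbers, not DataFusion's actual memory-manager API):
```
struct PoolState {
    pool_size: usize,
    active_consumers: usize,
}

impl PoolState {
    /// Maximum bytes one consumer may hold under the 1/n policy.
    fn per_consumer_cap(&self) -> usize {
        self.pool_size / self.active_consumers.max(1)
    }

    /// A grant succeeds only while the consumer stays within its 1/n
    /// share; otherwise it should spill (freeing what it holds) and retry.
    fn can_grow(&self, currently_held: usize, requested: usize) -> bool {
        currently_held + requested <= self.per_consumer_cap()
    }
}

fn main() {
    let pool = PoolState { pool_size: 1024, active_consumers: 2 };
    // Each of the two consumers is capped at 512 bytes.
    assert!(pool.can_grow(300, 200)); // 500 <= 512: granted
    assert!(!pool.can_grow(400, 200)); // 600 > 512: spill first, then retry
}
```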
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]