This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 37c02c1 ARROW-10673: [Rust] [DataFusion] Made sort not collect on
`execute`.
37c02c1 is described below
commit 37c02c1b3d99e713430d30e7cbdbb7a599530fb9
Author: Jorge C. Leitao <[email protected]>
AuthorDate: Sat Nov 21 18:06:51 2020 -0500
ARROW-10673: [Rust] [DataFusion] Made sort not collect on `execute`.
Currently, we `collect` and `sort` the record batches from the incoming
part of `Sort` on `execute`. However, there is no need to do so: we can
postpone that to the stream, like we do for our aggregates. This allows
executors to postpone the heavy operation of collecting and sorting until
they request the first (and only) batch from the stream.
This PR does exactly this: it postpones the execution to when needed.
This code's design is based on HashAggregate, which also uses a one-shot
channel.
Closes #8730 from jorgecarleitao/sort_post
Authored-by: Jorge C. Leitao <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/datafusion/src/physical_plan/sort.rs | 183 +++++++++++++++++++++---------
1 file changed, 129 insertions(+), 54 deletions(-)
diff --git a/rust/datafusion/src/physical_plan/sort.rs
b/rust/datafusion/src/physical_plan/sort.rs
index 0c3601a..c734e2a 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -18,17 +18,24 @@
//! Defines the SORT plan
use std::any::Any;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures::stream::Stream;
+use futures::Future;
+
+use pin_project_lite::pin_project;
-use arrow::array::ArrayRef;
pub use arrow::compute::SortOptions;
use arrow::compute::{concat, lexsort_to_indices, take, SortColumn,
TakeOptions};
use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, error::ArrowError};
-use super::SendableRecordBatchStream;
+use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::common::SizedRecordBatchStream;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning};
@@ -114,60 +121,128 @@ impl ExecutionPlan for SortExec {
"SortExec requires a single input partition".to_owned(),
));
}
- let it = self.input.execute(0).await?;
- let batches = common::collect(it).await?;
-
- // combine all record batches into one for each column
- let combined_batch = RecordBatch::try_new(
- self.schema(),
- self.schema()
- .fields()
- .iter()
- .enumerate()
- .map(|(i, _)| -> Result<ArrayRef> {
- Ok(concat(
- &batches
- .iter()
- .map(|batch| batch.columns()[i].clone())
- .collect::<Vec<ArrayRef>>(),
- )?)
- })
- .collect::<Result<Vec<ArrayRef>>>()?,
- )?;
+ let input = self.input.execute(0).await?;
- // sort combined record batch
- let indices = lexsort_to_indices(
- &self
- .expr
- .iter()
- .map(|e| e.evaluate_to_sort_column(&combined_batch))
- .collect::<Result<Vec<SortColumn>>>()?,
- )?;
+ Ok(Box::pin(SortStream::new(input, self.expr.clone())))
+ }
+}
- // reorder all rows based on sorted indices
- let sorted_batch = RecordBatch::try_new(
- self.schema(),
- combined_batch
- .columns()
- .iter()
- .map(|column| -> Result<ArrayRef> {
- Ok(take(
- column,
- &indices,
- // disable bound check overhead since indices are
already generated from
- // the same record batch
- Some(TakeOptions {
- check_bounds: false,
- }),
- )?)
- })
- .collect::<Result<Vec<ArrayRef>>>()?,
- )?;
+fn sort_batches(
+ batches: &Vec<RecordBatch>,
+ schema: &SchemaRef,
+ expr: &[PhysicalSortExpr],
+) -> ArrowResult<RecordBatch> {
+ // combine all record batches into one for each column
+ let combined_batch = RecordBatch::try_new(
+ schema.clone(),
+ schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, _)| {
+ concat(
+ &batches
+ .iter()
+ .map(|batch| batch.columns()[i].clone())
+ .collect::<Vec<ArrayRef>>(),
+ )
+ })
+ .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+ )?;
+
+ // sort combined record batch
+ let indices = lexsort_to_indices(
+ &expr
+ .iter()
+ .map(|e| e.evaluate_to_sort_column(&combined_batch))
+ .collect::<Result<Vec<SortColumn>>>()
+ .map_err(DataFusionError::into_arrow_external_error)?,
+ )?;
+
+ // reorder all rows based on sorted indices
+ RecordBatch::try_new(
+ schema.clone(),
+ combined_batch
+ .columns()
+ .iter()
+ .map(|column| {
+ take(
+ column,
+ &indices,
+ // disable bound check overhead since indices are already
generated from
+ // the same record batch
+ Some(TakeOptions {
+ check_bounds: false,
+ }),
+ )
+ })
+ .collect::<ArrowResult<Vec<ArrayRef>>>()?,
+ )
+}
+
+pin_project! {
+ struct SortStream {
+ #[pin]
+ output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+ finished: bool,
+ schema: SchemaRef,
+ }
+}
+
+impl SortStream {
+ fn new(input: SendableRecordBatchStream, expr: Vec<PhysicalSortExpr>) ->
Self {
+ let (tx, rx) = futures::channel::oneshot::channel();
+
+ let schema = input.schema();
+ tokio::spawn(async move {
+ let schema = input.schema();
+ let sorted_batch = common::collect(input)
+ .await
+ .map_err(DataFusionError::into_arrow_external_error)
+ .and_then(move |batches| sort_batches(&batches, &schema,
&expr));
+
+ tx.send(sorted_batch)
+ });
+
+ Self {
+ output: rx,
+ finished: false,
+ schema,
+ }
+ }
+}
+
+impl Stream for SortStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ if self.finished {
+ return Poll::Ready(None);
+ }
+
+ // is the output ready?
+ let this = self.project();
+ let output_poll = this.output.poll(cx);
- Ok(Box::pin(SizedRecordBatchStream::new(
- self.schema(),
- vec![Arc::new(sorted_batch)],
- )))
+ match output_poll {
+ Poll::Ready(result) => {
+ *this.finished = true;
+
+ // check for error in receiving channel and unwrap actual
result
+ let result = match result {
+ Err(e) => Err(ArrowError::ExternalError(Box::new(e))), //
error receiving
+ Ok(result) => result,
+ };
+ Poll::Ready(Some(result))
+ }
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
+impl RecordBatchStream for SortStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
}
}