This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d637871 Implement streaming versions of Dataframe.collect methods (#789)
d637871 is described below
commit d637871ce969102681616ea113cf84288ff6c252
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 30 10:30:39 2021 -0700
Implement streaming versions of Dataframe.collect methods (#789)
---
datafusion/src/dataframe.rs | 31 ++++++++++
datafusion/src/execution/dataframe_impl.rs | 44 ++++++++++----
datafusion/src/physical_plan/mod.rs | 92 ++++++++++++++++++++++--------
3 files changed, 130 insertions(+), 37 deletions(-)
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 507a798..1d4cffd 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
};
use std::sync::Arc;
+use crate::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;
/// DataFrame represents a logical set of rows with the same named columns.
@@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect(&self) -> Result<Vec<RecordBatch>>;
+ /// Executes this DataFrame and returns a stream over a single partition
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let stream = df.execute_stream().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
+
/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
/// maintaining the input partitioning.
///
@@ -238,6 +254,21 @@ pub trait DataFrame: Send + Sync {
/// ```
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;
+ /// Executes this DataFrame and returns one stream per partition.
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+ /// let batches = df.execute_stream_partitioned().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>>;
+
/// Returns the schema describing the output of this DataFrame in terms of columns returned,
/// where each column has a name, data type, and nullability attribute.
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 451c4c7..1c0094b 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,9 @@ use crate::{
physical_plan::{collect, collect_partitioned},
};
+use crate::physical_plan::{
+ execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
+};
use async_trait::async_trait;
/// Implementation of DataFrame API
@@ -47,6 +50,14 @@ impl DataFrameImpl {
plan: plan.clone(),
}
}
+
+ /// Create a physical plan
+ async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
+ let state = self.ctx_state.lock().unwrap().clone();
+ let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+ let plan = ctx.optimize(&self.plan)?;
+ ctx.create_physical_plan(&plan)
+ }
}
#[async_trait]
@@ -138,26 +149,35 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}
- // Convert the logical plan represented by this DataFrame into a physical plan and
- // execute it
+ /// Convert the logical plan represented by this DataFrame into a physical plan and
+ /// execute it, collecting all resulting batches into memory
async fn collect(&self) -> Result<Vec<RecordBatch>> {
- let state = self.ctx_state.lock().unwrap().clone();
- let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
- let plan = ctx.optimize(&self.plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = self.create_physical_plan().await?;
Ok(collect(plan).await?)
}
- // Convert the logical plan represented by this DataFrame into a physical plan and
- // execute it
+ /// Convert the logical plan represented by this DataFrame into a physical plan and
+ /// execute it, returning a stream over a single partition
+ async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
+ let plan = self.create_physical_plan().await?;
+ execute_stream(plan).await
+ }
+
+ /// Convert the logical plan represented by this DataFrame into a physical plan and
+ /// execute it, collecting all resulting batches into memory while maintaining
+ /// partitioning
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
- let state = self.ctx_state.lock().unwrap().clone();
- let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
- let plan = ctx.optimize(&self.plan)?;
- let plan = ctx.create_physical_plan(&plan)?;
+ let plan = self.create_physical_plan().await?;
Ok(collect_partitioned(plan).await?)
}
+ /// Convert the logical plan represented by this DataFrame into a physical plan and
+ /// execute it, returning a stream for each partition
+ async fn execute_stream_partitioned(&self) -> Result<Vec<SendableRecordBatchStream>> {
+ let plan = self.create_physical_plan().await?;
+ Ok(execute_stream_partitioned(plan).await?)
+ }
+
/// Returns the schema from the logical plan
fn schema(&self) -> &DFSchema {
self.plan.schema()
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index b3c0dd6..86bceb1 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,14 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
use self::{
coalesce_partitions::CoalescePartitionsExec,
display::DisplayableExecutionPlan,
};
@@ -35,12 +43,6 @@ use async_trait::async_trait;
pub use display::DisplayFormatType;
use futures::stream::Stream;
use hashbrown::HashMap;
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{any::Any, pin::Pin};
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -54,6 +56,37 @@ pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
/// Trait for a stream of record batches.
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+ /// Schema
+ schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+ /// Create an empty RecordBatchStream
+ pub fn new(schema: SchemaRef) -> Self {
+ Self { schema }
+ }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Stream for EmptyRecordBatchStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ Poll::Ready(None)
+ }
+}
+
/// SQL metric type
#[derive(Debug, Clone)]
pub enum MetricType {
@@ -313,18 +346,23 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> HashMap<String, SQLMetric>
/// Execute the [ExecutionPlan] and collect the results in memory
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+ let stream = execute_stream(plan).await?;
+ common::collect(stream).await
+}
+
+/// Execute the [ExecutionPlan] and return a single stream of results
+pub async fn execute_stream(
+ plan: Arc<dyn ExecutionPlan>,
+) -> Result<SendableRecordBatchStream> {
match plan.output_partitioning().partition_count() {
- 0 => Ok(vec![]),
- 1 => {
- let it = plan.execute(0).await?;
- common::collect(it).await
- }
+ 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
+ 1 => plan.execute(0).await,
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
- common::collect(plan.execute(0).await?).await
+ plan.execute(0).await
}
}
}
@@ -333,20 +371,24 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
- match plan.output_partitioning().partition_count() {
- 0 => Ok(vec![]),
- 1 => {
- let it = plan.execute(0).await?;
- Ok(vec![common::collect(it).await?])
- }
- _ => {
- let mut partitions = vec![];
- for i in 0..plan.output_partitioning().partition_count() {
- partitions.push(common::collect(plan.execute(i).await?).await?)
- }
- Ok(partitions)
- }
+ let streams = execute_stream_partitioned(plan).await?;
+ let mut batches = Vec::with_capacity(streams.len());
+ for stream in streams {
+ batches.push(common::collect(stream).await?);
+ }
+ Ok(batches)
+}
+
+/// Execute the [ExecutionPlan] and return a vec with one stream per output partition
+pub async fn execute_stream_partitioned(
+ plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<SendableRecordBatchStream>> {
+ let num_partitions = plan.output_partitioning().partition_count();
+ let mut streams = Vec::with_capacity(num_partitions);
+ for i in 0..num_partitions {
+ streams.push(plan.execute(i).await?);
}
+ Ok(streams)
}
/// Partitioning schemes supported by operators.