This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d637871  Implement streaming versions of Dataframe.collect methods 
(#789)
d637871 is described below

commit d637871ce969102681616ea113cf84288ff6c252
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 30 10:30:39 2021 -0700

    Implement streaming versions of Dataframe.collect methods (#789)
---
 datafusion/src/dataframe.rs                | 31 ++++++++++
 datafusion/src/execution/dataframe_impl.rs | 44 ++++++++++----
 datafusion/src/physical_plan/mod.rs        | 92 ++++++++++++++++++++++--------
 3 files changed, 130 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 507a798..1d4cffd 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
 };
 use std::sync::Arc;
 
+use crate::physical_plan::SendableRecordBatchStream;
 use async_trait::async_trait;
 
 /// DataFrame represents a logical set of rows with the same named columns.
@@ -222,6 +223,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect(&self) -> Result<Vec<RecordBatch>>;
 
+    /// Executes this DataFrame and returns a stream over a single partition
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let stream = df.execute_stream().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;
+
     /// Executes this DataFrame and collects all results into a vector of 
vector of RecordBatch
     /// maintaining the input partitioning.
     ///
@@ -238,6 +254,21 @@ pub trait DataFrame: Send + Sync {
     /// ```
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>;
 
+    /// Executes this DataFrame and returns one stream per partition.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    /// let batches = df.execute_stream_partitioned().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    async fn execute_stream_partitioned(&self) -> 
Result<Vec<SendableRecordBatchStream>>;
+
     /// Returns the schema describing the output of this DataFrame in terms of 
columns returned,
     /// where each column has a name, data type, and nullability attribute.
 
diff --git a/datafusion/src/execution/dataframe_impl.rs 
b/datafusion/src/execution/dataframe_impl.rs
index 451c4c7..1c0094b 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -31,6 +31,9 @@ use crate::{
     physical_plan::{collect, collect_partitioned},
 };
 
+use crate::physical_plan::{
+    execute_stream, execute_stream_partitioned, ExecutionPlan, 
SendableRecordBatchStream,
+};
 use async_trait::async_trait;
 
 /// Implementation of DataFrame API
@@ -47,6 +50,14 @@ impl DataFrameImpl {
             plan: plan.clone(),
         }
     }
+
+    /// Create a physical plan
+    async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
+        let state = self.ctx_state.lock().unwrap().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        let plan = ctx.optimize(&self.plan)?;
+        ctx.create_physical_plan(&plan)
+    }
 }
 
 #[async_trait]
@@ -138,26 +149,35 @@ impl DataFrame for DataFrameImpl {
         self.plan.clone()
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical 
plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical 
plan and
+    /// execute it, collecting all resulting batches into memory
     async fn collect(&self) -> Result<Vec<RecordBatch>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect(plan).await?)
     }
 
-    // Convert the logical plan represented by this DataFrame into a physical 
plan and
-    // execute it
+    /// Convert the logical plan represented by this DataFrame into a physical 
plan and
+    /// execute it, returning a stream over a single partition
+    async fn execute_stream(&self) -> Result<SendableRecordBatchStream> {
+        let plan = self.create_physical_plan().await?;
+        execute_stream(plan).await
+    }
+
+    /// Convert the logical plan represented by this DataFrame into a physical 
plan and
+    /// execute it, collecting all resulting batches into memory while 
maintaining
+    /// partitioning
     async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
-        let state = self.ctx_state.lock().unwrap().clone();
-        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
-        let plan = ctx.optimize(&self.plan)?;
-        let plan = ctx.create_physical_plan(&plan)?;
+        let plan = self.create_physical_plan().await?;
         Ok(collect_partitioned(plan).await?)
     }
 
+    /// Convert the logical plan represented by this DataFrame into a physical 
plan and
+    /// execute it, returning a stream for each partition
+    async fn execute_stream_partitioned(&self) -> 
Result<Vec<SendableRecordBatchStream>> {
+        let plan = self.create_physical_plan().await?;
+        Ok(execute_stream_partitioned(plan).await?)
+    }
+
     /// Returns the schema from the logical plan
     fn schema(&self) -> &DFSchema {
         self.plan.schema()
diff --git a/datafusion/src/physical_plan/mod.rs 
b/datafusion/src/physical_plan/mod.rs
index b3c0dd6..86bceb1 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,6 +17,14 @@
 
 //! Traits for physical query plan, supporting parallel execution for 
partitioned relations.
 
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, pin::Pin};
+
 use self::{
     coalesce_partitions::CoalescePartitionsExec, 
display::DisplayableExecutionPlan,
 };
@@ -35,12 +43,6 @@ use async_trait::async_trait;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use hashbrown::HashMap;
-use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::{any::Any, pin::Pin};
 
 /// Trait for types that stream [arrow::record_batch::RecordBatch]
 pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
@@ -54,6 +56,37 @@ pub trait RecordBatchStream: Stream<Item = 
ArrowResult<RecordBatch>> {
 /// Trait for a stream of record batches.
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + 
Sync>>;
 
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+    /// Schema
+    schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+    /// Create an empty RecordBatchStream
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for EmptyRecordBatchStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
+
 /// SQL metric type
 #[derive(Debug, Clone)]
 pub enum MetricType {
@@ -313,18 +346,23 @@ pub fn plan_metrics(plan: Arc<dyn ExecutionPlan>) -> 
HashMap<String, SQLMetric>
 
 /// Execute the [ExecutionPlan] and collect the results in memory
 pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> 
{
+    let stream = execute_stream(plan).await?;
+    common::collect(stream).await
+}
+
+/// Execute the [ExecutionPlan] and return a single stream of results
+pub async fn execute_stream(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<SendableRecordBatchStream> {
     match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            common::collect(it).await
-        }
+        0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
+        1 => plan.execute(0).await,
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
             // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
-            common::collect(plan.execute(0).await?).await
+            plan.execute(0).await
         }
     }
 }
@@ -333,20 +371,24 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> 
Result<Vec<RecordBatch>> {
 pub async fn collect_partitioned(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Vec<Vec<RecordBatch>>> {
-    match plan.output_partitioning().partition_count() {
-        0 => Ok(vec![]),
-        1 => {
-            let it = plan.execute(0).await?;
-            Ok(vec![common::collect(it).await?])
-        }
-        _ => {
-            let mut partitions = vec![];
-            for i in 0..plan.output_partitioning().partition_count() {
-                partitions.push(common::collect(plan.execute(i).await?).await?)
-            }
-            Ok(partitions)
-        }
+    let streams = execute_stream_partitioned(plan).await?;
+    let mut batches = Vec::with_capacity(streams.len());
+    for stream in streams {
+        batches.push(common::collect(stream).await?);
+    }
+    Ok(batches)
+}
+
+/// Execute the [ExecutionPlan] and return a vec with one stream per output 
partition
+pub async fn execute_stream_partitioned(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<SendableRecordBatchStream>> {
+    let num_partitions = plan.output_partitioning().partition_count();
+    let mut streams = Vec::with_capacity(num_partitions);
+    for i in 0..num_partitions {
+        streams.push(plan.execute(i).await?);
     }
+    Ok(streams)
 }
 
 /// Partitioning schemes supported by operators.

Reply via email to