This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 35e8e33930 Minor: Add implementation examples to ExecutionPlan::execute (#8013)
35e8e33930 is described below
commit 35e8e33930f8a60f406edcb3bbba4b3a4c0800a7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 1 16:47:55 2023 +0000
Minor: Add implementation examples to ExecutionPlan::execute (#8013)
* Add implementation examples to ExecutionPlan::execute
* Review feedback
---
datafusion/physical-plan/src/lib.rs | 104 ++++++++++++++++++++++++++++++++++++
1 file changed, 104 insertions(+)
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 3ada2fa163..8ae2a86866 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -236,6 +236,110 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
/// Begin execution of `partition`, returning a stream of [`RecordBatch`]es.
+ ///
+ /// # Implementation Examples
+ ///
+ /// ## Return Precomputed Batch
+ ///
+ /// We can return a precomputed batch as a stream.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_schema::SchemaRef;
+ /// # use datafusion_common::Result;
+ /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+ /// # use datafusion_physical_plan::memory::MemoryStream;
+ /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+ /// struct MyPlan {
+ /// batch: RecordBatch,
+ /// }
+ ///
+ /// impl MyPlan {
+ /// fn execute(
+ /// &self,
+ /// partition: usize,
+ /// context: Arc<TaskContext>
+ /// ) -> Result<SendableRecordBatchStream> {
+ /// let fut = futures::future::ready(Ok(self.batch.clone()));
+ /// let stream = futures::stream::once(fut);
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), stream)))
+ /// }
+ /// }
+ /// ```
+ ///
+ /// ## Async Compute Batch
+ ///
+ /// We can also lazily compute a RecordBatch when the returned stream is polled.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_schema::SchemaRef;
+ /// # use datafusion_common::Result;
+ /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+ /// # use datafusion_physical_plan::memory::MemoryStream;
+ /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+ /// struct MyPlan {
+ /// schema: SchemaRef,
+ /// }
+ ///
+ /// async fn get_batch() -> Result<RecordBatch> {
+ /// todo!()
+ /// }
+ ///
+ /// impl MyPlan {
+ /// fn execute(
+ /// &self,
+ /// partition: usize,
+ /// context: Arc<TaskContext>
+ /// ) -> Result<SendableRecordBatchStream> {
+ /// let fut = get_batch();
+ /// let stream = futures::stream::once(fut);
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+ /// }
+ /// }
+ /// ```
+ ///
+ /// ## Async Compute Batch Stream
+ ///
+ /// We can lazily compute a RecordBatch stream when the returned stream is polled,
+ /// flattening the result into a single stream.
+ ///
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_schema::SchemaRef;
+ /// # use futures::TryStreamExt;
+ /// # use datafusion_common::Result;
+ /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+ /// # use datafusion_physical_plan::memory::MemoryStream;
+ /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+ /// struct MyPlan {
+ /// schema: SchemaRef,
+ /// }
+ ///
+ /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
+ /// todo!()
+ /// }
+ ///
+ /// impl MyPlan {
+ /// fn execute(
+ /// &self,
+ /// partition: usize,
+ /// context: Arc<TaskContext>
+ /// ) -> Result<SendableRecordBatchStream> {
+ /// // A future that yields a stream
+ /// let fut = get_batch_stream();
+ /// // Use TryStreamExt::try_flatten to flatten the stream of streams
+ /// let stream = futures::stream::once(fut).try_flatten();
+ /// Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream)))
+ /// }
+ /// }
+ /// ```
+ ///
+ /// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further
+ /// combinators that can be used with streams.
fn execute(
&self,
partition: usize,