This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 35e8e33930 Minor: Add implementation examples to 
ExecutionPlan::execute (#8013)
35e8e33930 is described below

commit 35e8e33930f8a60f406edcb3bbba4b3a4c0800a7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Nov 1 16:47:55 2023 +0000

    Minor: Add implementation examples to ExecutionPlan::execute (#8013)
    
    * Add implementation examples to ExecutionPlan::execute
    
    * Review feedback
---
 datafusion/physical-plan/src/lib.rs | 104 ++++++++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index 3ada2fa163..8ae2a86866 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -236,6 +236,110 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     }
 
     /// Begin execution of `partition`, returning a stream of [`RecordBatch`]es.
+    ///
+    /// # Implementation Examples
+    ///
+    /// ## Return Precomputed Batch
+    ///
+    /// We can return a precomputed batch as a stream.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow_schema::SchemaRef;
+    /// # use datafusion_common::Result;
+    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    /// # use datafusion_physical_plan::memory::MemoryStream;
+    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+    /// struct MyPlan {
+    ///     batch: RecordBatch,
+    /// }
+    ///
+    /// impl MyPlan {
+    ///     fn execute(
+    ///         &self,
+    ///         partition: usize,
+    ///         context: Arc<TaskContext>
+    ///     ) -> Result<SendableRecordBatchStream> {
+    ///         let fut = futures::future::ready(Ok(self.batch.clone()));
+    ///         let stream = futures::stream::once(fut);
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), 
stream)))
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// ## Async Compute Batch
+    ///
+    /// We can also lazily compute a RecordBatch when the returned stream is polled.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow_schema::SchemaRef;
+    /// # use datafusion_common::Result;
+    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    /// # use datafusion_physical_plan::memory::MemoryStream;
+    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+    /// struct MyPlan {
+    ///     schema: SchemaRef,
+    /// }
+    ///
+    /// async fn get_batch() -> Result<RecordBatch> {
+    ///     todo!()
+    /// }
+    ///
+    /// impl MyPlan {
+    ///     fn execute(
+    ///         &self,
+    ///         partition: usize,
+    ///         context: Arc<TaskContext>
+    ///     ) -> Result<SendableRecordBatchStream> {
+    ///         let fut = get_batch();
+    ///         let stream = futures::stream::once(fut);
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), 
stream)))
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// ## Async Compute Batch Stream
+    ///
+    /// We can lazily compute a RecordBatch stream when the returned stream is polled,
+    /// flattening the result into a single stream.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::RecordBatch;
+    /// # use arrow_schema::SchemaRef;
+    /// # use futures::TryStreamExt;
+    /// # use datafusion_common::Result;
+    /// # use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    /// # use datafusion_physical_plan::memory::MemoryStream;
+    /// # use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+    /// struct MyPlan {
+    ///     schema: SchemaRef,
+    /// }
+    ///
+    /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
+    ///     todo!()
+    /// }
+    ///
+    /// impl MyPlan {
+    ///     fn execute(
+    ///         &self,
+    ///         partition: usize,
+    ///         context: Arc<TaskContext>
+    ///     ) -> Result<SendableRecordBatchStream> {
+    ///         // A future that yields a stream
+    ///         let fut = get_batch_stream();
+    ///         // Use TryStreamExt::try_flatten to flatten the stream of 
streams
+    ///         let stream = futures::stream::once(fut).try_flatten();
+    ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), 
stream)))
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// See [`futures::stream::StreamExt`] and [`futures::stream::TryStreamExt`] for further
+    /// combinators that can be used with streams.
     fn execute(
         &self,
         partition: usize,

Reply via email to