This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8682be57f5 Minor: Improve `ExecutionPlan` documentation (#8019)
8682be57f5 is described below

commit 8682be57f5208a9d1097729802e18f35c09392cd
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Nov 2 14:55:02 2023 -0400

    Minor: Improve `ExecutionPlan` documentation (#8019)
    
    * Minor: Improve `ExecutionPlan` documentation
    
    * Add link to Partitioning
---
 datafusion/physical-plan/src/lib.rs | 113 +++++++++++++++++++++++++-----------
 1 file changed, 79 insertions(+), 34 deletions(-)

diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index 8ae2a86866..ed795a1cb3 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -91,16 +91,27 @@ pub use datafusion_physical_expr::{
 pub use crate::stream::EmptyRecordBatchStream;
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
 
-/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
+/// Represent nodes in the DataFusion Physical Plan.
 ///
-/// Each `ExecutionPlan` is partition-aware and is responsible for
-/// creating the actual `async` [`SendableRecordBatchStream`]s
-/// of [`RecordBatch`] that incrementally compute the operator's
-/// output from its input partition.
+/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
+/// [`RecordBatch`] that incrementally computes a partition of the
+/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
+/// details on partitioning.
+///
+/// Methods such as [`schema`] and [`output_partitioning`] communicate
+/// properties of this output to the DataFusion optimizer, and methods such as
+/// [`required_input_distribution`] and [`required_input_ordering`] express
+/// requirements of the `ExecutionPlan` from its input.
 ///
 /// [`ExecutionPlan`] can be displayed in a simplified form using the
 /// return value from [`displayable`] in addition to the (normally
 /// quite verbose) `Debug` output.
+///
+/// [`execute`]: ExecutionPlan::execute
+/// [`schema`]: ExecutionPlan::schema
+/// [`output_partitioning`]: ExecutionPlan::output_partitioning
+/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
+/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
 pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Returns the execution plan as [`Any`] so that it can be
     /// downcast to a specific implementation.
@@ -109,7 +120,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
 
-    /// Specifies the output partitioning scheme of this plan
+    /// Specifies how the output of this `ExecutionPlan` is split into
+    /// partitions.
     fn output_partitioning(&self) -> Partitioning;
 
     /// Specifies whether this plan generates an infinite stream of records.
@@ -123,7 +135,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         }
     }
 
-    /// If the output of this operator within each partition is sorted,
+    /// If the output of this `ExecutionPlan` within each partition is sorted,
     /// returns `Some(keys)` with the description of how it was sorted.
     ///
     /// For example, Sort, (obviously) produces sorted output as does
@@ -131,17 +143,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// produces sorted output if its input was sorted as it does not
     /// reorder the input rows,
     ///
-    /// It is safe to return `None` here if your operator does not
+    /// It is safe to return `None` here if your `ExecutionPlan` does not
     /// have any particular output order here
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
 
     /// Specifies the data distribution requirements for all the
-    /// children for this operator, By default it's 
[[Distribution::UnspecifiedDistribution]] for each child,
+    /// children for this `ExecutionPlan`. By default it's 
[[Distribution::UnspecifiedDistribution]] for each child,
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::UnspecifiedDistribution; self.children().len()]
     }
 
-    /// Specifies the ordering requirements for all of the children
+    /// Specifies the ordering required for all of the children of this
+    /// `ExecutionPlan`.
+    ///
     /// For each child, it's the local ordering requirement within
     /// each partition rather than the global ordering
     ///
@@ -152,7 +166,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         vec![None; self.children().len()]
     }
 
-    /// Returns `false` if this operator's implementation may reorder
+    /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
     /// rows within or between partitions.
     ///
     /// For example, Projection, Filter, and Limit maintain the order
@@ -166,19 +180,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// The default implementation returns `false`
     ///
     /// WARNING: if you override this default, you *MUST* ensure that
-    /// the operator's maintains the ordering invariant or else
+    /// the `ExecutionPlan` maintains the ordering invariant or else
     /// DataFusion may produce incorrect results.
     fn maintains_input_order(&self) -> Vec<bool> {
         vec![false; self.children().len()]
     }
 
-    /// Specifies whether the operator benefits from increased parallelization
-    /// at its input for each child. If set to `true`, this indicates that the
-    /// operator would benefit from partitioning its corresponding child
-    /// (and thus from more parallelism). For operators that do very little 
work
-    /// the overhead of extra parallelism may outweigh any benefits
+    /// Specifies whether the `ExecutionPlan` benefits from increased
+    /// parallelization at its input for each child.
     ///
-    /// The default implementation returns `true` unless this operator
+    /// If this returns `true`, the `ExecutionPlan` would benefit from
+    /// partitioning its corresponding child (and thus from more parallelism).
+    /// For `ExecutionPlan`s that do very little work the overhead of extra
+    /// parallelism may outweigh any benefits
+    ///
+    /// The default implementation returns `true` unless this `ExecutionPlan`
     /// has signalled it requires a single child input partition.
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         // By default try to maximize parallelism with more CPUs if
@@ -199,12 +215,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         OrderingEquivalenceProperties::new(self.schema())
     }
 
-    /// Get a list of child execution plans that provide the input for this 
plan. The returned list
-    /// will be empty for leaf nodes, will contain a single value for unary 
nodes, or two
-    /// values for binary nodes (such as joins).
+    /// Get a list of `ExecutionPlan` that provide input for this plan. The
+    /// returned list will be empty for leaf nodes such as scans, will contain 
a
+    /// single value for unary nodes, or two values for binary nodes (such as
+    /// joins).
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
 
-    /// Returns a new plan where all children were replaced by new plans.
+    /// Returns a new `ExecutionPlan` where all existing children were replaced
+    /// by the `children`, in order
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -235,13 +253,40 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(None)
     }
 
-    /// Begin execution of `partition`, returning a stream of 
[`RecordBatch`]es.
+    /// Begin execution of `partition`, returning a [`Stream`] of
+    /// [`RecordBatch`]es.
+    ///
+    /// # Notes
+    ///
+    /// The `execute` method itself is not `async` but it returns an `async`
+    /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
+    /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
+    /// Most `ExecutionPlan`s should not do any work before the first
+    /// `RecordBatch` is requested from the stream.
+    ///
+    /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
+    /// [`Stream`] into a [`SendableRecordBatchStream`].
+    ///
+    /// Using `async` `Streams` allows for network I/O during execution and
+    /// takes advantage of Rust's built in support for `async` continuations 
and
+    /// crate ecosystem.
+    ///
+    /// [`Stream`]: futures::stream::Stream
+    /// [`StreamExt`]: futures::stream::StreamExt
+    /// [`TryStreamExt`]: futures::stream::TryStreamExt
+    /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
     ///
     /// # Implementation Examples
     ///
-    /// ## Return Precomputed Batch
+    /// While `async` `Stream`s have a non trivial learning curve, the
+    /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
+    /// which help simplify many common operations.
     ///
-    /// We can return a precomputed batch as a stream
+    /// Here are some common patterns:
+    ///
+    /// ## Return Precomputed `RecordBatch`
+    ///
+    /// We can return a precomputed `RecordBatch` as a `Stream`:
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -261,6 +306,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///         partition: usize,
     ///         context: Arc<TaskContext>
     ///     ) -> Result<SendableRecordBatchStream> {
+    ///         // use functions from the futures crate to convert the batch 
into a stream
     ///         let fut = futures::future::ready(Ok(self.batch.clone()));
     ///         let stream = futures::stream::once(fut);
     ///         Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(), 
stream)))
@@ -268,9 +314,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// }
     /// ```
     ///
-    /// ## Async Compute Batch
+    /// ## Lazily (async) Compute `RecordBatch`
     ///
-    /// We can also lazily compute a RecordBatch when the returned stream is 
polled
+    /// We can also lazily compute a `RecordBatch` when the returned `Stream` 
is polled
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -284,6 +330,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     schema: SchemaRef,
     /// }
     ///
+    /// /// Returns a single batch when the returned stream is polled
     /// async fn get_batch() -> Result<RecordBatch> {
     ///     todo!()
     /// }
@@ -301,10 +348,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     /// }
     /// ```
     ///
-    /// ## Async Compute Batch Stream
+    /// ## Lazily (async) create a Stream
     ///
-    /// We can lazily compute a RecordBatch stream when the returned stream is 
polled
-    /// flattening the result into a single stream
+    /// If you need to create the returned `Stream` using an `async` function,
+    /// you can do so by flattening the result:
     ///
     /// ```
     /// # use std::sync::Arc;
@@ -319,6 +366,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     schema: SchemaRef,
     /// }
     ///
+    /// /// async function that returns a stream
     /// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
     ///     todo!()
     /// }
@@ -337,9 +385,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///     }
     /// }
     /// ```
-    ///
-    /// See [`futures::stream::StreamExt`] and 
[`futures::stream::TryStreamExt`] for further
-    /// combinators that can be used with streams
     fn execute(
         &self,
         partition: usize,
@@ -372,7 +417,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 /// Indicate whether a data exchange is needed for the input of `plan`, which 
will be very helpful
 /// especially for the distributed engine to judge whether need to deal with 
shuffling.
 /// Currently there are 3 kinds of execution plan which needs data exchange
-///     1. RepartitionExec for changing the partition number between two 
operators
+///     1. RepartitionExec for changing the partition number between two 
`ExecutionPlan`s
 ///     2. CoalescePartitionsExec for collapsing all of the partitions into 
one without ordering guarantee
 ///     3. SortPreservingMergeExec for collapsing all of the sorted partitions 
into one with ordering guarantee
 pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {

Reply via email to