This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8682be57f5 Minor: Improve `ExecutionPlan` documentation (#8019)
8682be57f5 is described below
commit 8682be57f5208a9d1097729802e18f35c09392cd
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Nov 2 14:55:02 2023 -0400
Minor: Improve `ExecutionPlan` documentation (#8019)
* Minor: Improve `ExecutionPlan` documentation
* Add link to Partitioning
---
datafusion/physical-plan/src/lib.rs | 113 +++++++++++++++++++++++++-----------
1 file changed, 79 insertions(+), 34 deletions(-)
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index 8ae2a86866..ed795a1cb3 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -91,16 +91,27 @@ pub use datafusion_physical_expr::{
pub use crate::stream::EmptyRecordBatchStream;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
-/// `ExecutionPlan` represent nodes in the DataFusion Physical Plan.
+/// Represents nodes in the DataFusion Physical Plan.
///
-/// Each `ExecutionPlan` is partition-aware and is responsible for
-/// creating the actual `async` [`SendableRecordBatchStream`]s
-/// of [`RecordBatch`] that incrementally compute the operator's
-/// output from its input partition.
+/// Calling [`execute`] produces an `async` [`SendableRecordBatchStream`] of
+/// [`RecordBatch`] that incrementally computes a partition of the
+/// `ExecutionPlan`'s output from its input. See [`Partitioning`] for more
+/// details on partitioning.
+///
+/// Methods such as [`schema`] and [`output_partitioning`] communicate
+/// properties of this output to the DataFusion optimizer, and methods such as
+/// [`required_input_distribution`] and [`required_input_ordering`] express
+/// requirements of the `ExecutionPlan` from its input.
///
/// [`ExecutionPlan`] can be displayed in a simplified form using the
/// return value from [`displayable`] in addition to the (normally
/// quite verbose) `Debug` output.
+///
+/// [`execute`]: ExecutionPlan::execute
+/// [`schema`]: ExecutionPlan::schema
+/// [`output_partitioning`]: ExecutionPlan::output_partitioning
+/// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
+/// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// Returns the execution plan as [`Any`] so that it can be
/// downcast to a specific implementation.
@@ -109,7 +120,8 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;
- /// Specifies the output partitioning scheme of this plan
+ /// Specifies how the output of this `ExecutionPlan` is split into
+ /// partitions.
fn output_partitioning(&self) -> Partitioning;
/// Specifies whether this plan generates an infinite stream of records.
@@ -123,7 +135,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}
- /// If the output of this operator within each partition is sorted,
+ /// If the output of this `ExecutionPlan` within each partition is sorted,
/// returns `Some(keys)` with the description of how it was sorted.
///
/// For example, Sort, (obviously) produces sorted output as does
@@ -131,17 +143,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// produces sorted output if its input was sorted as it does not
/// reorder the input rows,
///
- /// It is safe to return `None` here if your operator does not
+ /// It is safe to return `None` here if your `ExecutionPlan` does not
/// have any particular output order here
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>;
/// Specifies the data distribution requirements for all the
- /// children for this operator, By default it's
[[Distribution::UnspecifiedDistribution]] for each child,
+ /// children for this `ExecutionPlan`. By default it's
[`Distribution::UnspecifiedDistribution`] for each child.
fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::UnspecifiedDistribution; self.children().len()]
}
- /// Specifies the ordering requirements for all of the children
+ /// Specifies the ordering required for all of the children of this
+ /// `ExecutionPlan`.
+ ///
/// For each child, it's the local ordering requirement within
/// each partition rather than the global ordering
///
@@ -152,7 +166,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
vec![None; self.children().len()]
}
- /// Returns `false` if this operator's implementation may reorder
+ /// Returns `false` if this `ExecutionPlan`'s implementation may reorder
/// rows within or between partitions.
///
/// For example, Projection, Filter, and Limit maintain the order
@@ -166,19 +180,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// The default implementation returns `false`
///
/// WARNING: if you override this default, you *MUST* ensure that
- /// the operator's maintains the ordering invariant or else
+ /// the `ExecutionPlan` maintains the ordering invariant or else
/// DataFusion may produce incorrect results.
fn maintains_input_order(&self) -> Vec<bool> {
vec![false; self.children().len()]
}
- /// Specifies whether the operator benefits from increased parallelization
- /// at its input for each child. If set to `true`, this indicates that the
- /// operator would benefit from partitioning its corresponding child
- /// (and thus from more parallelism). For operators that do very little
work
- /// the overhead of extra parallelism may outweigh any benefits
+ /// Specifies whether the `ExecutionPlan` benefits from increased
+ /// parallelization at its input for each child.
///
- /// The default implementation returns `true` unless this operator
+ /// If it returns `true`, the `ExecutionPlan` would benefit from partitioning
+ /// its corresponding child (and thus from more parallelism). For
+ /// `ExecutionPlan`s that do very little work the overhead of extra
+ /// parallelism may outweigh any benefits.
+ ///
+ /// The default implementation returns `true` unless this `ExecutionPlan`
/// has signalled it requires a single child input partition.
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
// By default try to maximize parallelism with more CPUs if
@@ -199,12 +215,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
OrderingEquivalenceProperties::new(self.schema())
}
- /// Get a list of child execution plans that provide the input for this
plan. The returned list
- /// will be empty for leaf nodes, will contain a single value for unary
nodes, or two
- /// values for binary nodes (such as joins).
+ /// Get a list of `ExecutionPlan`s that provide input for this plan. The
+ /// returned list will be empty for leaf nodes such as scans, will contain
a
+ /// single value for unary nodes, or two values for binary nodes (such as
+ /// joins).
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
- /// Returns a new plan where all children were replaced by new plans.
+ /// Returns a new `ExecutionPlan` where all existing children were replaced
+ /// by the `children`, in order
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -235,13 +253,40 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
Ok(None)
}
- /// Begin execution of `partition`, returning a stream of
[`RecordBatch`]es.
+ /// Begin execution of `partition`, returning a [`Stream`] of
+ /// [`RecordBatch`]es.
+ ///
+ /// # Notes
+ ///
+ /// The `execute` method itself is not `async` but it returns an `async`
+ /// [`futures::stream::Stream`]. This `Stream` should incrementally compute
+ /// the output, `RecordBatch` by `RecordBatch` (in a streaming fashion).
+ /// Most `ExecutionPlan`s should not do any work before the first
+ /// `RecordBatch` is requested from the stream.
+ ///
+ /// [`RecordBatchStreamAdapter`] can be used to convert an `async`
+ /// [`Stream`] into a [`SendableRecordBatchStream`].
+ ///
+ /// Using `async` `Streams` allows for network I/O during execution and
+ /// takes advantage of Rust's built in support for `async` continuations
and
+ /// crate ecosystem.
+ ///
+ /// [`Stream`]: futures::stream::Stream
+ /// [`StreamExt`]: futures::stream::StreamExt
+ /// [`TryStreamExt`]: futures::stream::TryStreamExt
+ /// [`RecordBatchStreamAdapter`]: crate::stream::RecordBatchStreamAdapter
///
/// # Implementation Examples
///
- /// ## Return Precomputed Batch
+ /// While `async` `Stream`s have a non trivial learning curve, the
+ /// [`futures`] crate provides [`StreamExt`] and [`TryStreamExt`]
+ /// which help simplify many common operations.
///
- /// We can return a precomputed batch as a stream
+ /// Here are some common patterns:
+ ///
+ /// ## Return Precomputed `RecordBatch`
+ ///
+ /// We can return a precomputed `RecordBatch` as a `Stream`:
///
/// ```
/// # use std::sync::Arc;
@@ -261,6 +306,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// partition: usize,
/// context: Arc<TaskContext>
/// ) -> Result<SendableRecordBatchStream> {
+ /// // use functions from the futures crate to convert the batch into a
stream
/// let fut = futures::future::ready(Ok(self.batch.clone()));
/// let stream = futures::stream::once(fut);
/// Ok(Box::pin(RecordBatchStreamAdapter::new(self.batch.schema(),
stream)))
@@ -268,9 +314,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// }
/// ```
///
- /// ## Async Compute Batch
+ /// ## Lazily (async) Compute `RecordBatch`
///
- /// We can also lazily compute a RecordBatch when the returned stream is
polled
+ /// We can also lazily compute a `RecordBatch` when the returned `Stream`
is polled
///
/// ```
/// # use std::sync::Arc;
@@ -284,6 +330,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// schema: SchemaRef,
/// }
///
+ /// /// Returns a single batch when the returned stream is polled
/// async fn get_batch() -> Result<RecordBatch> {
/// todo!()
/// }
@@ -301,10 +348,10 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// }
/// ```
///
- /// ## Async Compute Batch Stream
+ /// ## Lazily (async) create a Stream
///
- /// We can lazily compute a RecordBatch stream when the returned stream is
polled
- /// flattening the result into a single stream
+ /// If you need to create the return `Stream` using an `async` function,
+ /// you can do so by flattening the result:
///
/// ```
/// # use std::sync::Arc;
@@ -319,6 +366,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// schema: SchemaRef,
/// }
///
+ /// /// async function that returns a stream
/// async fn get_batch_stream() -> Result<SendableRecordBatchStream> {
/// todo!()
/// }
@@ -337,9 +385,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// }
/// }
/// ```
- ///
- /// See [`futures::stream::StreamExt`] and
[`futures::stream::TryStreamExt`] for further
- /// combinators that can be used with streams
fn execute(
&self,
partition: usize,
@@ -372,7 +417,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
/// Indicate whether a data exchange is needed for the input of `plan`, which
will be very helpful
/// especially for the distributed engine to judge whether need to deal with
shuffling.
/// Currently there are 3 kinds of execution plan which needs data exchange
-/// 1. RepartitionExec for changing the partition number between two
operators
+/// 1. RepartitionExec for changing the partition number between two
`ExecutionPlan`s
/// 2. CoalescePartitionsExec for collapsing all of the partitions into
one without ordering guarantee
/// 3. SortPreservingMergeExec for collapsing all of the sorted partitions
into one with ordering guarantee
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {