This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a029a59c8 Docs: Add docs to `RepartitionExec` and architecture guide 
(#7003)
7a029a59c8 is described below

commit 7a029a59c84c5f0fb19b215fb233a151a7836fb6
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jul 19 06:40:26 2023 -0400

    Docs: Add docs to `RepartitionExec` and architecture guide (#7003)
    
    * Docs: Add docs to `RepartitionExec` and architecture guide
    
    * Update datafusion/core/src/physical_plan/repartition/mod.rs
    
    Co-authored-by: Yongting You <[email protected]>
    
    ---------
    
    Co-authored-by: Yongting You <[email protected]>
---
 datafusion/core/src/lib.rs                         | 19 ++++---
 datafusion/core/src/physical_plan/mod.rs           |  2 +-
 .../core/src/physical_plan/repartition/mod.rs      | 63 +++++++++++++++++++++-
 datafusion/expr/src/logical_plan/plan.rs           |  2 +-
 4 files changed, 75 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index ad2435f9c1..0b431cc101 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -330,13 +330,17 @@
 //! ```
 //!
 //! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
-//! format, largely with functions from the [arrow] crate. When
-//! [`execute`] is called, a [`SendableRecordBatchStream`] is returned
-//! that produces the desired output as a [`Stream`] of [`RecordBatch`]es.
+//! format, making heavy use of functions from the [arrow]
+//! crate. Calling [`execute`] produces 1 or more partitions of data,
+//! consisting of an operator that implements
+//! [`SendableRecordBatchStream`].
 //!
-//! Values are
-//! represented with [`ColumnarValue`], which are either single
-//! constant values ([`ScalarValue`]) or Arrow Arrays ([`ArrayRef`]).
+//! Values are represented with [`ColumnarValue`], which are either
+//! [`ScalarValue`] (single constant values) or [`ArrayRef`] (Arrow
+//! Arrays).
+//!
+//! Balanced parallelism is achieved using [`RepartitionExec`], which
+//! implements a [Volcano style] "Exchange".
 //!
 //! [`execute`]: physical_plan::ExecutionPlan::execute
 //! [`SendableRecordBatchStream`]: 
crate::physical_plan::SendableRecordBatchStream
@@ -345,9 +349,10 @@
 //! [`ArrayRef`]: arrow::array::ArrayRef
 //! [`Stream`]: futures::stream::Stream
 //!
-//!
 //! See the [implementors of `ExecutionPlan`] for a list of physical operators 
available.
 //!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+//! [Volcano style]: 
https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
 //! [implementors of `ExecutionPlan`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
 //!
 //! ## State Management and Configuration
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index 4e1de2a6cb..335b10eb46 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -55,7 +55,7 @@ pub trait RecordBatchStream: Stream<Item = 
Result<RecordBatch>> {
     fn schema(&self) -> SchemaRef;
 }
 
-/// Trait for a stream of record batches.
+/// Pinned, boxed, `Send` trait object for a [`Stream`] of [`RecordBatch`]es
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
 
 /// EmptyRecordBatchStream can be used to create a RecordBatchStream
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs 
b/datafusion/core/src/physical_plan/repartition/mod.rs
index c5b8b2da42..c47c992681 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -239,8 +239,67 @@ impl BatchPartitioner {
     }
 }
 
-/// The repartition operator maps N input partitions to M output partitions 
based on a
-/// partitioning scheme. No guarantees are made about the order of the 
resulting partitions.
+/// Maps `N` input partitions to `M` output partitions based on a
+/// [`Partitioning`] scheme.
+///
+/// # Background
+///
+/// DataFusion, like most other commercial systems, with the
+/// notable exception of DuckDB, uses the "Exchange Operator" based
+/// approach to parallelism which works well in practice given
+/// sufficient care in implementation.
+///
+/// DataFusion's planner picks the target number of partitions and
+/// then `RepartitionExec` redistributes [`RecordBatch`]es to that number
+/// of output partitions.
+///
+/// For example, given `target_partitions=3` (trying to use 3 cores)
+/// but scanning an input with 2 partitions, `RepartitionExec` can be
+/// used to get 3 even streams of `RecordBatch`es.
+///
+///
+///```text
+///        ▲                  ▲                  ▲
+///        │                  │                  │
+///        │                  │                  │
+///        │                  │                  │
+///┌───────────────┐  ┌───────────────┐  ┌───────────────┐
+///│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
+///│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
+///└───────────────┘  └───────────────┘  └───────────────┘
+///        ▲                  ▲                  ▲
+///        └──────────────────┼──────────────────┘
+///                           │
+///              ┌─────────────────────────┐
+///              │     RepartitionExec     │
+///              │   (hash/round robin)    │
+///              └─────────────────────────┘
+///                         ▲   ▲
+///             ┌───────────┘   └───────────┐
+///             │                           │
+///             │                           │
+///        .─────────.                 .─────────.
+///     ,─'           '─.           ,─'           '─.
+///    ;      Input      :         ;      Input      :
+///    :   Partition 0   ;         :   Partition 1   ;
+///     ╲               ╱           ╲               ╱
+///      '─.         ,─'             '─.         ,─'
+///         `───────'                   `───────'
+///```
+///
+/// # Output Ordering
+///
+/// No guarantees are made about the order of the resulting
+/// partitions unless `preserve_order` is set.
+///
+/// # Footnote
+///
+/// The "Exchange Operator" was first described in the 1989 paper
+/// [Encapsulation of parallelism in the Volcano query processing
+/// system
+/// Paper](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
+/// which uses the term "Exchange" for the concept of repartitioning
+/// data across threads.
 #[derive(Debug)]
 pub struct RepartitionExec {
     /// Input execution plan
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index d9bb255733..0485973364 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -82,7 +82,7 @@ pub enum LogicalPlan {
     Join(Join),
     /// Apply Cross Join to two logical plans
     CrossJoin(CrossJoin),
-    /// Repartition the plan based on a partitioning scheme.
+    /// Repartition the plan based on a partitioning scheme
     Repartition(Repartition),
     /// Union multiple inputs
     Union(Union),

Reply via email to