This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7a029a59c8 Docs: Add docs to `RepartitionExec` and architecture guide
(#7003)
7a029a59c8 is described below
commit 7a029a59c84c5f0fb19b215fb233a151a7836fb6
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jul 19 06:40:26 2023 -0400
Docs: Add docs to `RepartitionExec` and architecture guide (#7003)
* Docs: Add docs to `RepartitionExec` and architecture guide
* Update datafusion/core/src/physical_plan/repartition/mod.rs
Co-authored-by: Yongting You <[email protected]>
---------
Co-authored-by: Yongting You <[email protected]>
---
datafusion/core/src/lib.rs | 19 ++++---
datafusion/core/src/physical_plan/mod.rs | 2 +-
.../core/src/physical_plan/repartition/mod.rs | 63 +++++++++++++++++++++-
datafusion/expr/src/logical_plan/plan.rs | 2 +-
4 files changed, 75 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index ad2435f9c1..0b431cc101 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -330,13 +330,17 @@
//! ```
//!
//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
-//! format, largely with functions from the [arrow] crate. When
-//! [`execute`] is called, a [`SendableRecordBatchStream`] is returned
-//! that produces the desired output as a [`Stream`] of [`RecordBatch`]es.
+//! format, making heavy use of functions from the [arrow]
+//! crate. Calling [`execute`] produces 1 or more partitions of data,
+//! each of which is returned as a
+//! [`SendableRecordBatchStream`].
//!
-//! Values are
-//! represented with [`ColumnarValue`], which are either single
-//! constant values ([`ScalarValue`]) or Arrow Arrays ([`ArrayRef`]).
+//! Values are represented with [`ColumnarValue`], which are either
+//! [`ScalarValue`] (single constant values) or [`ArrayRef`] (Arrow
+//! Arrays).
+//!
+//! Balanced parallelism is achieved using [`RepartitionExec`], which
+//! implements a [Volcano style] "Exchange".
//!
//! [`execute`]: physical_plan::ExecutionPlan::execute
//! [`SendableRecordBatchStream`]:
crate::physical_plan::SendableRecordBatchStream
@@ -345,9 +349,10 @@
//! [`ArrayRef`]: arrow::array::ArrayRef
//! [`Stream`]: futures::stream::Stream
//!
-//!
//! See the [implementors of `ExecutionPlan`] for a list of physical operators
available.
//!
+//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+//! [Volcano style]: https://w6113.github.io/files/papers/volcanoparallelism-89.pdf
//! [implementors of `ExecutionPlan`]:
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
//!
//! ## State Management and Configuration
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index 4e1de2a6cb..335b10eb46 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -55,7 +55,7 @@ pub trait RecordBatchStream: Stream<Item =
Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
-/// Trait for a stream of record batches.
+/// Type alias for a pinned, boxed, `Send` [`RecordBatchStream`]: a [`Stream`] of [`RecordBatch`]es
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs
b/datafusion/core/src/physical_plan/repartition/mod.rs
index c5b8b2da42..c47c992681 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -239,8 +239,67 @@ impl BatchPartitioner {
}
}
-/// The repartition operator maps N input partitions to M output partitions
based on a
-/// partitioning scheme. No guarantees are made about the order of the
resulting partitions.
+/// Maps `N` input partitions to `M` output partitions based on a
+/// [`Partitioning`] scheme.
+///
+/// # Background
+///
+/// DataFusion, like most other commercial systems, with the
+/// notable exception of DuckDB, uses the "Exchange Operator" based
+/// approach to parallelism, which works well in practice given
+/// sufficient care in implementation.
+///
+/// DataFusion's planner picks the target number of partitions and
+/// then `RepartitionExec` redistributes [`RecordBatch`]es to that number
+/// of output partitions.
+///
+/// For example, given `target_partitions=3` (trying to use 3 cores)
+/// but scanning an input with 2 partitions, `RepartitionExec` can be
+/// used to produce 3 evenly balanced streams of `RecordBatch`es:
+///
+///
+///```text
+///        ▲                  ▲                  ▲
+///        │                  │                  │
+///        │                  │                  │
+///        │                  │                  │
+///┌───────────────┐  ┌───────────────┐  ┌───────────────┐
+///│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
+///│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
+///└───────────────┘  └───────────────┘  └───────────────┘
+///        ▲                  ▲                  ▲
+///        └──────────────────┼──────────────────┘
+///                           │
+///              ┌─────────────────────────┐
+///              │     RepartitionExec     │
+///              │   (hash/round robin)    │
+///              └─────────────────────────┘
+///                      ▲         ▲
+///          ┌───────────┘         └───────────┐
+///          │                                 │
+///          │                                 │
+///     .─────────.                       .─────────.
+///  ,─'           '─.                 ,─'           '─.
+/// ;      Input      :               ;      Input      :
+/// :   Partition 0   ;               :   Partition 1   ;
+///  ╲               ╱                 ╲               ╱
+///   '─.         ,─'                   '─.         ,─'
+///      `───────'                         `───────'
+///```
+///
+/// # Output Ordering
+///
+/// No guarantees are made about the order of the resulting
+/// partitions unless `preserve_order` is set.
+///
+/// # Footnote
+///
+/// The "Exchange Operator" was first described in the 1989 paper
+/// [Encapsulation of parallelism in the Volcano query processing
+/// system](https://w6113.github.io/files/papers/volcanoparallelism-89.pdf)
+/// which uses the term "Exchange" for the concept of repartitioning
+/// data across threads.
#[derive(Debug)]
pub struct RepartitionExec {
/// Input execution plan
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index d9bb255733..0485973364 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -82,7 +82,7 @@ pub enum LogicalPlan {
Join(Join),
/// Apply Cross Join to two logical plans
CrossJoin(CrossJoin),
- /// Repartition the plan based on a partitioning scheme.
+ /// Repartition the plan based on a partitioning scheme
Repartition(Repartition),
/// Union multiple inputs
Union(Union),