This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4118c438bb Improve SortPreservingMerge::enable_round_robin_repartition
docs (#13826)
4118c438bb is described below
commit 4118c438bb7b84a5ec99f99feae1c33c56760fd4
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Dec 20 16:17:56 2024 -0500
Improve SortPreservingMerge::enable_round_robin_repartition docs (#13826)
* Clarify SortPreservingMerge::enable_round_robin_repartition docs
* tweaks
* Improve comments more
* clippy
* fix doc link
---
datafusion/physical-plan/src/sorts/merge.rs | 28 ++++++++------
.../src/sorts/sort_preserving_merge.rs | 45 ++++++++++++++++------
.../physical-plan/src/sorts/streaming_merge.rs | 4 ++
3 files changed, 55 insertions(+), 22 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/merge.rs
b/datafusion/physical-plan/src/sorts/merge.rs
index 458c1c29c0..258e234b35 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream<C:
CursorValues> {
/// Configuration parameter to enable round-robin selection of tied
winners of loser tree.
///
- /// To address the issue of unbalanced polling between partitions due to
tie-breakers being based
- /// on partition index, especially in cases of low cardinality, we are
making changes to the winner
- /// selection mechanism. Previously, partitions with smaller indices were
consistently chosen as the winners,
- /// leading to an uneven distribution of polling. This caused upstream
operator buffers for the other partitions
- /// to grow excessively, as they continued receiving data without
consuming it.
+ /// This option controls the tie-breaker strategy and attempts to avoid the
+ /// issue of unbalanced polling between partitions.
///
- /// For example, an upstream operator like a repartition execution would
keep sending data to certain partitions,
- /// but those partitions wouldn't consume the data if they weren't
selected as winners. This resulted in inefficient buffer usage.
+ /// If `true`, when multiple partitions have the same value, the partition
+ /// that has the fewest poll counts is selected. This strategy ensures that
+ /// multiple partitions with the same value are chosen equally,
distributing
+ /// the polling load in a round-robin fashion. This approach balances the
+ /// workload more effectively across partitions and avoids excessive buffer
+ /// growth.
///
- /// To resolve this, we are modifying the tie-breaking logic. Instead of
always choosing the partition with the smallest index,
- /// we now select the partition that has the fewest poll counts for the
same value.
- /// This ensures that multiple partitions with the same value are chosen
equally, distributing the polling load in a round-robin fashion.
- /// This approach balances the workload more effectively across partitions
and avoids excessive buffer growth.
+ /// If `false`, partitions with smaller indices are consistently chosen as
+ /// the winners, which can lead to an uneven distribution of polling and
+ /// potentially cause upstream operator buffers for the other partitions to
+ /// grow excessively, as they continue receiving data without consuming it.
+ ///
+ /// For example, an upstream operator like `RepartitionExec` would
+ /// keep sending data to certain partitions, but those partitions wouldn't
+ /// consume the data if they weren't selected as winners. This results in
+ /// inefficient buffer usage.
enable_round_robin_tie_breaker: bool,
/// Flag indicating whether we are in the mode of round-robin
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 21597fb856..adcb28e538 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Defines the sort preserving merge plan
+//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted
stream.
use std::any::Any;
use std::sync::Arc;
@@ -38,10 +38,22 @@ use log::{debug, trace};
/// Sort preserving merge execution plan
///
-/// This takes an input execution plan and a list of sort expressions, and
-/// provided each partition of the input plan is sorted with respect to
-/// these sort expressions, this operator will yield a single partition
-/// that is also sorted with respect to them
+/// # Overview
+///
+/// This operator implements a K-way merge. It is used to merge multiple sorted
+/// streams into a single sorted stream and is highly optimized.
+///
+/// ## Inputs:
+///
+/// 1. A list of sort expressions
+/// 2. An input plan, where each partition is sorted with respect to
+/// these sort expressions.
+///
+/// ## Output:
+///
+/// 1. A single partition that is also sorted with respect to the expressions
+///
+/// ## Diagram
///
/// ```text
/// ┌─────────────────────────┐
@@ -55,12 +67,12 @@ use log::{debug, trace};
/// ┌─────────────────────────┐ │ └───────────────────┘
└─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗ │ │
/// │ ║ B ║ E ║ ... │──┘ │
-/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged
stream
-/// └─────────────────────────┘ places equal rows from stream 1
+/// │ ╚═══╩═══╝ │ Stable sort if
`enable_round_robin_repartition=false`:
+/// └─────────────────────────┘ the merged stream places equal
rows from stream 1
/// Stream 2
///
///
-/// Input Streams Output stream
+/// Input Partitions Output Partition
/// (sorted) (sorted)
/// ```
///
@@ -70,7 +82,7 @@ use log::{debug, trace};
/// the output and inputs are not polled again.
#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
- /// Input plan
+ /// Input plan with sorted partitions
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: LexOrdering,
@@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
fetch: Option<usize>,
/// Cache holding plan properties like equivalences, output partitioning
etc.
cache: PlanProperties,
- /// Configuration parameter to enable round-robin selection of tied
winners of loser tree.
+ /// Use round-robin selection of tied winners of loser tree
+ ///
+ /// See [`Self::with_round_robin_repartition`] for more information.
enable_round_robin_repartition: bool,
}
@@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
}
/// Sets the selection strategy of tied winners of the loser tree algorithm
+ ///
+ /// If true (the default), equal output rows are placed in the merged stream
+ /// in round robin fashion. This approach consumes input streams at more
+ /// even rates when there are many rows with the same sort key.
+ ///
+ /// If false, equal output rows are always placed in the merged stream in
+ /// the order of the inputs, resulting in potentially slower execution but
a
+ /// stable output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
@@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
self.fetch
}
- /// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
+ /// Creates the cache object that stores the plan properties
+ /// such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
ordering: LexOrdering,
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 2178cc012a..448d70760d 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -120,6 +120,10 @@ impl<'a> StreamingMergeBuilder<'a> {
self
}
+ /// See [SortPreservingMergeExec::with_round_robin_repartition] for more
+ /// information.
+ ///
+ /// [SortPreservingMergeExec::with_round_robin_repartition]:
crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition
pub fn with_round_robin_tie_breaker(
mut self,
enable_round_robin_tie_breaker: bool,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]