This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4118c438bb Improve SortPreservingMerge::enable_round_robin_repartition 
 docs (#13826)
4118c438bb is described below

commit 4118c438bb7b84a5ec99f99feae1c33c56760fd4
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Dec 20 16:17:56 2024 -0500

    Improve SortPreservingMerge::enable_round_robin_repartition  docs (#13826)
    
    * Clarify SortPreservingMerge::enable_round_robin_repartition  docs
    
    * tweaks
    
    * Improve comments more
    
    * clippy
    
    * fix doc link
---
 datafusion/physical-plan/src/sorts/merge.rs        | 28 ++++++++------
 .../src/sorts/sort_preserving_merge.rs             | 45 ++++++++++++++++------
 .../physical-plan/src/sorts/streaming_merge.rs     |  4 ++
 3 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/merge.rs 
b/datafusion/physical-plan/src/sorts/merge.rs
index 458c1c29c0..258e234b35 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream<C: 
CursorValues> {
 
     /// Configuration parameter to enable round-robin selection of tied 
winners of loser tree.
     ///
-    /// To address the issue of unbalanced polling between partitions due to 
tie-breakers being based
-    /// on partition index, especially in cases of low cardinality, we are 
making changes to the winner
-    /// selection mechanism. Previously, partitions with smaller indices were 
consistently chosen as the winners,
-    /// leading to an uneven distribution of polling. This caused upstream 
operator buffers for the other partitions
-    /// to grow excessively, as they continued receiving data without 
consuming it.
+    /// This option controls the tie-breaker strategy and attempts to avoid the
+    /// issue of unbalanced polling between partitions.
     ///
-    /// For example, an upstream operator like a repartition execution would 
keep sending data to certain partitions,
-    /// but those partitions wouldn't consume the data if they weren't 
selected as winners. This resulted in inefficient buffer usage.
+    /// If `true`, when multiple partitions have the same value, the partition
+    /// that has the fewest poll counts is selected. This strategy ensures that
+    /// multiple partitions with the same value are chosen equally, 
distributing
+    /// the polling load in a round-robin fashion. This approach balances the
+    /// workload more effectively across partitions and avoids excessive buffer
+    /// growth.
     ///
-    /// To resolve this, we are modifying the tie-breaking logic. Instead of 
always choosing the partition with the smallest index,
-    /// we now select the partition that has the fewest poll counts for the 
same value.
-    /// This ensures that multiple partitions with the same value are chosen 
equally, distributing the polling load in a round-robin fashion.
-    /// This approach balances the workload more effectively across partitions 
and avoids excessive buffer growth.
+    /// If `false`, partitions with smaller indices are consistently chosen as
+    /// the winners, which can lead to an uneven distribution of polling and
+    /// potentially cause upstream operator buffers for the other partitions to
+    /// grow excessively, as they continue receiving data without consuming it.
+    ///
+    /// For example, an upstream operator like `RepartitionExec` would keep
+    /// sending data to certain partitions, but those partitions wouldn't
+    /// consume the data if they weren't selected as winners. This results in
+    /// inefficient buffer usage.
     enable_round_robin_tie_breaker: bool,
 
     /// Flag indicating whether we are in the mode of round-robin
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 21597fb856..adcb28e538 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Defines the sort preserving merge plan
+//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream.
 
 use std::any::Any;
 use std::sync::Arc;
@@ -38,10 +38,22 @@ use log::{debug, trace};
 
 /// Sort preserving merge execution plan
 ///
-/// This takes an input execution plan and a list of sort expressions, and
-/// provided each partition of the input plan is sorted with respect to
-/// these sort expressions, this operator will yield a single partition
-/// that is also sorted with respect to them
+/// # Overview
+///
+/// This operator implements a K-way merge. It is used to merge multiple sorted
+/// streams into a single sorted stream and is highly optimized.
+///
+/// ## Inputs:
+///
+/// 1. A list of sort expressions
+/// 2. An input plan, where each partition is sorted with respect to
+///    these sort expressions.
+///
+/// ## Output:
+///
+/// 1. A single partition that is also sorted with respect to the expressions
+///
+/// ## Diagram
 ///
 /// ```text
 /// ┌─────────────────────────┐
@@ -55,12 +67,12 @@ use log::{debug, trace};
 /// ┌─────────────────────────┐  │  └───────────────────┘    
└─┬─────┴───────────────────────┘
 /// │ ╔═══╦═══╗               │  │
 /// │ ║ B ║ E ║     ...       │──┘                             │
-/// │ ╚═══╩═══╝               │              Note Stable Sort: the merged 
stream
-/// └─────────────────────────┘                places equal rows from stream 1
+/// │ ╚═══╩═══╝               │              Stable sort if 
`enable_round_robin_repartition=false`:
+/// └─────────────────────────┘              the merged stream places equal 
rows from stream 1
 ///   Stream 2
 ///
 ///
-///  Input Streams                                             Output stream
+///  Input Partitions                                          Output Partition
 ///    (sorted)                                                  (sorted)
 /// ```
 ///
@@ -70,7 +82,7 @@ use log::{debug, trace};
 /// the output and inputs are not polled again.
 #[derive(Debug, Clone)]
 pub struct SortPreservingMergeExec {
-    /// Input plan
+    /// Input plan with sorted partitions
     input: Arc<dyn ExecutionPlan>,
     /// Sort expressions
     expr: LexOrdering,
@@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
     fetch: Option<usize>,
     /// Cache holding plan properties like equivalences, output partitioning 
etc.
     cache: PlanProperties,
-    /// Configuration parameter to enable round-robin selection of tied 
winners of loser tree.
+    /// Use round-robin selection of tied winners of the loser tree.
+    ///
+    /// See [`Self::with_round_robin_repartition`] for more information.
     enable_round_robin_repartition: bool,
 }
 
@@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
     }
 
     /// Sets the selection strategy of tied winners of the loser tree algorithm
+    ///
+    /// If `true` (the default), equal output rows are placed in the merged
+    /// stream in round-robin fashion. This approach consumes input streams at
+    /// more even rates when there are many rows with the same sort key.
+    ///
+    /// If `false`, equal output rows are always placed in the merged stream in
+    /// the order of the inputs, resulting in potentially slower execution but a
+    /// stable output order.
     pub fn with_round_robin_repartition(
         mut self,
         enable_round_robin_repartition: bool,
@@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
         self.fetch
     }
 
-    /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    /// Creates the cache object that stores the plan properties
+    /// such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         input: &Arc<dyn ExecutionPlan>,
         ordering: LexOrdering,
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs 
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index 2178cc012a..448d70760d 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -120,6 +120,10 @@ impl<'a> StreamingMergeBuilder<'a> {
         self
     }
 
+    /// See [SortPreservingMergeExec::with_round_robin_repartition] for more
+    /// information.
+    ///
+    /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition
     pub fn with_round_robin_tie_breaker(
         mut self,
         enable_round_robin_tie_breaker: bool,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to