This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0408c2b159 Rename  `bounded_order_preserving_variants` config to 
`prefer_exising_sort` and update docs (#7723)
0408c2b159 is described below

commit 0408c2b1596417ba55a636fa3c8a601ffbdb0e60
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Oct 4 17:20:01 2023 -0400

    Rename  `bounded_order_preserving_variants` config to `prefer_exising_sort` 
and update docs (#7723)
    
    * Improve documentation for bounded_order_preserving_variants config
    
    * update docs
    
    * fmt
    
    * update config
    
    * fix typo :facepalm
    
    * prettier
    
    * Reword for clarity
---
 datafusion/common/src/config.rs                      | 12 +++++++-----
 .../src/physical_optimizer/enforce_distribution.rs   | 11 +++++------
 .../replace_with_order_preserving_variants.rs        |  4 ++--
 datafusion/execution/src/config.rs                   | 20 ++++++++++++--------
 .../sqllogictest/test_files/information_schema.slt   |  2 +-
 docs/source/user-guide/configs.md                    |  2 +-
 6 files changed, 28 insertions(+), 23 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 261c2bf435..281da1f69e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -453,11 +453,13 @@ config_namespace! {
         /// ```
         pub repartition_sorts: bool, default = true
 
-        /// When true, DataFusion will opportunistically remove sorts by 
replacing
-        /// `RepartitionExec` with `SortPreservingRepartitionExec`, and
-        /// `CoalescePartitionsExec` with `SortPreservingMergeExec`,
-        /// even when the query is bounded.
-        pub bounded_order_preserving_variants: bool, default = false
+        /// When true, DataFusion will opportunistically remove sorts when the 
data is already sorted,
+        /// (i.e. setting `preserve_order` to true on `RepartitionExec`  and
+        /// using `SortPreservingMergeExec`)
+        ///
+        /// When false, DataFusion will maximize plan parallelism using
+        /// `RepartitionExec` even if this requires subsequently resorting 
data using a `SortExec`.
+        pub prefer_existing_sort: bool, default = false
 
         /// When set to true, the logical plan optimizer will produce warning
         /// messages if any optimization rules produce errors and then proceed 
to the next
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index b3fb41ea10..3463f3a313 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1228,7 +1228,7 @@ fn ensure_distribution(
     // - it is desired according to config
     // - when plan is unbounded
     let order_preserving_variants_desirable =
-        is_unbounded || config.optimizer.bounded_order_preserving_variants;
+        is_unbounded || config.optimizer.prefer_existing_sort;
 
     if dist_context.plan.children().is_empty() {
         return Ok(Transformed::No(dist_context));
@@ -2085,8 +2085,7 @@ mod tests {
         config.optimizer.enable_round_robin_repartition = false;
         config.optimizer.repartition_file_scans = false;
         config.optimizer.repartition_file_min_size = 1024;
-        config.optimizer.bounded_order_preserving_variants =
-            bounded_order_preserving_variants;
+        config.optimizer.prefer_existing_sort = 
bounded_order_preserving_variants;
         ensure_distribution(distribution_context, &config).map(|item| 
item.into().plan)
     }
 
@@ -2124,7 +2123,7 @@ mod tests {
             config.execution.target_partitions = $TARGET_PARTITIONS;
             config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
             config.optimizer.repartition_file_min_size = 
$REPARTITION_FILE_MIN_SIZE;
-            config.optimizer.bounded_order_preserving_variants = 
$BOUNDED_ORDER_PRESERVING_VARIANTS;
+            config.optimizer.prefer_existing_sort = 
$BOUNDED_ORDER_PRESERVING_VARIANTS;
 
             // NOTE: These tests verify the joint `EnforceDistribution` + 
`EnforceSorting` cascade
             //       because they were written prior to the separation of 
`BasicEnforcement` into
@@ -4516,7 +4515,7 @@ mod tests {
         let mut config = ConfigOptions::new();
         config.execution.target_partitions = 10;
         config.optimizer.enable_round_robin_repartition = true;
-        config.optimizer.bounded_order_preserving_variants = false;
+        config.optimizer.prefer_existing_sort = false;
         let distribution_plan =
             EnforceDistribution::new().optimize(physical_plan, &config)?;
         assert_plan_txt!(expected, distribution_plan);
@@ -4558,7 +4557,7 @@ mod tests {
         let mut config = ConfigOptions::new();
         config.execution.target_partitions = 10;
         config.optimizer.enable_round_robin_repartition = true;
-        config.optimizer.bounded_order_preserving_variants = false;
+        config.optimizer.prefer_existing_sort = false;
         let distribution_plan =
             EnforceDistribution::new().optimize(physical_plan, &config)?;
         assert_plan_txt!(expected, distribution_plan);
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index b0ae199a2d..cb3b6c3d07 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -251,7 +251,7 @@ pub(crate) fn replace_with_order_preserving_variants(
         // any case, as doing so helps fix the pipeline.
         // Also do the replacement if opted-in via config options.
         let use_order_preserving_variant =
-            config.optimizer.bounded_order_preserving_variants || 
unbounded_output(plan);
+            config.optimizer.prefer_existing_sort || unbounded_output(plan);
         let updated_sort_input = get_updated_plan(
             exec_tree,
             is_spr_better || use_order_preserving_variant,
@@ -336,7 +336,7 @@ mod tests {
 
             // Run the rule top-down
             // let optimized_physical_plan = 
physical_plan.transform_down(&replace_repartition_execs)?;
-            let config = 
SessionConfig::new().with_bounded_order_preserving_variants($ALLOW_BOUNDED);
+            let config = 
SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED);
             let plan_with_pipeline_fixer = 
OrderPreservationContext::new(physical_plan);
             let parallel = 
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, 
config.options()))?;
             let optimized_physical_plan = parallel.plan;
diff --git a/datafusion/execution/src/config.rs 
b/datafusion/execution/src/config.rs
index 44fcc2ab49..cfcc205b56 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -145,10 +145,12 @@ impl SessionConfig {
         self.options.optimizer.repartition_sorts
     }
 
-    /// Remove sorts by replacing with order-preserving variants of operators,
-    /// even when query is bounded?
-    pub fn bounded_order_preserving_variants(&self) -> bool {
-        self.options.optimizer.bounded_order_preserving_variants
+    /// Prefer existing sort (true) or maximize parallelism (false). See
+    /// [prefer_existing_sort] for more details
+    ///
+    /// [prefer_existing_sort]: 
datafusion_common::config::OptimizerOptions::prefer_existing_sort
+    pub fn prefer_existing_sort(&self) -> bool {
+        self.options.optimizer.prefer_existing_sort
     }
 
     /// Are statistics collected during execution?
@@ -221,10 +223,12 @@ impl SessionConfig {
         self
     }
 
-    /// Enables or disables the use of order-preserving variants of 
`CoalescePartitions`
-    /// and `RepartitionExec` operators, even when the query is bounded
-    pub fn with_bounded_order_preserving_variants(mut self, enabled: bool) -> 
Self {
-        self.options.optimizer.bounded_order_preserving_variants = enabled;
+    /// Prefer existing sort (true) or maximize parallelism (false). See
+    /// [prefer_existing_sort] for more details
+    ///
+    /// [prefer_existing_sort]: 
datafusion_common::config::OptimizerOptions::prefer_existing_sort
+    pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
+        self.options.optimizer.prefer_existing_sort = enabled;
         self
     }
 
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index 12aa9089a0..74c1296fa4 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -183,12 +183,12 @@ datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
 datafusion.explain.show_statistics false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
-datafusion.optimizer.bounded_order_preserving_variants false
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
+datafusion.optimizer.prefer_existing_sort false
 datafusion.optimizer.prefer_hash_join true
 datafusion.optimizer.repartition_aggregations true
 datafusion.optimizer.repartition_file_min_size 10485760
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 638ac5a36b..9eb0862de9 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -87,7 +87,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.optimizer.repartition_file_scans                | true            
          | When set to `true`, file groups will be repartitioned to achieve 
maximum parallelism. Currently Parquet and CSV formats are supported. If set to 
`true`, all files will be repartitioned evenly (i.e., a single large file might 
be partitioned into smaller chunks) for parallel scanning. If set to `false`, 
different files will be read in parallel, but repartitioning won't happen 
within a single file.    [...]
 | datafusion.optimizer.repartition_windows                   | true            
          | Should DataFusion repartition data using the partitions keys to 
execute window functions in parallel using the provided `target_partitions` 
level                                                                           
                                                                                
                                                                                
                      [...]
 | datafusion.optimizer.repartition_sorts                     | true            
          | Should DataFusion execute sorts in a per-partition fashion and 
merge afterwards instead of coalescing first and sorting globally. With this 
flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " 
CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1", ` would turn into the plan below which performs better in 
multithreaded environments `text  [...]
-| datafusion.optimizer.bounded_order_preserving_variants     | false           
          | When true, DataFusion will opportunistically remove sorts by 
replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and 
`CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is 
bounded.                                                                        
                                                                                
                               [...]
+| datafusion.optimizer.prefer_existing_sort                  | false           
          | When true, DataFusion will opportunistically remove sorts when the 
data is already sorted, (i.e. setting `preserve_order` to true on 
`RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion 
will maximize plan parallelism using `RepartitionExec` even if this requires 
subsequently resorting data using a `SortExec`.                                 
                                  [...]
 | datafusion.optimizer.skip_failed_rules                     | false           
          | When set to true, the logical plan optimizer will produce warning 
messages if any optimization rules produce errors and then proceed to the next 
rule. When set to false, any rules that produce errors will cause the query to 
fail                                                                            
                                                                                
                  [...]
 | datafusion.optimizer.max_passes                            | 3               
          | Number of times that the optimizer will attempt to optimize the 
plan                                                                            
                                                                                
                                                                                
                                                                                
                  [...]
 | datafusion.optimizer.top_down_join_key_reordering          | true            
          | When set to true, the physical plan optimizer will run a top down 
process to reorder the join keys                                                
                                                                                
                                                                                
                                                                                
                [...]

Reply via email to