This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new cca5225263 chore: add config option for allowing bounded use of sort-preserving operators (#7164)
cca5225263 is described below

commit cca5225263e1f838bd5f1ace91425bbdc18c2969
Author: Christopher M. Wolff <[email protected]>
AuthorDate: Tue Aug 1 06:15:46 2023 -0700

    chore: add config option for allowing bounded use of sort-preserving operators (#7164)
---
 datafusion/common/src/config.rs                    |  6 +++
 .../replace_with_order_preserving_variants.rs      | 56 ++++++++++++++++++++--
 .../src/physical_optimizer/sort_enforcement.rs     |  1 +
 .../test_files/information_schema.slt              |  1 +
 datafusion/execution/src/config.rs                 | 13 +++++
 docs/source/user-guide/configs.md                  |  1 +
 6 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 020a240654..f681ae57a3 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -354,6 +354,12 @@ config_namespace! {
         /// ```
         pub repartition_sorts: bool, default = true
 
+        /// When true, DataFusion will opportunistically remove sorts by replacing
+        /// `RepartitionExec` with `SortPreservingRepartitionExec`, and
+        /// `CoalescePartitionsExec` with `SortPreservingMergeExec`,
+        /// even when the query is bounded.
+        pub bounded_order_preserving_variants: bool, default = false
+
         /// When set to true, the logical plan optimizer will produce warning
         /// messages if any optimization rules produce errors and then proceed 
to the next
         /// rule. When set to false, any rules that produce errors will cause 
the query to fail
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 8c86906a68..7d45c171b7 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -28,6 +28,7 @@ use crate::physical_plan::{with_new_children_if_necessary, 
ExecutionPlan};
 
 use super::utils::is_repartition;
 
+use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_physical_expr::utils::ordering_satisfy;
 
@@ -204,6 +205,10 @@ fn get_updated_plan(
 /// If this replacement is helpful for removing a `SortExec`, it updates the 
plan.
 /// Otherwise, it leaves the plan unchanged.
 ///
+/// Note: this optimizer sub-rule will only produce `SortPreservingRepartitionExec`s
+/// if the query is unbounded or if the config option `bounded_order_preserving_variants`
+/// is set to `true`.
+///
 /// The algorithm flow is simply like this:
 /// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` 
nodes.
 /// 1_1. During the traversal, build an `ExecTree` to keep track of operators
@@ -232,6 +237,7 @@ pub(crate) fn replace_with_order_preserving_variants(
     // a `SortExec` from the plan. If this flag is `false`, this replacement
     // should only be made to fix the pipeline (streaming).
     is_spm_better: bool,
+    config: &ConfigOptions,
 ) -> Result<Transformed<OrderPreservationContext>> {
     let plan = &requirements.plan;
     let ordering_onwards = &requirements.ordering_onwards;
@@ -243,11 +249,13 @@ pub(crate) fn replace_with_order_preserving_variants(
         };
         // For unbounded cases, replace with the order-preserving variant in
         // any case, as doing so helps fix the pipeline.
-        let is_unbounded = unbounded_output(plan);
+        // Also do the replacement if opted-in via config options.
+        let use_order_preserving_variant =
+            config.optimizer.bounded_order_preserving_variants || 
unbounded_output(plan);
         let updated_sort_input = get_updated_plan(
             exec_tree,
-            is_spr_better || is_unbounded,
-            is_spm_better || is_unbounded,
+            is_spr_better || use_order_preserving_variant,
+            is_spm_better || use_order_preserving_variant,
         )?;
         // If this sort is unnecessary, we should remove it and update the 
plan:
         if ordering_satisfy(
@@ -270,6 +278,8 @@ pub(crate) fn replace_with_order_preserving_variants(
 mod tests {
     use super::*;
 
+    use crate::prelude::SessionConfig;
+
     use crate::datasource::file_format::file_type::FileCompressionType;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
@@ -299,8 +309,17 @@ mod tests {
     /// `$EXPECTED_PLAN_LINES`: input plan
     /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: optimized plan
     /// `$PLAN`: the plan to optimized
+    /// `$ALLOW_BOUNDED`: whether to allow the plan to be optimized for bounded cases (defaults to `false`)
     macro_rules! assert_optimized {
         ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr) => {
+            assert_optimized!(
+                $EXPECTED_PLAN_LINES,
+                $EXPECTED_OPTIMIZED_PLAN_LINES,
+                $PLAN,
+                false
+            );
+        };
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr, $ALLOW_BOUNDED: expr) => {
             let physical_plan = $PLAN;
             let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
             let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -317,8 +336,9 @@ mod tests {
 
             // Run the rule top-down
             // let optimized_physical_plan = 
physical_plan.transform_down(&replace_repartition_execs)?;
+            let config = 
SessionConfig::new().with_bounded_order_preserving_variants($ALLOW_BOUNDED);
             let plan_with_pipeline_fixer = 
OrderPreservationContext::new(physical_plan);
-            let parallel = 
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, 
false))?;
+            let parallel = 
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| 
replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, 
config.options()))?;
             let optimized_physical_plan = parallel.plan;
 
             // Get string representation of the plan
@@ -751,6 +771,34 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_with_bounded_input() -> Result<()> {
+        let schema = create_test_schema()?;
+        let sort_exprs = vec![sort_expr("a", &schema)];
+        let source = csv_exec_sorted(&schema, sort_exprs, false);
+        let repartition = 
repartition_exec_hash(repartition_exec_round_robin(source));
+        let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
+
+        let physical_plan =
+            sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort);
+
+        let expected_input = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortExec: expr=[a@0 ASC NULLS LAST]",
+            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        let expected_optimized = vec![
+            "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+        ];
+        assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
+        Ok(())
+    }
+
     // End test cases
     // Start test helpers
 
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 5d8b44f50d..97fc79720b 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -376,6 +376,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
                     plan_with_pipeline_fixer,
                     false,
                     true,
+                    config,
                 )
             })?;
 
diff --git 
a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt 
b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
index 38f1d2cd05..fcb818d5fd 100644
--- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt
@@ -158,6 +158,7 @@ datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
+datafusion.optimizer.bounded_order_preserving_variants false
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
diff --git a/datafusion/execution/src/config.rs 
b/datafusion/execution/src/config.rs
index 97770eb99c..a860e15fa2 100644
--- a/datafusion/execution/src/config.rs
+++ b/datafusion/execution/src/config.rs
@@ -145,6 +145,12 @@ impl SessionConfig {
         self.options.optimizer.repartition_sorts
     }
 
+    /// Should sorts be removed by replacing operators with their order-preserving
+    /// variants, even when the query is bounded?
+    pub fn bounded_order_preserving_variants(&self) -> bool {
+        self.options.optimizer.bounded_order_preserving_variants
+    }
+
     /// Are statistics collected during execution?
     pub fn collect_statistics(&self) -> bool {
         self.options.execution.collect_statistics
@@ -215,6 +221,13 @@ impl SessionConfig {
         self
     }
 
+    /// Enables or disables the use of order-preserving variants of `CoalescePartitionsExec`
+    /// and `RepartitionExec` operators, even when the query is bounded.
+    pub fn with_bounded_order_preserving_variants(mut self, enabled: bool) -> 
Self {
+        self.options.optimizer.bounded_order_preserving_variants = enabled;
+        self
+    }
+
     /// Enables or disables the use of pruning predicate for parquet readers 
to skip row groups
     pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
         self.options.execution.parquet.pruning = enabled;
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 4229e3af70..bff7cb4da0 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -66,6 +66,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.optimizer.repartition_file_scans                | true       | 
When set to `true`, file groups will be repartitioned to achieve maximum 
parallelism. Currently Parquet and CSV formats are supported. If set to `true`, 
all files will be repartitioned evenly (i.e., a single large file might be 
partitioned into smaller chunks) for parallel scanning. If set to `false`, 
different files will be read in parallel, but repartitioning won't happen 
within a single file.                   [...]
 | datafusion.optimizer.repartition_windows                   | true       | 
Should DataFusion repartition data using the partitions keys to execute window 
functions in parallel using the provided `target_partitions` level              
                                                                                
                                                                                
                                                                                
                  [...]
 | datafusion.optimizer.repartition_sorts                     | true       | 
Should DataFusion execute sorts in a per-partition fashion and merge afterwards 
instead of coalescing first and sorting globally. With this flag is enabled, 
plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", 
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` 
would turn into the plan below which performs better in multithreaded 
environments `text "SortPreserving [...]
+| datafusion.optimizer.bounded_order_preserving_variants     | false      | 
When true, DataFusion will opportunistically remove sorts by replacing 
`RepartitionExec` with `SortPreservingRepartitionExec`, and 
`CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is 
bounded.                                                                        
                                                                                
                                              [...]
 | datafusion.optimizer.skip_failed_rules                     | false      | 
When set to true, the logical plan optimizer will produce warning messages if 
any optimization rules produce errors and then proceed to the next rule. When 
set to false, any rules that produce errors will cause the query to fail        
                                                                                
                                                                                
                     [...]
 | datafusion.optimizer.max_passes                            | 3          | 
Number of times that the optimizer will attempt to optimize the plan            
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
 | datafusion.optimizer.top_down_join_key_reordering          | true       | 
When set to true, the physical plan optimizer will run a top down process to 
reorder the join keys                                                           
                                                                                
                                                                                
                                                                                
                    [...]

Reply via email to