This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f13607e7e Cleanup the usage of round-robin repartitioning (#8794)
1f13607e7e is described below

commit 1f13607e7e2c3613af57d4e5731e0cd212d3b017
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Jan 11 09:33:01 2024 -0800

    Cleanup the usage of round-robin repartitioning (#8794)
    
    * Clean up
    
    * Add sqllogictest
    
    * fix
    
    * Fix
    
    * Enable enable_round_robin_repartition for enforce_distribution tests
    
    * Fix
    
    * Try to stable test
    
    * Update datafusion/sqllogictest/test_files/repartition.slt
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/sqllogictest/test_files/repartition.slt
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Fix scratch space
    
    * Fix test
    
    * Update datafusion/core/src/physical_optimizer/enforce_distribution.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../src/physical_optimizer/enforce_distribution.rs | 48 ++++++--------
 datafusion/sqllogictest/test_files/repartition.slt | 73 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 28 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index bf3f9ef0f3..a2f530c0e6 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -925,7 +925,6 @@ fn add_hash_on_top(
     mut input: DistributionContext,
     hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
     n_target: usize,
-    repartition_beneficial_stats: bool,
 ) -> Result<DistributionContext> {
     // Early return if hash repartition is unnecessary
     if n_target == 1 {
@@ -950,12 +949,6 @@ fn add_hash_on_top(
         //   requirements.
         // - Usage of order preserving variants is not desirable (per the flag
         //   `config.optimizer.prefer_existing_sort`).
-        if repartition_beneficial_stats {
-            // Since hashing benefits from partitioning, add a round-robin 
repartition
-            // before it:
-            input = add_roundrobin_on_top(input, n_target)?;
-        }
-
         let partitioning = Partitioning::Hash(hash_exprs, n_target);
         let repartition = RepartitionExec::try_new(input.plan.clone(), 
partitioning)?
             .with_preserve_order();
@@ -1208,6 +1201,12 @@ fn ensure_distribution(
                 true
             };
 
+            let add_roundrobin = enable_round_robin
+                // Operator benefits from partitioning (e.g. filter):
+                && (would_benefit && repartition_beneficial_stats)
+                // Unless partitioning increases the partition count, it is 
not beneficial:
+                && child.plan.output_partitioning().partition_count() < 
target_partitions;
+
             // When `repartition_file_scans` is set, attempt to increase
             // parallelism at the source.
             if repartition_file_scans && repartition_beneficial_stats {
@@ -1218,33 +1217,26 @@ fn ensure_distribution(
                 }
             }
 
-            if enable_round_robin
-                // Operator benefits from partitioning (e.g. filter):
-                && (would_benefit && repartition_beneficial_stats)
-                // Unless partitioning doesn't increase the partition count, 
it is not beneficial:
-                && child.plan.output_partitioning().partition_count() < 
target_partitions
-            {
-                // Increase parallelism by adding round-robin repartitioning
-                // on top of the operator. Note that we only do this if the
-                // partition count is not already equal to the desired 
partition
-                // count.
-                child = add_roundrobin_on_top(child, target_partitions)?;
-            }
-
             // Satisfy the distribution requirement if it is unmet.
             match requirement {
                 Distribution::SinglePartition => {
                     child = add_spm_on_top(child);
                 }
                 Distribution::HashPartitioned(exprs) => {
-                    child = add_hash_on_top(
-                        child,
-                        exprs.to_vec(),
-                        target_partitions,
-                        repartition_beneficial_stats,
-                    )?;
+                    if add_roundrobin {
+                        // Add round-robin repartitioning on top of the 
operator
+                        // to increase parallelism.
+                        child = add_roundrobin_on_top(child, 
target_partitions)?;
+                    }
+                    child = add_hash_on_top(child, exprs.to_vec(), 
target_partitions)?;
+                }
+                Distribution::UnspecifiedDistribution => {
+                    if add_roundrobin {
+                        // Add round-robin repartitioning on top of the 
operator
+                        // to increase parallelism.
+                        child = add_roundrobin_on_top(child, 
target_partitions)?;
+                    }
                 }
-                Distribution::UnspecifiedDistribution => {}
             };
 
             // There is an ordering requirement of the operator:
@@ -1908,7 +1900,7 @@ pub(crate) mod tests {
         let distribution_context = DistributionContext::new(plan);
         let mut config = ConfigOptions::new();
         config.execution.target_partitions = target_partitions;
-        config.optimizer.enable_round_robin_repartition = false;
+        config.optimizer.enable_round_robin_repartition = true;
         config.optimizer.repartition_file_scans = false;
         config.optimizer.repartition_file_min_size = 1024;
         config.optimizer.prefer_existing_sort = prefer_existing_sort;
diff --git a/datafusion/sqllogictest/test_files/repartition.slt 
b/datafusion/sqllogictest/test_files/repartition.slt
new file mode 100644
index 0000000000..9829299f43
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for repartitioning
+##########
+
+# Set 4 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+COPY  (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 
'test_files/scratch/repartition/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition/parquet_table/';
+
+# enable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
+query TT
+EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
+----
+logical_plan
+Aggregate: groupBy=[[parquet_table.column1]], 
aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
+--TableScan: parquet_table projection=[column1, column2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], 
aggr=[SUM(parquet_table.column2)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4
+------AggregateExec: mode=Partial, gby=[column1@0 as column1], 
aggr=[SUM(parquet_table.column2)]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]},
 projection=[column1, column2]
+
+# disable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+query TT
+EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
+----
+logical_plan
+Aggregate: groupBy=[[parquet_table.column1]], 
aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
+--TableScan: parquet_table projection=[column1, column2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], 
aggr=[SUM(parquet_table.column2)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
+------AggregateExec: mode=Partial, gby=[column1@0 as column1], 
aggr=[SUM(parquet_table.column2)]
+--------ParquetExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]},
 projection=[column1, column2]
+
+
+# Cleanup
+statement ok
+DROP TABLE parquet_table;

Reply via email to