This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1f13607e7e Cleanup the usage of round-robin repartitioning (#8794)
1f13607e7e is described below
commit 1f13607e7e2c3613af57d4e5731e0cd212d3b017
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Thu Jan 11 09:33:01 2024 -0800
Cleanup the usage of round-robin repartitioning (#8794)
* Clean up
* Add sqllogictest
* fix
* Fix
* Enable enable_round_robin_repartition for enforce_distribution tests
* Fix
* Try to stable test
* Update datafusion/sqllogictest/test_files/repartition.slt
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/sqllogictest/test_files/repartition.slt
Co-authored-by: Andrew Lamb <[email protected]>
* Fix scratch space
* Fix test
* Update datafusion/core/src/physical_optimizer/enforce_distribution.rs
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/physical_optimizer/enforce_distribution.rs | 48 ++++++--------
datafusion/sqllogictest/test_files/repartition.slt | 73 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 28 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index bf3f9ef0f3..a2f530c0e6 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -925,7 +925,6 @@ fn add_hash_on_top(
mut input: DistributionContext,
hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
n_target: usize,
- repartition_beneficial_stats: bool,
) -> Result<DistributionContext> {
// Early return if hash repartition is unnecessary
if n_target == 1 {
@@ -950,12 +949,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.prefer_existing_sort`).
- if repartition_beneficial_stats {
- // Since hashing benefits from partitioning, add a round-robin repartition
- // before it:
- input = add_roundrobin_on_top(input, n_target)?;
- }
-
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)?
.with_preserve_order();
@@ -1208,6 +1201,12 @@ fn ensure_distribution(
true
};
+ let add_roundrobin = enable_round_robin
+ // Operator benefits from partitioning (e.g. filter):
+ && (would_benefit && repartition_beneficial_stats)
+ // Unless partitioning increases the partition count, it is not beneficial:
+ && child.plan.output_partitioning().partition_count() < target_partitions;
+
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans && repartition_beneficial_stats {
@@ -1218,33 +1217,26 @@ fn ensure_distribution(
}
}
- if enable_round_robin
- // Operator benefits from partitioning (e.g. filter):
- && (would_benefit && repartition_beneficial_stats)
- // Unless partitioning doesn't increase the partition count, it is not beneficial:
- && child.plan.output_partitioning().partition_count() < target_partitions
- {
- // Increase parallelism by adding round-robin repartitioning
- // on top of the operator. Note that we only do this if the
- // partition count is not already equal to the desired partition
- // count.
- child = add_roundrobin_on_top(child, target_partitions)?;
- }
-
// Satisfy the distribution requirement if it is unmet.
match requirement {
Distribution::SinglePartition => {
child = add_spm_on_top(child);
}
Distribution::HashPartitioned(exprs) => {
- child = add_hash_on_top(
- child,
- exprs.to_vec(),
- target_partitions,
- repartition_beneficial_stats,
- )?;
+ if add_roundrobin {
+ // Add round-robin repartitioning on top of the operator
+ // to increase parallelism.
+ child = add_roundrobin_on_top(child, target_partitions)?;
+ }
+ child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
+ }
+ Distribution::UnspecifiedDistribution => {
+ if add_roundrobin {
+ // Add round-robin repartitioning on top of the operator
+ // to increase parallelism.
+ child = add_roundrobin_on_top(child, target_partitions)?;
+ }
}
- Distribution::UnspecifiedDistribution => {}
};
// There is an ordering requirement of the operator:
@@ -1908,7 +1900,7 @@ pub(crate) mod tests {
let distribution_context = DistributionContext::new(plan);
let mut config = ConfigOptions::new();
config.execution.target_partitions = target_partitions;
- config.optimizer.enable_round_robin_repartition = false;
+ config.optimizer.enable_round_robin_repartition = true;
config.optimizer.repartition_file_scans = false;
config.optimizer.repartition_file_min_size = 1024;
config.optimizer.prefer_existing_sort = prefer_existing_sort;
diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt
new file mode 100644
index 0000000000..9829299f43
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for repartitioning
+##########
+
+# Set 4 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 4;
+
+statement ok
+COPY (VALUES (1, 2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 'test_files/scratch/repartition/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/repartition/parquet_table/';
+
+# enable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
+query TT
+EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
+----
+logical_plan
+Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
+--TableScan: parquet_table projection=[column1, column2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=4
+------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2]
+
+# disable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+query TT
+EXPLAIN SELECT column1, SUM(column2) FROM parquet_table GROUP BY column1;
+----
+logical_plan
+Aggregate: groupBy=[[parquet_table.column1]], aggr=[[SUM(CAST(parquet_table.column2 AS Int64))]]
+--TableScan: parquet_table projection=[column1, column2]
+physical_plan
+AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
+--CoalesceBatchesExec: target_batch_size=8192
+----RepartitionExec: partitioning=Hash([column1@0], 4), input_partitions=1
+------AggregateExec: mode=Partial, gby=[column1@0 as column1], aggr=[SUM(parquet_table.column2)]
+--------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition/parquet_table/2.parquet]]}, projection=[column1, column2]
+
+
+# Cleanup
+statement ok
+DROP TABLE parquet_table;