This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 29f23ebffb Move `repartition_file_scans` out of `enable_round_robin` check in `EnforceDistribution` rule (#8731)
29f23ebffb is described below
commit 29f23ebffb8a95f6663d23a3a2a0fb8e87765f7b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 5 12:38:56 2024 -0800
Move `repartition_file_scans` out of `enable_round_robin` check in `EnforceDistribution` rule (#8731)
* Cleanup
* More
* Restore add_roundrobin_on_top
* Restore test files
* More
* Restore
* More
* More
* Make test stable
* For review
* Add test
---
.../src/physical_optimizer/enforce_distribution.rs | 42 ++++++++++------------
.../sqllogictest/test_files/arrow_typeof.slt | 2 +-
.../sqllogictest/test_files/repartition_scan.slt | 24 +++++++++++--
datafusion/sqllogictest/test_files/timestamps.slt | 4 +--
.../sqllogictest/test_files/tpch/q2.slt.part | 2 +-
datafusion/sqllogictest/test_files/window.slt | 2 +-
6 files changed, 45 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 1c86c4c320..bf3f9ef0f3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1197,32 +1197,33 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit,
maintains)| {
- // Don't need to apply when the returned row count is not greater
than 1:
+ // Don't need to apply when the returned row count is not greater
than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if
num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
- .unwrap_or(true)
+ .unwrap() // safe to unwrap since is_exact() is true
} else {
true
};
+ // When `repartition_file_scans` is set, attempt to increase
+ // parallelism at the source.
+ if repartition_file_scans && repartition_beneficial_stats {
+ if let Some(new_child) =
+ child.plan.repartitioned(target_partitions, config)?
+ {
+ child.plan = new_child;
+ }
+ }
+
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count,
it is not beneficial:
&& child.plan.output_partitioning().partition_count() <
target_partitions
{
- // When `repartition_file_scans` is set, attempt to increase
- // parallelism at the source.
- if repartition_file_scans {
- if let Some(new_child) =
- child.plan.repartitioned(target_partitions, config)?
- {
- child.plan = new_child;
- }
- }
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired
partition
@@ -1361,17 +1362,10 @@ impl DistributionContext {
fn update_children(mut self) -> Result<Self> {
for child_context in self.children_nodes.iter_mut() {
- child_context.distribution_connection = match
child_context.plan.as_any() {
- plan_any if plan_any.is::<RepartitionExec>() => matches!(
- plan_any
- .downcast_ref::<RepartitionExec>()
- .unwrap()
- .partitioning(),
- Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
- ),
- plan_any
- if plan_any.is::<SortPreservingMergeExec>()
- || plan_any.is::<CoalescePartitionsExec>() =>
+ child_context.distribution_connection = match &child_context.plan {
+ plan if is_repartition(plan)
+ || is_coalesce_partitions(plan)
+ || is_sort_preserving_merge(plan) =>
{
true
}
@@ -3870,14 +3864,14 @@ pub(crate) mod tests {
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
- "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a,
b, c, d, e]",
+ "ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]},
projection=[a, b, c, d, e]",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
- "CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c,
d, e], has_header=false",
+ "CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]},
projection=[a, b, c, d, e], has_header=false",
];
assert_optimized!(expected_parquet, plan_parquet, true, false, 2,
true, 10);
diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt
b/datafusion/sqllogictest/test_files/arrow_typeof.slt
index 3fad4d0f61..6a623e6c92 100644
--- a/datafusion/sqllogictest/test_files/arrow_typeof.slt
+++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt
@@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)');
query T
select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'));
----
-LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} })
\ No newline at end of file
+LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} })
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt
b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 02eccd7c5d..9d4951c7ec 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
project [...]
+# disable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+## Expect to see the scan read the file as "4" groups with even sizes
(offsets) again
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: parquet_table.column1 != Int32(42)
+--TableScan: parquet_table projection=[column1],
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
project [...]
+
+# enable round robin repartitioning again
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
# create a second parquet file
statement ok
COPY (VALUES (100), (200)) TO
'test_files/scratch/repartition_scan/parquet_table/1.parquet'
@@ -147,7 +167,7 @@ WITH HEADER ROW
LOCATION 'test_files/scratch/repartition_scan/csv_table/';
query I
-select * from csv_table;
+select * from csv_table ORDER BY column1;
----
1
2
@@ -190,7 +210,7 @@ STORED AS json
LOCATION 'test_files/scratch/repartition_scan/json_table/';
query I
-select * from "json_table";
+select * from "json_table" ORDER BY column1;
----
1
2
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt
b/datafusion/sqllogictest/test_files/timestamps.slt
index c84e46c965..8b0f50cedf 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1,
----
true true true true true true true true true true true true true
-# verify timestamp output types
+# verify timestamp output types
query TTT
SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)),
arrow_typeof(to_timestamp('2023-01-10 12:34:56.000'))
----
@@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) =
arrow_typeof(1::timestamp) as c1,
true true true true true true
# known issues. currently overflows (expects default precision to be
microsecond instead of nanoseconds. Work pending)
-#verify extreme values
+#verify extreme values
#query PPPPPPPP
#SELECT to_timestamp(-62125747200), to_timestamp(1926632005177),
-62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as
timestamp), cast(1926632005177 as timestamp)
#----
diff --git a/datafusion/sqllogictest/test_files/tpch/q2.slt.part
b/datafusion/sqllogictest/test_files/tpch/q2.slt.part
index ed439348d2..ed950db190 100644
--- a/datafusion/sqllogictest/test_files/tpch/q2.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q2.slt.part
@@ -238,7 +238,7 @@ order by
p_partkey
limit 10;
----
-9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9
33-258-202-4782 s the slyly even ideas poach fluffily
+9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9
33-258-202-4782 s the slyly even ideas poach fluffily
9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1
INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express
ideas. ironic ideas haggle about the final T
9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4
INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express
ideas. ironic ideas haggle about the final T
9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5
,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special
multipliers believe blithely alongs
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 7d6d592013..100c214383 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3794,7 +3794,7 @@ select a,
1 1
2 1
-# support scalar value in ORDER BY
+# support scalar value in ORDER BY
query I
select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x
----