This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 29f23ebffb Move `repartition_file_scans` out of `enable_round_robin` 
check in `EnforceDistribution` rule (#8731)
29f23ebffb is described below

commit 29f23ebffb8a95f6663d23a3a2a0fb8e87765f7b
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 5 12:38:56 2024 -0800

    Move `repartition_file_scans` out of `enable_round_robin` 
`EnforceDistribution` rule check (#8731)
    
    * Cleanup
    
    * More
    
    * Restore add_roundrobin_on_top
    
    * Restore test files
    
    * More
    
    * Restore
    
    * More
    
    * More
    
    * Make test stable
    
    * For review
    
    * Add test
---
 .../src/physical_optimizer/enforce_distribution.rs | 42 ++++++++++------------
 .../sqllogictest/test_files/arrow_typeof.slt       |  2 +-
 .../sqllogictest/test_files/repartition_scan.slt   | 24 +++++++++++--
 datafusion/sqllogictest/test_files/timestamps.slt  |  4 +--
 .../sqllogictest/test_files/tpch/q2.slt.part       |  2 +-
 datafusion/sqllogictest/test_files/window.slt      |  2 +-
 6 files changed, 45 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 1c86c4c320..bf3f9ef0f3 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1197,32 +1197,33 @@ fn ensure_distribution(
     )
     .map(
         |(mut child, requirement, required_input_ordering, would_benefit, 
maintains)| {
-            // Don't need to apply when the returned row count is not greater 
than 1:
+            // Don't need to apply when the returned row count is not greater 
than batch size
             let num_rows = child.plan.statistics()?.num_rows;
             let repartition_beneficial_stats = if 
num_rows.is_exact().unwrap_or(false) {
                 num_rows
                     .get_value()
                     .map(|value| value > &batch_size)
-                    .unwrap_or(true)
+                    .unwrap() // safe to unwrap since is_exact() is true
             } else {
                 true
             };
 
+            // When `repartition_file_scans` is set, attempt to increase
+            // parallelism at the source.
+            if repartition_file_scans && repartition_beneficial_stats {
+                if let Some(new_child) =
+                    child.plan.repartitioned(target_partitions, config)?
+                {
+                    child.plan = new_child;
+                }
+            }
+
             if enable_round_robin
                 // Operator benefits from partitioning (e.g. filter):
                 && (would_benefit && repartition_beneficial_stats)
                 // Unless partitioning doesn't increase the partition count, 
it is not beneficial:
                 && child.plan.output_partitioning().partition_count() < 
target_partitions
             {
-                // When `repartition_file_scans` is set, attempt to increase
-                // parallelism at the source.
-                if repartition_file_scans {
-                    if let Some(new_child) =
-                        child.plan.repartitioned(target_partitions, config)?
-                    {
-                        child.plan = new_child;
-                    }
-                }
                 // Increase parallelism by adding round-robin repartitioning
                 // on top of the operator. Note that we only do this if the
                 // partition count is not already equal to the desired 
partition
@@ -1361,17 +1362,10 @@ impl DistributionContext {
 
     fn update_children(mut self) -> Result<Self> {
         for child_context in self.children_nodes.iter_mut() {
-            child_context.distribution_connection = match 
child_context.plan.as_any() {
-                plan_any if plan_any.is::<RepartitionExec>() => matches!(
-                    plan_any
-                        .downcast_ref::<RepartitionExec>()
-                        .unwrap()
-                        .partitioning(),
-                    Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
-                ),
-                plan_any
-                    if plan_any.is::<SortPreservingMergeExec>()
-                        || plan_any.is::<CoalescePartitionsExec>() =>
+            child_context.distribution_connection = match &child_context.plan {
+                plan if is_repartition(plan)
+                    || is_coalesce_partitions(plan)
+                    || is_sort_preserving_merge(plan) =>
                 {
                     true
                 }
@@ -3870,14 +3864,14 @@ pub(crate) mod tests {
             "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
             "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
             // Plan already has two partitions
-            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e]",
+            "ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, 
projection=[a, b, c, d, e]",
         ];
         let expected_csv = [
             "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
             "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
             "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
             // Plan already has two partitions
-            "CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, 
d, e], has_header=false",
+            "CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, 
projection=[a, b, c, d, e], has_header=false",
         ];
 
         assert_optimized!(expected_parquet, plan_parquet, true, false, 2, 
true, 10);
diff --git a/datafusion/sqllogictest/test_files/arrow_typeof.slt 
b/datafusion/sqllogictest/test_files/arrow_typeof.slt
index 3fad4d0f61..6a623e6c92 100644
--- a/datafusion/sqllogictest/test_files/arrow_typeof.slt
+++ b/datafusion/sqllogictest/test_files/arrow_typeof.slt
@@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)');
 query T
 select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'));
 ----
-LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} })
\ No newline at end of file
+LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} })
diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt 
b/datafusion/sqllogictest/test_files/repartition_scan.slt
index 02eccd7c5d..9d4951c7ec 100644
--- a/datafusion/sqllogictest/test_files/repartition_scan.slt
+++ b/datafusion/sqllogictest/test_files/repartition_scan.slt
@@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: column1@0 != 42
 ----ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
 project [...]
 
+# disable round robin repartitioning
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = false;
+
+## Expect to see the scan read the file as "4" groups with even sizes 
(offsets) again
+query TT
+EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
+----
+logical_plan
+Filter: parquet_table.column1 != Int32(42)
+--TableScan: parquet_table projection=[column1], 
partial_filters=[parquet_table.column1 != Int32(42)]
+physical_plan
+CoalesceBatchesExec: target_batch_size=8192
+--FilterExec: column1@0 != 42
+----ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]},
 project [...]
+
+# enable round robin repartitioning again
+statement ok
+set datafusion.optimizer.enable_round_robin_repartition = true;
+
 # create a second parquet file
 statement ok
 COPY  (VALUES (100), (200)) TO 
'test_files/scratch/repartition_scan/parquet_table/1.parquet'
@@ -147,7 +167,7 @@ WITH HEADER ROW
 LOCATION 'test_files/scratch/repartition_scan/csv_table/';
 
 query I
-select * from csv_table;
+select * from csv_table ORDER BY column1;
 ----
 1
 2
@@ -190,7 +210,7 @@ STORED AS json
 LOCATION 'test_files/scratch/repartition_scan/json_table/';
 
 query I
-select * from "json_table";
+select * from "json_table" ORDER BY column1;
 ----
 1
 2
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt 
b/datafusion/sqllogictest/test_files/timestamps.slt
index c84e46c965..8b0f50cedf 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1,
 ----
 true true true true true true true true true true true true true
 
-# verify timestamp output types 
+# verify timestamp output types
 query TTT
 SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)), 
arrow_typeof(to_timestamp('2023-01-10 12:34:56.000'))
 ----
@@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) = 
arrow_typeof(1::timestamp) as c1,
 true true true true true true
 
 # known issues. currently overflows (expects default precision to be 
microsecond instead of nanoseconds. Work pending)
-#verify extreme values 
+#verify extreme values
 #query PPPPPPPP
 #SELECT to_timestamp(-62125747200), to_timestamp(1926632005177), 
-62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as 
timestamp), cast(1926632005177 as timestamp)
 #----
diff --git a/datafusion/sqllogictest/test_files/tpch/q2.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q2.slt.part
index ed439348d2..ed950db190 100644
--- a/datafusion/sqllogictest/test_files/tpch/q2.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q2.slt.part
@@ -238,7 +238,7 @@ order by
     p_partkey
 limit 10;
 ----
-9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 
33-258-202-4782 s the slyly even ideas poach fluffily
+9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 
33-258-202-4782 s the slyly even ideas poach fluffily 
 9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1 
INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express 
ideas. ironic ideas haggle about the final T
 9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4 
INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express 
ideas. ironic ideas haggle about the final T
 9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5 
,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special 
multipliers believe blithely alongs
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 7d6d592013..100c214383 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3794,7 +3794,7 @@ select a,
 1 1
 2 1
 
-# support scalar value in ORDER BY 
+# support scalar value in ORDER BY
 query I
 select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x
 ----

Reply via email to