This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b914409f86 [Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition (#10095)
b914409f86 is described below

commit b914409f862caf3f28700bc4df3c17258f7abede
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Apr 16 16:33:14 2024 +0300

    [Bug Fix]: Deem hash repartition unnecessary when input and output has 1 partition (#10095)
    
    * Add input partition number check
    
    * Minor changes
---
 .../src/physical_optimizer/enforce_distribution.rs |   3 +-
 datafusion/sqllogictest/test_files/joins.slt       | 101 +++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 145f08af76..c9c54a46bd 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -875,7 +875,8 @@ fn add_hash_on_top(
     n_target: usize,
 ) -> Result<DistributionContext> {
     // Early return if hash repartition is unnecessary
-    if n_target == 1 {
+    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
+    if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
         return Ok(input);
     }
 
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 04d6e46caf..0fe73b2c95 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3587,3 +3587,104 @@ SELECT 1 FROM join_partitioned_table JOIN (SELECT c1 AS id1 FROM join_partitione
 1
 1
 1
+
+
+statement ok
+set datafusion.explain.logical_plan_only = false;
+
+query TT
+EXPLAIN SELECT * FROM (
+    SELECT 1 as c, 2 as d
+    UNION ALL
+    SELECT 1 as c, 3 AS d
+) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
+----
+logical_plan
+01)Projection: a.c, a.d, rhs.e, rhs.f
+02)--Full Join: a.c = rhs.e
+03)----SubqueryAlias: a
+04)------Union
+05)--------Projection: Int64(1) AS c, Int64(2) AS d
+06)----------EmptyRelation
+07)--------Projection: Int64(1) AS c, Int64(3) AS d
+08)----------EmptyRelation
+09)----SubqueryAlias: rhs
+10)------Projection: Int64(1) AS e, Int64(3) AS f
+11)--------EmptyRelation
+physical_plan
+01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
+02)--CoalesceBatchesExec: target_batch_size=2
+03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
+04)------CoalesceBatchesExec: target_batch_size=2
+05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1
+06)----------ProjectionExec: expr=[1 as e, 3 as f]
+07)------------PlaceholderRowExec
+08)------CoalesceBatchesExec: target_batch_size=2
+09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2
+10)----------UnionExec
+11)------------ProjectionExec: expr=[1 as c, 2 as d]
+12)--------------PlaceholderRowExec
+13)------------ProjectionExec: expr=[1 as c, 3 as d]
+14)--------------PlaceholderRowExec
+
+query IIII
+SELECT * FROM (
+    SELECT 1 as c, 2 as d
+    UNION ALL
+    SELECT 1 as c, 3 AS d
+) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
+----
+1 2 1 3
+1 3 1 3
+
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT * FROM (
+    SELECT 1 as c, 2 as d
+    UNION ALL
+    SELECT 1 as c, 3 AS d
+) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
+----
+logical_plan
+01)Projection: a.c, a.d, rhs.e, rhs.f
+02)--Full Join: a.c = rhs.e
+03)----SubqueryAlias: a
+04)------Union
+05)--------Projection: Int64(1) AS c, Int64(2) AS d
+06)----------EmptyRelation
+07)--------Projection: Int64(1) AS c, Int64(3) AS d
+08)----------EmptyRelation
+09)----SubqueryAlias: rhs
+10)------Projection: Int64(1) AS e, Int64(3) AS f
+11)--------EmptyRelation
+physical_plan
+01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
+02)--CoalesceBatchesExec: target_batch_size=2
+03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
+04)------ProjectionExec: expr=[1 as e, 3 as f]
+05)--------PlaceholderRowExec
+06)------CoalesceBatchesExec: target_batch_size=2
+07)--------RepartitionExec: partitioning=Hash([c@0], 1), input_partitions=2
+08)----------UnionExec
+09)------------ProjectionExec: expr=[1 as c, 2 as d]
+10)--------------PlaceholderRowExec
+11)------------ProjectionExec: expr=[1 as c, 3 as d]
+12)--------------PlaceholderRowExec
+
+query IIII
+SELECT * FROM (
+    SELECT 1 as c, 2 as d
+    UNION ALL
+    SELECT 1 as c, 3 AS d
+) as a FULL JOIN (SELECT 1 as e, 3 AS f) AS rhs ON a.c=rhs.e;
+----
+1 2 1 3
+1 3 1 3
+
+statement ok
+set datafusion.explain.logical_plan_only = true;
+
+statement ok
+set datafusion.execution.target_partitions = 2;

Reply via email to