This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 42a101e9e8 Always use `PartitionMode::Auto` in planner (#15339)
42a101e9e8 is described below

commit 42a101e9e89adf3f13450cfda3dc615f023ebcfb
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Mar 26 10:17:33 2025 +0100

    Always use `PartitionMode::Auto` in planner (#15339)
    
    * Always use PartitionMode::Auto
    
    * Always use PartitionMode::Auto
    
    * Fix test
    
    * Fix test
    
    * Fix tests
    
    * Update plans
    
    * Add changed plans
---
 datafusion/core/src/physical_planner.rs            |   9 +-
 datafusion/core/tests/dataframe/mod.rs             |  32 +--
 datafusion/core/tests/memory_limit/mod.rs          |   6 +-
 .../physical-optimizer/src/join_selection.rs       |   3 +-
 .../sqllogictest/test_files/explain_tree.slt       | 117 +++++----
 datafusion/sqllogictest/test_files/group_by.slt    |   2 +-
 datafusion/sqllogictest/test_files/join.slt.part   |  35 ++-
 datafusion/sqllogictest/test_files/joins.slt       | 281 ++++++++-------------
 datafusion/sqllogictest/test_files/predicates.slt  |  33 ++-
 datafusion/sqllogictest/test_files/subquery.slt    | 109 ++++----
 .../test_files/tpch/plans/q15.slt.part             |  56 ++--
 datafusion/sqllogictest/test_files/union.slt       |  98 ++++---
 12 files changed, 343 insertions(+), 438 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 9a089796bc..f1a99a7714 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1135,13 +1135,6 @@ impl DefaultPhysicalPlanner {
                     && session_state.config().repartition_joins()
                     && prefer_hash_join
                 {
-                    let partition_mode = {
-                        if session_state.config().collect_statistics() {
-                            PartitionMode::Auto
-                        } else {
-                            PartitionMode::Partitioned
-                        }
-                    };
                     Arc::new(HashJoinExec::try_new(
                         physical_left,
                         physical_right,
@@ -1149,7 +1142,7 @@ impl DefaultPhysicalPlanner {
                         join_filter,
                         join_type,
                         None,
-                        partition_mode,
+                        PartitionMode::Auto,
                         null_equals_null,
                     )?)
                 } else {
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index b19c0b9786..b5923269ab 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2574,7 +2574,7 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
 
     assert_snapshot!(
         pretty_format_batches(&sql_results).unwrap(),
-        @r###"
+        @r"
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
     | plan_type     | plan                                                     
                                                              |
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
@@ -2585,14 +2585,14 @@ async fn test_count_wildcard_on_where_in() -> 
Result<()> {
     |               |       Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]  
                                                              |
     |               |         TableScan: t2 projection=[]                      
                                                              |
     | physical_plan | CoalesceBatchesExec: target_batch_size=8192              
                                                              |
-    |               |   HashJoinExec: mode=Partitioned, join_type=RightSemi, 
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
+    |               |   HashJoinExec: mode=CollectLeft, join_type=RightSemi, 
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
     |               |     ProjectionExec: expr=[4 as count(*)]                 
                                                              |
     |               |       PlaceholderRowExec                                 
                                                              |
     |               |     ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 
AS Int64) as CAST(t1.a AS Int64)]                               |
     |               |       DataSourceExec: partitions=1, partition_sizes=[1]  
                                                              |
     |               |                                                          
                                                              |
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
-    "###
+    "
     );
 
     // In the same SessionContext, AliasGenerator will increase subquery_alias 
id by 1
@@ -2620,7 +2620,7 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
     // make sure sql plan same with df plan
     assert_snapshot!(
         pretty_format_batches(&df_results).unwrap(),
-        @r###"
+        @r"
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
     | plan_type     | plan                                                     
                                                              |
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
@@ -2630,14 +2630,14 @@ async fn test_count_wildcard_on_where_in() -> 
Result<()> {
     |               |     Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS 
count(*)]]                                                      |
     |               |       TableScan: t2 projection=[]                        
                                                              |
     | physical_plan | CoalesceBatchesExec: target_batch_size=8192              
                                                              |
-    |               |   HashJoinExec: mode=Partitioned, join_type=RightSemi, 
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
+    |               |   HashJoinExec: mode=CollectLeft, join_type=RightSemi, 
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
     |               |     ProjectionExec: expr=[4 as count(*)]                 
                                                              |
     |               |       PlaceholderRowExec                                 
                                                              |
     |               |     ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 
AS Int64) as CAST(t1.a AS Int64)]                               |
     |               |       DataSourceExec: partitions=1, partition_sizes=[1]  
                                                              |
     |               |                                                          
                                                              |
     
+---------------+------------------------------------------------------------------------------------------------------------------------+
-    "###
+    "
     );
 
     Ok(())
@@ -2851,7 +2851,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
 
     assert_snapshot!(
         pretty_format_batches(&sql_results).unwrap(),
-        @r###"
+        @r"
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
     | plan_type     | plan                                                     
                                                                 |
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
@@ -2867,10 +2867,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
     | physical_plan | CoalesceBatchesExec: target_batch_size=8192              
                                                                 |
     |               |   FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 
ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
     |               |     CoalesceBatchesExec: target_batch_size=8192          
                                                                 |
-    |               |       HashJoinExec: mode=Partitioned, join_type=Left, 
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |         CoalesceBatchesExec: target_batch_size=8192      
                                                                 |
-    |               |           RepartitionExec: partitioning=Hash([a@0], 4), 
input_partitions=1                                                |
-    |               |             DataSourceExec: partitions=1, 
partition_sizes=[1]                                                             
|
+    |               |       HashJoinExec: mode=CollectLeft, join_type=Left, 
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
+    |               |         DataSourceExec: partitions=1, 
partition_sizes=[1]                                                             
    |
     |               |         ProjectionExec: expr=[count(Int64(1))@1 as 
count(*), a@0 as a, true as __always_true]                             |
     |               |           AggregateExec: mode=FinalPartitioned, gby=[a@0 
as a], aggr=[count(Int64(1))]                                    |
     |               |             CoalesceBatchesExec: target_batch_size=8192  
                                                                 |
@@ -2880,7 +2878,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
     |               |                     DataSourceExec: partitions=1, 
partition_sizes=[1]                                                     |
     |               |                                                          
                                                                 |
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
-    "###
+    "
     );
 
     // In the same SessionContext, AliasGenerator will increase subquery_alias 
id by 1
@@ -2910,7 +2908,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
 
     assert_snapshot!(
         pretty_format_batches(&df_results).unwrap(),
-        @r###"
+        @r"
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
     | plan_type     | plan                                                     
                                                                 |
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
@@ -2926,10 +2924,8 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
     | physical_plan | CoalesceBatchesExec: target_batch_size=8192              
                                                                 |
     |               |   FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 
ELSE count(*)@2 END > 0, projection=[a@0, b@1]                     |
     |               |     CoalesceBatchesExec: target_batch_size=8192          
                                                                 |
-    |               |       HashJoinExec: mode=Partitioned, join_type=Left, 
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
-    |               |         CoalesceBatchesExec: target_batch_size=8192      
                                                                 |
-    |               |           RepartitionExec: partitioning=Hash([a@0], 4), 
input_partitions=1                                                |
-    |               |             DataSourceExec: partitions=1, 
partition_sizes=[1]                                                             
|
+    |               |       HashJoinExec: mode=CollectLeft, join_type=Left, 
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
+    |               |         DataSourceExec: partitions=1, 
partition_sizes=[1]                                                             
    |
     |               |         ProjectionExec: expr=[count(*)@1 as count(*), 
a@0 as a, true as __always_true]                                    |
     |               |           AggregateExec: mode=FinalPartitioned, gby=[a@0 
as a], aggr=[count(*)]                                           |
     |               |             CoalesceBatchesExec: target_batch_size=8192  
                                                                 |
@@ -2939,7 +2935,7 @@ async fn test_count_wildcard_on_where_scalar_subquery() 
-> Result<()> {
     |               |                     DataSourceExec: partitions=1, 
partition_sizes=[1]                                                     |
     |               |                                                          
                                                                 |
     
+---------------+---------------------------------------------------------------------------------------------------------------------------+
-    "###
+    "
     );
 
     Ok(())
diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index 8f690edc54..6a0a797d4d 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -119,7 +119,7 @@ async fn join_by_key_multiple_partitions() {
     TestCase::new()
         .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = 
t2.service")
         .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed with top memory 
consumers (across reservations) as: HashJoinInput[0]",
+            "Resources exhausted: Additional allocation failed with top memory 
consumers (across reservations) as: HashJoinInput",
         ])
         .with_memory_limit(1_000)
         .with_config(config)
@@ -156,7 +156,7 @@ async fn join_by_expression() {
 #[tokio::test]
 async fn cross_join() {
     TestCase::new()
-        .with_query("select t1.* from t t1 CROSS JOIN t t2")
+        .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
         .with_expected_errors(vec![
             "Resources exhausted: Additional allocation failed with top memory 
consumers (across reservations) as: CrossJoinExec",
         ])
@@ -777,7 +777,7 @@ impl Scenario {
                 // Disabling physical optimizer rules to avoid sorts /
                 // repartitions (since RepartitionExec / SortExec also
                 // has a memory budget which we'll likely hit first)
-                Some(vec![])
+                Some(vec![Arc::new(JoinSelection::new())])
             }
             Self::AccessLogStreaming => {
                 // Disable all physical optimizer rules except the
diff --git a/datafusion/physical-optimizer/src/join_selection.rs 
b/datafusion/physical-optimizer/src/join_selection.rs
index 03bfb69788..5a772ccdd2 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -571,7 +571,8 @@ pub(crate) fn swap_join_according_to_unboundedness(
             hash_join.swap_inputs(PartitionMode::CollectLeft)
         }
         (PartitionMode::Auto, _) => {
-            internal_err!("Auto is not acceptable for unbounded input here.")
+            // Use `PartitionMode::Partitioned` as default if `Auto` is 
selected.
+            hash_join.swap_inputs(PartitionMode::Partitioned)
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt 
b/datafusion/sqllogictest/test_files/explain_tree.slt
index aaea05be76..7a0e322eb8 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -345,63 +345,68 @@ FROM
 ----
 physical_plan
 01)┌───────────────────────────┐
-02)│    CoalesceBatchesExec    │
+02)│       ProjectionExec      │
 03)│    --------------------   │
-04)│     target_batch_size:    │
-05)│            8192           │
-06)└─────────────┬─────────────┘
-07)┌─────────────┴─────────────┐
-08)│        HashJoinExec       │
-09)│    --------------------   │
-10)│            on:            ├───────────────────────────────────────────┐
-11)│    (int_col = int_col)    │                                           │
-12)└─────────────┬─────────────┘                                           │
-13)┌─────────────┴─────────────┐                             
┌─────────────┴─────────────┐
-14)│    CoalesceBatchesExec    │                             │    
CoalesceBatchesExec    │
-15)│    --------------------   │                             │    
--------------------   │
-16)│     target_batch_size:    │                             │     
target_batch_size:    │
-17)│            8192           │                             │            8192 
          │
-18)└─────────────┬─────────────┘                             
└─────────────┬─────────────┘
-19)┌─────────────┴─────────────┐                             
┌─────────────┴─────────────┐
-20)│        HashJoinExec       │                             │      
RepartitionExec      │
-21)│    --------------------   │                             │    
--------------------   │
-22)│            on:            │                             │  
output_partition_count:  │
-23)│    (int_col = int_col)    ├──────────────┐              │             1   
          │
-24)│                           │              │              │                 
          │
-25)│                           │              │              │    
partitioning_scheme:   │
-26)│                           │              │              │    
Hash([int_col@0], 4)   │
-27)└─────────────┬─────────────┘              │              
└─────────────┬─────────────┘
-28)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-29)│    CoalesceBatchesExec    ││    CoalesceBatchesExec    ││       
DataSourceExec      │
-30)│    --------------------   ││    --------------------   ││    
--------------------   │
-31)│     target_batch_size:    ││     target_batch_size:    ││        bytes: 
1560        │
-32)│            8192           ││            8192           ││       format: 
memory      │
-33)│                           ││                           ││          rows: 
1          │
-34)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
-35)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-36)│      RepartitionExec      ││      RepartitionExec      │
-37)│    --------------------   ││    --------------------   │
-38)│  output_partition_count:  ││  output_partition_count:  │
-39)│             4             ││             4             │
-40)│                           ││                           │
-41)│    partitioning_scheme:   ││    partitioning_scheme:   │
-42)│    Hash([int_col@0], 4)   ││    Hash([int_col@0], 4)   │
-43)└─────────────┬─────────────┘└─────────────┬─────────────┘
-44)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-45)│      RepartitionExec      ││      RepartitionExec      │
-46)│    --------------------   ││    --------------------   │
-47)│  output_partition_count:  ││  output_partition_count:  │
-48)│             1             ││             1             │
-49)│                           ││                           │
-50)│    partitioning_scheme:   ││    partitioning_scheme:   │
-51)│     RoundRobinBatch(4)    ││     RoundRobinBatch(4)    │
-52)└─────────────┬─────────────┘└─────────────┬─────────────┘
-53)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-54)│       DataSourceExec      ││       DataSourceExec      │
-55)│    --------------------   ││    --------------------   │
-56)│          files: 1         ││          files: 1         │
-57)│        format: csv        ││      format: parquet      │
-58)└───────────────────────────┘└───────────────────────────┘
+04)│     date_col: date_col    │
+05)│                           │
+06)│        string_col:        │
+07)│         string_col        │
+08)└─────────────┬─────────────┘
+09)┌─────────────┴─────────────┐
+10)│    CoalesceBatchesExec    │
+11)│    --------------------   │
+12)│     target_batch_size:    │
+13)│            8192           │
+14)└─────────────┬─────────────┘
+15)┌─────────────┴─────────────┐
+16)│        HashJoinExec       │
+17)│    --------------------   │
+18)│            on:            ├──────────────┐
+19)│    (int_col = int_col)    │              │
+20)└─────────────┬─────────────┘              │
+21)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+22)│       DataSourceExec      ││    CoalesceBatchesExec    │
+23)│    --------------------   ││    --------------------   │
+24)│        bytes: 1560        ││     target_batch_size:    │
+25)│       format: memory      ││            8192           │
+26)│          rows: 1          ││                           │
+27)└───────────────────────────┘└─────────────┬─────────────┘
+28)-----------------------------┌─────────────┴─────────────┐
+29)-----------------------------│        HashJoinExec       │
+30)-----------------------------│    --------------------   │
+31)-----------------------------│            on:            ├──────────────┐
+32)-----------------------------│    (int_col = int_col)    │              │
+33)-----------------------------└─────────────┬─────────────┘              │
+34)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+35)-----------------------------│    CoalesceBatchesExec    ││    
CoalesceBatchesExec    │
+36)-----------------------------│    --------------------   ││    
--------------------   │
+37)-----------------------------│     target_batch_size:    ││     
target_batch_size:    │
+38)-----------------------------│            8192           ││            8192 
          │
+39)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+40)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+41)-----------------------------│      RepartitionExec      ││      
RepartitionExec      │
+42)-----------------------------│    --------------------   ││    
--------------------   │
+43)-----------------------------│  output_partition_count:  ││  
output_partition_count:  │
+44)-----------------------------│             4             ││             4   
          │
+45)-----------------------------│                           ││                 
          │
+46)-----------------------------│    partitioning_scheme:   ││    
partitioning_scheme:   │
+47)-----------------------------│    Hash([int_col@0], 4)   ││    
Hash([int_col@0], 4)   │
+48)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+49)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+50)-----------------------------│      RepartitionExec      ││      
RepartitionExec      │
+51)-----------------------------│    --------------------   ││    
--------------------   │
+52)-----------------------------│  output_partition_count:  ││  
output_partition_count:  │
+53)-----------------------------│             1             ││             1   
          │
+54)-----------------------------│                           ││                 
          │
+55)-----------------------------│    partitioning_scheme:   ││    
partitioning_scheme:   │
+56)-----------------------------│     RoundRobinBatch(4)    ││     
RoundRobinBatch(4)    │
+57)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+58)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+59)-----------------------------│       DataSourceExec      ││       
DataSourceExec      │
+60)-----------------------------│    --------------------   ││    
--------------------   │
+61)-----------------------------│          files: 1         ││          files: 
1         │
+62)-----------------------------│        format: csv        ││      format: 
parquet      │
+63)-----------------------------└───────────────────────────┘└───────────────────────────┘
 
 # Long Filter (demonstrate what happens with wrapping)
 query TT
diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt
index d9ef12496e..4c4999a364 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2023,7 +2023,7 @@ physical_plan
 08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
 09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, 
col2@4 as col2, col0@0 as col0, col1@1 as col1]
 10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(col0@0, col0@0)]
+11)--------------------HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(col0@0, col0@0)]
 12)----------------------DataSourceExec: partitions=1, partition_sizes=[3]
 13)----------------------DataSourceExec: partitions=1, partition_sizes=[3]
 
diff --git a/datafusion/sqllogictest/test_files/join.slt.part 
b/datafusion/sqllogictest/test_files/join.slt.part
index 2bbf31dad9..972dd22653 100644
--- a/datafusion/sqllogictest/test_files/join.slt.part
+++ b/datafusion/sqllogictest/test_files/join.slt.part
@@ -1373,26 +1373,21 @@ logical_plan
 06)----TableScan: f projection=[a]
 07)--TableScan: s projection=[b]
 physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col1@1, CAST(s.b AS 
Int64)@1)], projection=[col0@0, col1@1, a@2, b@3]
-03)----ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@0 as a]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(f.a AS 
Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
-06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------RepartitionExec: partitioning=Hash([CAST(f.a AS Int64)@1], 16), 
input_partitions=1
-08)--------------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as 
CAST(f.a AS Int64)]
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)----------CoalesceBatchesExec: target_batch_size=8192
-11)------------RepartitionExec: partitioning=Hash([col0@0], 16), 
input_partitions=16
-12)--------------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, 
CAST(y@1 AS Int64) + 1 as col1]
-13)----------------RepartitionExec: partitioning=RoundRobinBatch(16), 
input_partitions=1
-14)------------------CoalesceBatchesExec: target_batch_size=8192
-15)--------------------FilterExec: y@1 = x@0
-16)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
-17)----CoalesceBatchesExec: target_batch_size=8192
-18)------RepartitionExec: partitioning=Hash([CAST(s.b AS Int64)@1], 16), 
input_partitions=1
-19)--------ProjectionExec: expr=[b@0 as b, CAST(b@0 AS Int64) as CAST(s.b AS 
Int64)]
-20)----------DataSourceExec: partitions=1, partition_sizes=[1]
+01)ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@3 as a, b@0 as b]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(s.b AS 
Int64)@1, col1@1)], projection=[b@0, col0@2, col1@3, a@4]
+04)------ProjectionExec: expr=[b@0 as b, CAST(b@0 AS Int64) as CAST(s.b AS 
Int64)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)------ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@0 as a]
+07)--------CoalesceBatchesExec: target_batch_size=8192
+08)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(f.a AS 
Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
+09)------------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as CAST(f.a 
AS Int64)]
+10)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1 
AS Int64) + 1 as col1]
+12)--------------RepartitionExec: partitioning=RoundRobinBatch(16), 
input_partitions=1
+13)----------------CoalesceBatchesExec: target_batch_size=8192
+14)------------------FilterExec: y@1 = x@0
+15)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 drop table pairs;
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 50af06dc40..ca86dbfcc3 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1338,17 +1338,15 @@ logical_plan
 04)------TableScan: join_t1 projection=[t1_id]
 05)------TableScan: join_t2 projection=[t2_id]
 physical_plan
-01)AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
+01)AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id], aggr=[]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)], projection=[t1_id@0]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+03)----RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
+04)------AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id], aggr=[]
+05)--------CoalesceBatchesExec: target_batch_size=2
+06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, 
t2_id@0)], projection=[t1_id@0]
 07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-10)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-11)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+09)--------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # Join on struct
 query TT
@@ -1362,15 +1360,10 @@ logical_plan
 03)--TableScan: join_t4 projection=[s4]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s3@0, s4@0)]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([s3@0], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([s4@0], 2), input_partitions=2
-09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-10)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s3@0, s4@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query ??
 select join_t3.s3, join_t4.s4
@@ -1403,17 +1396,15 @@ logical_plan
 06)--------TableScan: join_t2 projection=[t2_id]
 physical_plan
 01)ProjectionExec: expr=[count(Int64(1))@1 as count(*)]
-02)--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], 
aggr=[count(Int64(1))]
+02)--AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id], 
aggr=[count(Int64(1))]
 03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)], projection=[t1_id@0]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
+05)--------AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id], 
aggr=[count(Int64(1))]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, 
t2_id@0)], projection=[t1_id@0]
 08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)--------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+10)----------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query TT
 EXPLAIN
@@ -1434,17 +1425,15 @@ physical_plan
 02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]
 03)----CoalescePartitionsExec
 04)------AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]
-05)--------AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as alias1], 
aggr=[]
+05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], 
aggr=[]
 06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)], projection=[t1_id@0]
-08)--------------CoalesceBatchesExec: target_batch_size=2
-09)----------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-10)------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+07)------------RepartitionExec: partitioning=Hash([alias1@0], 2), 
input_partitions=2
+08)--------------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
 11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
-12)--------------CoalesceBatchesExec: target_batch_size=2
-13)----------------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
-14)------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-15)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+13)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.explain.logical_plan_only = true;
@@ -1530,17 +1519,14 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, 
CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + 
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, 
t1_int@2, t2_id@4, t2_name@5, t2_int@6]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3], 
2), input_partitions=2
-06)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([CAST(join_t2.t2_id AS 
Int64)@3], 2), input_partitions=2
-11)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, 
t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
-12)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-13)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + 
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1, 
t1_int@2, t2_id@4, t2_name@5, t2_int@6]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, 
t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
+09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+10)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # Both side expr key inner join
 
@@ -1587,17 +1573,14 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as 
t1_name]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2, 
t1_name@3]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 
2), input_partitions=2
-06)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 
2), input_partitions=2
-11)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
-12)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-13)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2, 
t1_name@3]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 
+ 12 as join_t1.t1_id + UInt32(12)]
+09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+10)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # Left side expr key inner join
 
@@ -1643,16 +1626,11 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as 
t1_name]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0, 
join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 
2), input_partitions=2
-10)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, 
join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 
+ 11 as join_t1.t1_id + UInt32(11)]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # Right side expr key inner join
 
@@ -1700,16 +1678,13 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as 
t1_name]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 
2), input_partitions=2
-06)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-11)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+09)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # Select wildcard with expr key inner join
 
@@ -1751,16 +1726,11 @@ logical_plan
 03)--TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, 
t2_id@3, t2_name@4, t2_int@5]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@3], 
2), input_partitions=2
-09)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, 
t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-11)------------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2, 
t2_id@3, t2_name@4, t2_int@5]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 
as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 #####
 # Config teardown
@@ -2598,15 +2568,10 @@ logical_plan
 05)----TableScan: test_timestamps_tz_table projection=[nanos, micros, millis, 
secs, names]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(millis@2, millis@2)]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([millis@2], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([millis@2], 2), input_partitions=2
-09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-10)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(millis@2, millis@2)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # left_join_using_2
 query II
@@ -2869,16 +2834,11 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 WHERE t1_id I
 physical_plan
 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query IT rowsort
 SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN 
(SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2910,16 +2870,11 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
 physical_plan
 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query IT
 SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN 
left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -3066,16 +3021,11 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t1 t1 WHER
 physical_plan
 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query ITI rowsort
 SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE 
EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = 
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3088,16 +3038,11 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t2 t2 RIGH
 physical_plan
 01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query ITI rowsort
 SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI 
JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <> 
t1.t1_name) ORDER BY t1_id
@@ -3668,18 +3613,14 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1
-06)----------ProjectionExec: expr=[1 as e, 3 as f]
-07)------------PlaceholderRowExec
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2
-10)----------UnionExec
-11)------------ProjectionExec: expr=[1 as c, 2 as d]
-12)--------------PlaceholderRowExec
-13)------------ProjectionExec: expr=[1 as c, 3 as d]
-14)--------------PlaceholderRowExec
+03)----HashJoinExec: mode=CollectLeft, join_type=Full, on=[(e@0, c@0)]
+04)------ProjectionExec: expr=[1 as e, 3 as f]
+05)--------PlaceholderRowExec
+06)------UnionExec
+07)--------ProjectionExec: expr=[1 as c, 2 as d]
+08)----------PlaceholderRowExec
+09)--------ProjectionExec: expr=[1 as c, 3 as d]
+10)----------PlaceholderRowExec
 
 query IIII rowsort
 SELECT * FROM (
@@ -4484,18 +4425,15 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [c@2 DESC]
 02)--CoalesceBatchesExec: target_batch_size=3
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
-04)------CoalesceBatchesExec: target_batch_size=3
-05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-06)----------CoalesceBatchesExec: target_batch_size=3
-07)------------FilterExec: b@1 > 3, projection=[a@0]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
-11)--------CoalesceBatchesExec: target_batch_size=3
-12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-13)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(a@0, a@0)]
+04)------CoalescePartitionsExec
+05)--------CoalesceBatchesExec: target_batch_size=3
+06)----------FilterExec: b@1 > 3, projection=[a@0]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
+10)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+11)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query TT
 explain select * from test where a in (select a from test where b > 3) order 
by c desc nulls last;
@@ -4511,18 +4449,15 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [c@2 DESC NULLS LAST]
 02)--CoalesceBatchesExec: target_batch_size=3
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
-04)------CoalesceBatchesExec: target_batch_size=3
-05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-06)----------CoalesceBatchesExec: target_batch_size=3
-07)------------FilterExec: b@1 > 3, projection=[a@0]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
-11)--------CoalesceBatchesExec: target_batch_size=3
-12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-13)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(a@0, a@0)]
+04)------CoalescePartitionsExec
+05)--------CoalesceBatchesExec: target_batch_size=3
+06)----------FilterExec: b@1 > 3, projection=[a@0]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
+10)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+11)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query III
 select * from test where a in (select a from test where b > 3) order by c desc 
nulls first;
diff --git a/datafusion/sqllogictest/test_files/predicates.slt 
b/datafusion/sqllogictest/test_files/predicates.slt
index 8bf1caa003..b263e39f3b 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -760,23 +760,22 @@ logical_plan
 10)------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
 physical_plan
 01)AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey], 
aggr=[sum(lineitem.l_extendedprice), avg(lineitem.l_discount), count(DISTINCT 
partsupp.ps_suppkey)]
-02)--CoalesceBatchesExec: target_batch_size=8192
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, 
ps_partkey@0)], projection=[l_extendedprice@0, l_discount@1, p_partkey@2, 
ps_suppkey@4]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, 
p_partkey@0)], projection=[l_extendedprice@1, l_discount@2, p_partkey@3]
-06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), 
input_partitions=4
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-09)----------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, 
projection=[l_partkey, l_extendedprice, l_discount], file_type=csv, 
has_header=true
-10)----------CoalesceBatchesExec: target_batch_size=8192
-11)------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), 
input_partitions=4
-12)--------------CoalesceBatchesExec: target_batch_size=8192
-13)----------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23, 
projection=[p_partkey@0]
-14)------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-15)--------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]}, 
projection=[p_partkey, p_brand], file_type=csv, has_header=true
-16)------CoalesceBatchesExec: target_batch_size=8192
-17)--------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), 
input_partitions=1
-18)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2 
as l_discount, p_partkey@3 as p_partkey, ps_suppkey@0 as ps_suppkey]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ps_partkey@0, 
p_partkey@2)], projection=[ps_suppkey@1, l_extendedprice@2, l_discount@3, 
p_partkey@4]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--------CoalesceBatchesExec: target_batch_size=8192
+07)----------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(l_partkey@0, p_partkey@0)], projection=[l_extendedprice@1, l_discount@2, 
p_partkey@3]
+08)------------CoalesceBatchesExec: target_batch_size=8192
+09)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), 
input_partitions=4
+10)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+11)------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]}, 
projection=[l_partkey, l_extendedprice, l_discount], file_type=csv, 
has_header=true
+12)------------CoalesceBatchesExec: target_batch_size=8192
+13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), 
input_partitions=4
+14)----------------CoalesceBatchesExec: target_batch_size=8192
+15)------------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23, 
projection=[p_partkey@0]
+16)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+17)----------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]}, 
projection=[p_partkey, p_brand], file_type=csv, has_header=true
 
 # Inlist simplification
 
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 4c1565c7f0..aaccaaa43c 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -202,18 +202,17 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as 
t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 
as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum 
from t1
@@ -238,18 +237,17 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int * Float64(1)) + 
Int64(1)@0 as t2_sum]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int * Float64(1)) + Int64(1)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int * Float64(1))@1 + 1 as 
sum(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int * Float64(1))]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int * Float64(1))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int * Float64(1)) + Int64(1)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int * Float64(1))@1 + 1 as 
sum(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int * Float64(1))]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int * Float64(1))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query IR rowsort
 SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) 
as t2_sum from t1
@@ -274,18 +272,17 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as 
t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 
as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by 
t2_id, 'a') as t2_sum from t1
@@ -311,20 +308,19 @@ logical_plan
 physical_plan
 01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as 
t2_id]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------FilterExec: sum(t2.t2_int)@1 < 3
-07)------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-08)--------------CoalesceBatchesExec: target_batch_size=2
-09)----------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-12)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
-13)------CoalesceBatchesExec: target_batch_size=2
-14)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-15)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-16)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1, 
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 
as t2_id]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------FilterExec: sum(t2.t2_int)@1 < 3
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[sum(t2.t2_int)]
+12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+13)------------------------DataSourceExec: partitions=1, partition_sizes=[1]
+14)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+15)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having 
sum(t2_int) < 3) as t2_sum from t1
@@ -1156,15 +1152,10 @@ physical_plan
 01)CoalesceBatchesExec: target_batch_size=2
 02)--FilterExec: t1_id@0 > 40 OR NOT mark@3, projection=[t1_id@0, t1_name@1, 
t1_int@2]
 03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(t1_id@0, 
t2_id@0)]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([t1_id@0], 4), 
input_partitions=4
-07)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
-11)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(t1_id@0, 
t2_id@0)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.explain.logical_plan_only = true;
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part 
b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
index e2bd651c4a..0636a033b2 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
@@ -74,33 +74,29 @@ physical_plan
 01)SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]
 02)--SortExec: expr=[s_suppkey@0 ASC NULLS LAST], preserve_partitioning=[true]
 03)----CoalesceBatchesExec: target_batch_size=8192
-04)------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(total_revenue@4, max(revenue0.total_revenue)@0)], projection=[s_suppkey@0, 
s_name@1, s_address@2, s_phone@3, total_revenue@4]
-05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------RepartitionExec: partitioning=Hash([total_revenue@4], 4), 
input_partitions=4
-07)------------CoalesceBatchesExec: target_batch_size=8192
-08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(s_suppkey@0, supplier_no@0)], projection=[s_suppkey@0, s_name@1, 
s_address@2, s_phone@3, total_revenue@5]
-09)----------------CoalesceBatchesExec: target_batch_size=8192
-10)------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), 
input_partitions=4
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-12)----------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_name, s_address, s_phone], file_type=csv, 
has_header=false
-13)----------------ProjectionExec: expr=[l_suppkey@0 as supplier_no, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as 
total_revenue]
-14)------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
-15)--------------------CoalesceBatchesExec: target_batch_size=8192
-16)----------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), 
input_partitions=4
-17)------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
-18)--------------------------CoalesceBatchesExec: target_batch_size=8192
-19)----------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND 
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, 
l_discount@2]
-20)------------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_di [...]
-21)--------CoalesceBatchesExec: target_batch_size=8192
-22)----------RepartitionExec: 
partitioning=Hash([max(revenue0.total_revenue)@0], 4), input_partitions=1
-23)------------AggregateExec: mode=Final, gby=[], 
aggr=[max(revenue0.total_revenue)]
-24)--------------CoalescePartitionsExec
-25)----------------AggregateExec: mode=Partial, gby=[], 
aggr=[max(revenue0.total_revenue)]
-26)------------------ProjectionExec: expr=[sum(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount)@1 as total_revenue]
-27)--------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 
as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
-28)----------------------CoalesceBatchesExec: target_batch_size=8192
-29)------------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 
4), input_partitions=4
-30)--------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
-31)----------------------------CoalesceBatchesExec: target_batch_size=8192
-32)------------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND 
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, 
l_discount@2]
-33)--------------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_ [...]
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, 
on=[(max(revenue0.total_revenue)@0, total_revenue@4)], projection=[s_suppkey@1, 
s_name@2, s_address@3, s_phone@4, total_revenue@5]
+05)--------AggregateExec: mode=Final, gby=[], 
aggr=[max(revenue0.total_revenue)]
+06)----------CoalescePartitionsExec
+07)------------AggregateExec: mode=Partial, gby=[], 
aggr=[max(revenue0.total_revenue)]
+08)--------------ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) 
- lineitem.l_discount)@1 as total_revenue]
+09)----------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+10)------------------CoalesceBatchesExec: target_batch_size=8192
+11)--------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), 
input_partitions=4
+12)----------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+13)------------------------CoalesceBatchesExec: target_batch_size=8192
+14)--------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND 
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, 
l_discount@2]
+15)----------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_disc [...]
+16)--------CoalesceBatchesExec: target_batch_size=8192
+17)----------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(s_suppkey@0, supplier_no@0)], projection=[s_suppkey@0, s_name@1, 
s_address@2, s_phone@3, total_revenue@5]
+18)------------CoalesceBatchesExec: target_batch_size=8192
+19)--------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), 
input_partitions=4
+20)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+21)------------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, 
projection=[s_suppkey, s_name, s_address, s_phone], file_type=csv, 
has_header=false
+22)------------ProjectionExec: expr=[l_suppkey@0 as supplier_no, 
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as 
total_revenue]
+23)--------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+24)----------------CoalesceBatchesExec: target_batch_size=8192
+25)------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4), 
input_partitions=4
+26)--------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+27)----------------------CoalesceBatchesExec: target_batch_size=8192
+28)------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND 
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1, 
l_discount@2]
+29)--------------------------DataSourceExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_discou [...]
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index 9e9a40c510..356f1598bc 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -308,34 +308,30 @@ logical_plan
 physical_plan
 01)UnionExec
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(id@0, 
CAST(t2.id AS Int32)@2), (name@1, name@1)]
-04)------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as 
name], aggr=[]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], 
aggr=[]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------CoalesceBatchesExec: target_batch_size=2
-11)--------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, 
name@1], 4), input_partitions=4
-12)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS 
Int32) as CAST(t2.id AS Int32)]
-13)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-15)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
-16)----CoalesceBatchesExec: target_batch_size=2
-17)------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id 
AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
-18)--------CoalesceBatchesExec: target_batch_size=2
-19)----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, 
name@1], 4), input_partitions=4
-20)------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS 
Int32) as CAST(t2.id AS Int32)]
-21)--------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 
as name], aggr=[]
-22)----------------CoalesceBatchesExec: target_batch_size=2
-23)------------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
-24)--------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as 
name], aggr=[]
-25)----------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-26)------------------------DataSourceExec: partitions=1, partition_sizes=[1]
-27)--------CoalesceBatchesExec: target_batch_size=2
-28)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
-29)------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-30)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, 
CAST(t2.id AS Int32)@2), (name@1, name@1)]
+04)------CoalescePartitionsExec
+05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as 
name], aggr=[]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
+08)--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as 
name], aggr=[]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32) 
as CAST(t2.id AS Int32)]
+12)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)----------DataSourceExec: partitions=1, partition_sizes=[1]
+14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
+15)----CoalesceBatchesExec: target_batch_size=2
+16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id 
AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
+17)--------CoalescePartitionsExec
+18)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS 
Int32) as CAST(t2.id AS Int32)]
+19)------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 
as name], aggr=[]
+20)--------------CoalesceBatchesExec: target_batch_size=2
+21)----------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
+22)------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as 
name], aggr=[]
+23)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+24)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
+25)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+26)----------DataSourceExec: partitions=1, partition_sizes=[1]
 
 
 query IT rowsort
@@ -380,31 +376,29 @@ logical_plan
 08)------TableScan: t2 projection=[name]
 09)----TableScan: t1 projection=[name]
 physical_plan
-01)InterleaveExec
+01)UnionExec
 02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, 
name@0)]
-04)------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([name@0], 4), 
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------CoalesceBatchesExec: target_batch_size=2
-11)--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-12)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-13)------------DataSourceExec: partitions=1, partition_sizes=[1]
-14)--CoalesceBatchesExec: target_batch_size=2
-15)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, 
name@0)]
-16)------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-17)--------CoalesceBatchesExec: target_batch_size=2
-18)----------RepartitionExec: partitioning=Hash([name@0], 4), 
input_partitions=4
-19)------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-20)--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-21)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-22)------CoalesceBatchesExec: target_batch_size=2
-23)--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-24)----------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-25)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, 
name@0)]
+04)------CoalescePartitionsExec
+05)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------RepartitionExec: partitioning=Hash([name@0], 4), 
input_partitions=4
+08)--------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+12)--------DataSourceExec: partitions=1, partition_sizes=[1]
+13)--CoalesceBatchesExec: target_batch_size=2
+14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0, 
name@0)]
+15)------CoalescePartitionsExec
+16)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+17)----------CoalesceBatchesExec: target_batch_size=2
+18)------------RepartitionExec: partitioning=Hash([name@0], 4), 
input_partitions=4
+19)--------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+20)----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+21)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+22)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+23)--------DataSourceExec: partitions=1, partition_sizes=[1]
 
 # union_upcast_types
 query TT


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to