This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 42a101e9e8 Always use `PartitionMode::Auto` in planner (#15339)
42a101e9e8 is described below
commit 42a101e9e89adf3f13450cfda3dc615f023ebcfb
Author: Daniël Heres <[email protected]>
AuthorDate: Wed Mar 26 10:17:33 2025 +0100
Always use `PartitionMode::Auto` in planner (#15339)
* Always use PartitionMode::Auto
* Always use PartitionMode::Auto
* Fix test
* Fix test
* Fix tests
* Update plans
* Add changed plans
---
datafusion/core/src/physical_planner.rs | 9 +-
datafusion/core/tests/dataframe/mod.rs | 32 +--
datafusion/core/tests/memory_limit/mod.rs | 6 +-
.../physical-optimizer/src/join_selection.rs | 3 +-
.../sqllogictest/test_files/explain_tree.slt | 117 +++++----
datafusion/sqllogictest/test_files/group_by.slt | 2 +-
datafusion/sqllogictest/test_files/join.slt.part | 35 ++-
datafusion/sqllogictest/test_files/joins.slt | 281 ++++++++-------------
datafusion/sqllogictest/test_files/predicates.slt | 33 ++-
datafusion/sqllogictest/test_files/subquery.slt | 109 ++++----
.../test_files/tpch/plans/q15.slt.part | 56 ++--
datafusion/sqllogictest/test_files/union.slt | 98 ++++---
12 files changed, 343 insertions(+), 438 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 9a089796bc..f1a99a7714 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1135,13 +1135,6 @@ impl DefaultPhysicalPlanner {
&& session_state.config().repartition_joins()
&& prefer_hash_join
{
- let partition_mode = {
- if session_state.config().collect_statistics() {
- PartitionMode::Auto
- } else {
- PartitionMode::Partitioned
- }
- };
Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
@@ -1149,7 +1142,7 @@ impl DefaultPhysicalPlanner {
join_filter,
join_type,
None,
- partition_mode,
+ PartitionMode::Auto,
null_equals_null,
)?)
} else {
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index b19c0b9786..b5923269ab 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2574,7 +2574,7 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
- @r###"
+ @r"
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+------------------------------------------------------------------------------------------------------------------------+
@@ -2585,14 +2585,14 @@ async fn test_count_wildcard_on_where_in() ->
Result<()> {
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
|
| | TableScan: t2 projection=[]
|
| physical_plan | CoalesceBatchesExec: target_batch_size=8192
|
- | | HashJoinExec: mode=Partitioned, join_type=RightSemi,
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
+ | | HashJoinExec: mode=CollectLeft, join_type=RightSemi,
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
| | ProjectionExec: expr=[4 as count(*)]
|
| | PlaceholderRowExec
|
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0
AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1]
|
| |
|
+---------------+------------------------------------------------------------------------------------------------------------------------+
- "###
+ "
);
// In the same SessionContext, AliasGenerator will increase subquery_alias
id by 1
@@ -2620,7 +2620,7 @@ async fn test_count_wildcard_on_where_in() -> Result<()> {
// make sure sql plan same with df plan
assert_snapshot!(
pretty_format_batches(&df_results).unwrap(),
- @r###"
+ @r"
+---------------+------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+------------------------------------------------------------------------------------------------------------------------+
@@ -2630,14 +2630,14 @@ async fn test_count_wildcard_on_where_in() ->
Result<()> {
| | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS
count(*)]] |
| | TableScan: t2 projection=[]
|
| physical_plan | CoalesceBatchesExec: target_batch_size=8192
|
- | | HashJoinExec: mode=Partitioned, join_type=RightSemi,
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
+ | | HashJoinExec: mode=CollectLeft, join_type=RightSemi,
on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] |
| | ProjectionExec: expr=[4 as count(*)]
|
| | PlaceholderRowExec
|
| | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0
AS Int64) as CAST(t1.a AS Int64)] |
| | DataSourceExec: partitions=1, partition_sizes=[1]
|
| |
|
+---------------+------------------------------------------------------------------------------------------------------------------------+
- "###
+ "
);
Ok(())
@@ -2851,7 +2851,7 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
assert_snapshot!(
pretty_format_batches(&sql_results).unwrap(),
- @r###"
+ @r"
+---------------+---------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+---------------------------------------------------------------------------------------------------------------------------+
@@ -2867,10 +2867,8 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
| physical_plan | CoalesceBatchesExec: target_batch_size=8192
|
| | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0
ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
| | CoalesceBatchesExec: target_batch_size=8192
|
- | | HashJoinExec: mode=Partitioned, join_type=Left,
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- | | CoalesceBatchesExec: target_batch_size=8192
|
- | | RepartitionExec: partitioning=Hash([a@0], 4),
input_partitions=1 |
- | | DataSourceExec: partitions=1,
partition_sizes=[1]
|
+ | | HashJoinExec: mode=CollectLeft, join_type=Left,
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
+ | | DataSourceExec: partitions=1,
partition_sizes=[1]
|
| | ProjectionExec: expr=[count(Int64(1))@1 as
count(*), a@0 as a, true as __always_true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0
as a], aggr=[count(Int64(1))] |
| | CoalesceBatchesExec: target_batch_size=8192
|
@@ -2880,7 +2878,7 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
| | DataSourceExec: partitions=1,
partition_sizes=[1] |
| |
|
+---------------+---------------------------------------------------------------------------------------------------------------------------+
- "###
+ "
);
// In the same SessionContext, AliasGenerator will increase subquery_alias
id by 1
@@ -2910,7 +2908,7 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
assert_snapshot!(
pretty_format_batches(&df_results).unwrap(),
- @r###"
+ @r"
+---------------+---------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan
|
+---------------+---------------------------------------------------------------------------------------------------------------------------+
@@ -2926,10 +2924,8 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
| physical_plan | CoalesceBatchesExec: target_batch_size=8192
|
| | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0
ELSE count(*)@2 END > 0, projection=[a@0, b@1] |
| | CoalesceBatchesExec: target_batch_size=8192
|
- | | HashJoinExec: mode=Partitioned, join_type=Left,
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
- | | CoalesceBatchesExec: target_batch_size=8192
|
- | | RepartitionExec: partitioning=Hash([a@0], 4),
input_partitions=1 |
- | | DataSourceExec: partitions=1,
partition_sizes=[1]
|
+ | | HashJoinExec: mode=CollectLeft, join_type=Left,
on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] |
+ | | DataSourceExec: partitions=1,
partition_sizes=[1]
|
| | ProjectionExec: expr=[count(*)@1 as count(*),
a@0 as a, true as __always_true] |
| | AggregateExec: mode=FinalPartitioned, gby=[a@0
as a], aggr=[count(*)] |
| | CoalesceBatchesExec: target_batch_size=8192
|
@@ -2939,7 +2935,7 @@ async fn test_count_wildcard_on_where_scalar_subquery()
-> Result<()> {
| | DataSourceExec: partitions=1,
partition_sizes=[1] |
| |
|
+---------------+---------------------------------------------------------------------------------------------------------------------------+
- "###
+ "
);
Ok(())
diff --git a/datafusion/core/tests/memory_limit/mod.rs
b/datafusion/core/tests/memory_limit/mod.rs
index 8f690edc54..6a0a797d4d 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -119,7 +119,7 @@ async fn join_by_key_multiple_partitions() {
TestCase::new()
.with_query("select t1.* from t t1 JOIN t t2 ON t1.service =
t2.service")
.with_expected_errors(vec![
- "Resources exhausted: Additional allocation failed with top memory
consumers (across reservations) as: HashJoinInput[0]",
+ "Resources exhausted: Additional allocation failed with top memory
consumers (across reservations) as: HashJoinInput",
])
.with_memory_limit(1_000)
.with_config(config)
@@ -156,7 +156,7 @@ async fn join_by_expression() {
#[tokio::test]
async fn cross_join() {
TestCase::new()
- .with_query("select t1.* from t t1 CROSS JOIN t t2")
+ .with_query("select t1.*, t2.* from t t1 CROSS JOIN t t2")
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed with top memory
consumers (across reservations) as: CrossJoinExec",
])
@@ -777,7 +777,7 @@ impl Scenario {
// Disabling physical optimizer rules to avoid sorts /
// repartitions (since RepartitionExec / SortExec also
// has a memory budget which we'll likely hit first)
- Some(vec![])
+ Some(vec![Arc::new(JoinSelection::new())])
}
Self::AccessLogStreaming => {
// Disable all physical optimizer rules except the
diff --git a/datafusion/physical-optimizer/src/join_selection.rs
b/datafusion/physical-optimizer/src/join_selection.rs
index 03bfb69788..5a772ccdd2 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -571,7 +571,8 @@ pub(crate) fn swap_join_according_to_unboundedness(
hash_join.swap_inputs(PartitionMode::CollectLeft)
}
(PartitionMode::Auto, _) => {
- internal_err!("Auto is not acceptable for unbounded input here.")
+ // Use `PartitionMode::Partitioned` as default if `Auto` is
selected.
+ hash_join.swap_inputs(PartitionMode::Partitioned)
}
}
}
diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt
b/datafusion/sqllogictest/test_files/explain_tree.slt
index aaea05be76..7a0e322eb8 100644
--- a/datafusion/sqllogictest/test_files/explain_tree.slt
+++ b/datafusion/sqllogictest/test_files/explain_tree.slt
@@ -345,63 +345,68 @@ FROM
----
physical_plan
01)┌───────────────────────────┐
-02)│ CoalesceBatchesExec │
+02)│ ProjectionExec │
03)│ -------------------- │
-04)│ target_batch_size: │
-05)│ 8192 │
-06)└─────────────┬─────────────┘
-07)┌─────────────┴─────────────┐
-08)│ HashJoinExec │
-09)│ -------------------- │
-10)│ on: ├───────────────────────────────────────────┐
-11)│ (int_col = int_col) │ │
-12)└─────────────┬─────────────┘ │
-13)┌─────────────┴─────────────┐
┌─────────────┴─────────────┐
-14)│ CoalesceBatchesExec │ │
CoalesceBatchesExec │
-15)│ -------------------- │ │
-------------------- │
-16)│ target_batch_size: │ │
target_batch_size: │
-17)│ 8192 │ │ 8192
│
-18)└─────────────┬─────────────┘
└─────────────┬─────────────┘
-19)┌─────────────┴─────────────┐
┌─────────────┴─────────────┐
-20)│ HashJoinExec │ │
RepartitionExec │
-21)│ -------------------- │ │
-------------------- │
-22)│ on: │ │
output_partition_count: │
-23)│ (int_col = int_col) ├──────────────┐ │ 1
│
-24)│ │ │ │
│
-25)│ │ │ │
partitioning_scheme: │
-26)│ │ │ │
Hash([int_col@0], 4) │
-27)└─────────────┬─────────────┘ │
└─────────────┬─────────────┘
-28)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-29)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││
DataSourceExec │
-30)│ -------------------- ││ -------------------- ││
-------------------- │
-31)│ target_batch_size: ││ target_batch_size: ││ bytes:
1560 │
-32)│ 8192 ││ 8192 ││ format:
memory │
-33)│ ││ ││ rows:
1 │
-34)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
-35)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-36)│ RepartitionExec ││ RepartitionExec │
-37)│ -------------------- ││ -------------------- │
-38)│ output_partition_count: ││ output_partition_count: │
-39)│ 4 ││ 4 │
-40)│ ││ │
-41)│ partitioning_scheme: ││ partitioning_scheme: │
-42)│ Hash([int_col@0], 4) ││ Hash([int_col@0], 4) │
-43)└─────────────┬─────────────┘└─────────────┬─────────────┘
-44)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-45)│ RepartitionExec ││ RepartitionExec │
-46)│ -------------------- ││ -------------------- │
-47)│ output_partition_count: ││ output_partition_count: │
-48)│ 1 ││ 1 │
-49)│ ││ │
-50)│ partitioning_scheme: ││ partitioning_scheme: │
-51)│ RoundRobinBatch(4) ││ RoundRobinBatch(4) │
-52)└─────────────┬─────────────┘└─────────────┬─────────────┘
-53)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
-54)│ DataSourceExec ││ DataSourceExec │
-55)│ -------------------- ││ -------------------- │
-56)│ files: 1 ││ files: 1 │
-57)│ format: csv ││ format: parquet │
-58)└───────────────────────────┘└───────────────────────────┘
+04)│ date_col: date_col │
+05)│ │
+06)│ string_col: │
+07)│ string_col │
+08)└─────────────┬─────────────┘
+09)┌─────────────┴─────────────┐
+10)│ CoalesceBatchesExec │
+11)│ -------------------- │
+12)│ target_batch_size: │
+13)│ 8192 │
+14)└─────────────┬─────────────┘
+15)┌─────────────┴─────────────┐
+16)│ HashJoinExec │
+17)│ -------------------- │
+18)│ on: ├──────────────┐
+19)│ (int_col = int_col) │ │
+20)└─────────────┬─────────────┘ │
+21)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+22)│ DataSourceExec ││ CoalesceBatchesExec │
+23)│ -------------------- ││ -------------------- │
+24)│ bytes: 1560 ││ target_batch_size: │
+25)│ format: memory ││ 8192 │
+26)│ rows: 1 ││ │
+27)└───────────────────────────┘└─────────────┬─────────────┘
+28)-----------------------------┌─────────────┴─────────────┐
+29)-----------------------------│ HashJoinExec │
+30)-----------------------------│ -------------------- │
+31)-----------------------------│ on: ├──────────────┐
+32)-----------------------------│ (int_col = int_col) │ │
+33)-----------------------------└─────────────┬─────────────┘ │
+34)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+35)-----------------------------│ CoalesceBatchesExec ││
CoalesceBatchesExec │
+36)-----------------------------│ -------------------- ││
-------------------- │
+37)-----------------------------│ target_batch_size: ││
target_batch_size: │
+38)-----------------------------│ 8192 ││ 8192
│
+39)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+40)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+41)-----------------------------│ RepartitionExec ││
RepartitionExec │
+42)-----------------------------│ -------------------- ││
-------------------- │
+43)-----------------------------│ output_partition_count: ││
output_partition_count: │
+44)-----------------------------│ 4 ││ 4
│
+45)-----------------------------│ ││
│
+46)-----------------------------│ partitioning_scheme: ││
partitioning_scheme: │
+47)-----------------------------│ Hash([int_col@0], 4) ││
Hash([int_col@0], 4) │
+48)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+49)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+50)-----------------------------│ RepartitionExec ││
RepartitionExec │
+51)-----------------------------│ -------------------- ││
-------------------- │
+52)-----------------------------│ output_partition_count: ││
output_partition_count: │
+53)-----------------------------│ 1 ││ 1
│
+54)-----------------------------│ ││
│
+55)-----------------------------│ partitioning_scheme: ││
partitioning_scheme: │
+56)-----------------------------│ RoundRobinBatch(4) ││
RoundRobinBatch(4) │
+57)-----------------------------└─────────────┬─────────────┘└─────────────┬─────────────┘
+58)-----------------------------┌─────────────┴─────────────┐┌─────────────┴─────────────┐
+59)-----------------------------│ DataSourceExec ││
DataSourceExec │
+60)-----------------------------│ -------------------- ││
-------------------- │
+61)-----------------------------│ files: 1 ││ files:
1 │
+62)-----------------------------│ format: csv ││ format:
parquet │
+63)-----------------------------└───────────────────────────┘└───────────────────────────┘
# Long Filter (demonstrate what happens with wrapping)
query TT
diff --git a/datafusion/sqllogictest/test_files/group_by.slt
b/datafusion/sqllogictest/test_files/group_by.slt
index d9ef12496e..4c4999a364 100644
--- a/datafusion/sqllogictest/test_files/group_by.slt
+++ b/datafusion/sqllogictest/test_files/group_by.slt
@@ -2023,7 +2023,7 @@ physical_plan
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1,
col2@4 as col2, col0@0 as col0, col1@1 as col1]
10)------------------CoalesceBatchesExec: target_batch_size=8192
-11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(col0@0, col0@0)]
+11)--------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(col0@0, col0@0)]
12)----------------------DataSourceExec: partitions=1, partition_sizes=[3]
13)----------------------DataSourceExec: partitions=1, partition_sizes=[3]
diff --git a/datafusion/sqllogictest/test_files/join.slt.part
b/datafusion/sqllogictest/test_files/join.slt.part
index 2bbf31dad9..972dd22653 100644
--- a/datafusion/sqllogictest/test_files/join.slt.part
+++ b/datafusion/sqllogictest/test_files/join.slt.part
@@ -1373,26 +1373,21 @@ logical_plan
06)----TableScan: f projection=[a]
07)--TableScan: s projection=[b]
physical_plan
-01)CoalesceBatchesExec: target_batch_size=8192
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col1@1, CAST(s.b AS
Int64)@1)], projection=[col0@0, col1@1, a@2, b@3]
-03)----ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@0 as a]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(f.a AS
Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
-06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------RepartitionExec: partitioning=Hash([CAST(f.a AS Int64)@1], 16),
input_partitions=1
-08)--------------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as
CAST(f.a AS Int64)]
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)----------CoalesceBatchesExec: target_batch_size=8192
-11)------------RepartitionExec: partitioning=Hash([col0@0], 16),
input_partitions=16
-12)--------------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0,
CAST(y@1 AS Int64) + 1 as col1]
-13)----------------RepartitionExec: partitioning=RoundRobinBatch(16),
input_partitions=1
-14)------------------CoalesceBatchesExec: target_batch_size=8192
-15)--------------------FilterExec: y@1 = x@0
-16)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
-17)----CoalesceBatchesExec: target_batch_size=8192
-18)------RepartitionExec: partitioning=Hash([CAST(s.b AS Int64)@1], 16),
input_partitions=1
-19)--------ProjectionExec: expr=[b@0 as b, CAST(b@0 AS Int64) as CAST(s.b AS
Int64)]
-20)----------DataSourceExec: partitions=1, partition_sizes=[1]
+01)ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@3 as a, b@0 as b]
+02)--CoalesceBatchesExec: target_batch_size=8192
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(s.b AS
Int64)@1, col1@1)], projection=[b@0, col0@2, col1@3, a@4]
+04)------ProjectionExec: expr=[b@0 as b, CAST(b@0 AS Int64) as CAST(s.b AS
Int64)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)------ProjectionExec: expr=[col0@1 as col0, col1@2 as col1, a@0 as a]
+07)--------CoalesceBatchesExec: target_batch_size=8192
+08)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(CAST(f.a AS
Int64)@1, col0@0)], projection=[a@0, col0@2, col1@3]
+09)------------ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as CAST(f.a
AS Int64)]
+10)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------------ProjectionExec: expr=[CAST(x@0 AS Int64) + 1 as col0, CAST(y@1
AS Int64) + 1 as col1]
+12)--------------RepartitionExec: partitioning=RoundRobinBatch(16),
input_partitions=1
+13)----------------CoalesceBatchesExec: target_batch_size=8192
+14)------------------FilterExec: y@1 = x@0
+15)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
drop table pairs;
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 50af06dc40..ca86dbfcc3 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1338,17 +1338,15 @@ logical_plan
04)------TableScan: join_t1 projection=[t1_id]
05)------TableScan: join_t2 projection=[t2_id]
physical_plan
-01)AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
+01)AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id], aggr=[]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)], projection=[t1_id@0]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+03)----RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
+04)------AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id], aggr=[]
+05)--------CoalesceBatchesExec: target_batch_size=2
+06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0,
t2_id@0)], projection=[t1_id@0]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-10)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-11)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+09)--------------DataSourceExec: partitions=1, partition_sizes=[1]
# Join on struct
query TT
@@ -1362,15 +1360,10 @@ logical_plan
03)--TableScan: join_t4 projection=[s4]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s3@0, s4@0)]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([s3@0], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([s4@0], 2), input_partitions=2
-09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-10)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s3@0, s4@0)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
query ??
select join_t3.s3, join_t4.s4
@@ -1403,17 +1396,15 @@ logical_plan
06)--------TableScan: join_t2 projection=[t2_id]
physical_plan
01)ProjectionExec: expr=[count(Int64(1))@1 as count(*)]
-02)--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id],
aggr=[count(Int64(1))]
+02)--AggregateExec: mode=FinalPartitioned, gby=[t1_id@0 as t1_id],
aggr=[count(Int64(1))]
03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)], projection=[t1_id@0]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
+05)--------AggregateExec: mode=Partial, gby=[t1_id@0 as t1_id],
aggr=[count(Int64(1))]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0,
t2_id@0)], projection=[t1_id@0]
08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t2_id@0], 2),
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+10)----------------DataSourceExec: partitions=1, partition_sizes=[1]
query TT
EXPLAIN
@@ -1434,17 +1425,15 @@ physical_plan
02)--AggregateExec: mode=Final, gby=[], aggr=[count(alias1)]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(alias1)]
-05)--------AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as alias1],
aggr=[]
+05)--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1],
aggr=[]
06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
t2_id@0)], projection=[t1_id@0]
-08)--------------CoalesceBatchesExec: target_batch_size=2
-09)----------------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-10)------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+07)------------RepartitionExec: partitioning=Hash([alias1@0], 2),
input_partitions=2
+08)--------------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(t1_id@0, t2_id@0)], projection=[t1_id@0]
11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
-12)--------------CoalesceBatchesExec: target_batch_size=2
-13)----------------RepartitionExec: partitioning=Hash([t2_id@0], 2),
input_partitions=2
-14)------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-15)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)--------------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+13)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.explain.logical_plan_only = true;
@@ -1530,17 +1519,14 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int,
CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id +
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1,
t1_int@2, t2_id@4, t2_name@5, t2_int@6]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3],
2), input_partitions=2
-06)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([CAST(join_t2.t2_id AS
Int64)@3], 2), input_partitions=2
-11)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name,
t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
-12)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-13)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id +
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)], projection=[t1_id@0, t1_name@1,
t1_int@2, t2_id@4, t2_name@5, t2_int@6]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name,
t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
+09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+10)----------DataSourceExec: partitions=1, partition_sizes=[1]
# Both side expr key inner join
@@ -1587,17 +1573,14 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as
t1_name]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id +
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2,
t1_name@3]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1],
2), input_partitions=2
-06)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as
join_t2.t2_id + UInt32(1)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2],
2), input_partitions=2
-11)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
-12)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-13)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id +
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)], projection=[t2_id@0, t1_id@2,
t1_name@3]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as
join_t2.t2_id + UInt32(1)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0
+ 12 as join_t1.t1_id + UInt32(12)]
+09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+10)----------DataSourceExec: partitions=1, partition_sizes=[1]
# Left side expr key inner join
@@ -1643,16 +1626,11 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as
t1_name]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0,
join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2],
2), input_partitions=2
-10)----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name,
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0,
join_t1.t1_id + UInt32(11)@2)], projection=[t2_id@0, t1_id@1, t1_name@2]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0
+ 11 as join_t1.t1_id + UInt32(11)]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
# Right side expr key inner join
@@ -1700,16 +1678,13 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as
t1_name]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id -
UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1],
2), input_partitions=2
-06)----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as
join_t2.t2_id - UInt32(11)]
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)------CoalesceBatchesExec: target_batch_size=2
-10)--------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-11)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id -
UInt32(11)@1, t1_id@0)], projection=[t2_id@0, t1_id@2, t1_name@3]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as
join_t2.t2_id - UInt32(11)]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+07)------------DataSourceExec: partitions=1, partition_sizes=[1]
+08)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+09)--------DataSourceExec: partitions=1, partition_sizes=[1]
# Select wildcard with expr key inner join
@@ -1751,16 +1726,11 @@ logical_plan
03)--TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0,
join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2,
t2_id@3, t2_name@4, t2_int@5]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@3],
2), input_partitions=2
-09)--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name,
t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
-10)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-11)------------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0,
join_t2.t2_id - UInt32(11)@3)], projection=[t1_id@0, t1_name@1, t1_int@2,
t2_id@3, t2_name@4, t2_int@5]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2
as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+05)------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+06)--------DataSourceExec: partitions=1, partition_sizes=[1]
#####
# Config teardown
@@ -2598,15 +2568,10 @@ logical_plan
05)----TableScan: test_timestamps_tz_table projection=[nanos, micros, millis,
secs, names]
physical_plan
01)CoalesceBatchesExec: target_batch_size=2
-02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(millis@2, millis@2)]
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------RepartitionExec: partitioning=Hash([millis@2], 2), input_partitions=2
-05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-06)----------DataSourceExec: partitions=1, partition_sizes=[1]
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------RepartitionExec: partitioning=Hash([millis@2], 2), input_partitions=2
-09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-10)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(millis@2, millis@2)]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+04)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+05)------DataSourceExec: partitions=1, partition_sizes=[1]
# left_join_using_2
query II
@@ -2869,16 +2834,11 @@ explain SELECT t1_id, t1_name FROM
left_semi_anti_join_table_t1 t1 WHERE t1_id I
physical_plan
01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
query IT rowsort
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN
(SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2910,16 +2870,11 @@ explain SELECT t1_id, t1_name FROM
left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
physical_plan
01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
query IT
SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOIN
left_semi_anti_join_table_t2 t2 ON (t1_id = t2_id) ORDER BY t1_id
@@ -3066,16 +3021,11 @@ explain SELECT t1_id, t1_name, t1_int FROM
right_semi_anti_join_table_t1 t1 WHER
physical_plan
01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@1 != t1_name@0
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@1 != t1_name@0
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
query ITI rowsort
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE
EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id =
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3088,16 +3038,11 @@ explain SELECT t1_id, t1_name, t1_int FROM
right_semi_anti_join_table_t2 t2 RIGH
physical_plan
01)SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@0 != t1_name@1
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-07)------------DataSourceExec: partitions=1, partition_sizes=[1]
-08)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t1_id@0], 2),
input_partitions=2
-11)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0,
t1_id@0)], filter=t2_name@0 != t1_name@1
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+05)------SortExec: expr=[t1_id@0 ASC NULLS LAST], preserve_partitioning=[true]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
query ITI rowsort
SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGHT SEMI
JOIN right_semi_anti_join_table_t1 t1 on (t2.t2_id = t1.t1_id and t2.t2_name <>
t1.t1_name) ORDER BY t1_id
@@ -3668,18 +3613,14 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[c@2 as c, d@3 as d, e@0 as e, f@1 as f]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Full, on=[(e@0, c@0)]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([e@0], 2), input_partitions=1
-06)----------ProjectionExec: expr=[1 as e, 3 as f]
-07)------------PlaceholderRowExec
-08)------CoalesceBatchesExec: target_batch_size=2
-09)--------RepartitionExec: partitioning=Hash([c@0], 2), input_partitions=2
-10)----------UnionExec
-11)------------ProjectionExec: expr=[1 as c, 2 as d]
-12)--------------PlaceholderRowExec
-13)------------ProjectionExec: expr=[1 as c, 3 as d]
-14)--------------PlaceholderRowExec
+03)----HashJoinExec: mode=CollectLeft, join_type=Full, on=[(e@0, c@0)]
+04)------ProjectionExec: expr=[1 as e, 3 as f]
+05)--------PlaceholderRowExec
+06)------UnionExec
+07)--------ProjectionExec: expr=[1 as c, 2 as d]
+08)----------PlaceholderRowExec
+09)--------ProjectionExec: expr=[1 as c, 3 as d]
+10)----------PlaceholderRowExec
query IIII rowsort
SELECT * FROM (
@@ -4484,18 +4425,15 @@ logical_plan
physical_plan
01)SortPreservingMergeExec: [c@2 DESC]
02)--CoalesceBatchesExec: target_batch_size=3
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
-04)------CoalesceBatchesExec: target_batch_size=3
-05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-06)----------CoalesceBatchesExec: target_batch_size=3
-07)------------FilterExec: b@1 > 3, projection=[a@0]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
-11)--------CoalesceBatchesExec: target_batch_size=3
-12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-13)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(a@0, a@0)]
+04)------CoalescePartitionsExec
+05)--------CoalesceBatchesExec: target_batch_size=3
+06)----------FilterExec: b@1 > 3, projection=[a@0]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)------SortExec: expr=[c@2 DESC], preserve_partitioning=[true]
+10)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+11)----------DataSourceExec: partitions=1, partition_sizes=[1]
query TT
explain select * from test where a in (select a from test where b > 3) order
by c desc nulls last;
@@ -4511,18 +4449,15 @@ logical_plan
physical_plan
01)SortPreservingMergeExec: [c@2 DESC NULLS LAST]
02)--CoalesceBatchesExec: target_batch_size=3
-03)----HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(a@0, a@0)]
-04)------CoalesceBatchesExec: target_batch_size=3
-05)--------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-06)----------CoalesceBatchesExec: target_batch_size=3
-07)------------FilterExec: b@1 > 3, projection=[a@0]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
-11)--------CoalesceBatchesExec: target_batch_size=3
-12)----------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
-13)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(a@0, a@0)]
+04)------CoalescePartitionsExec
+05)--------CoalesceBatchesExec: target_batch_size=3
+06)----------FilterExec: b@1 > 3, projection=[a@0]
+07)------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
+08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+09)------SortExec: expr=[c@2 DESC NULLS LAST], preserve_partitioning=[true]
+10)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+11)----------DataSourceExec: partitions=1, partition_sizes=[1]
query III
select * from test where a in (select a from test where b > 3) order by c desc
nulls first;
diff --git a/datafusion/sqllogictest/test_files/predicates.slt
b/datafusion/sqllogictest/test_files/predicates.slt
index 8bf1caa003..b263e39f3b 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -760,23 +760,22 @@ logical_plan
10)------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
physical_plan
01)AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey],
aggr=[sum(lineitem.l_extendedprice), avg(lineitem.l_discount), count(DISTINCT
partsupp.ps_suppkey)]
-02)--CoalesceBatchesExec: target_batch_size=8192
-03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2,
ps_partkey@0)], projection=[l_extendedprice@0, l_discount@1, p_partkey@2,
ps_suppkey@4]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0,
p_partkey@0)], projection=[l_extendedprice@1, l_discount@2, p_partkey@3]
-06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------RepartitionExec: partitioning=Hash([l_partkey@0], 4),
input_partitions=4
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-09)----------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]},
projection=[l_partkey, l_extendedprice, l_discount], file_type=csv,
has_header=true
-10)----------CoalesceBatchesExec: target_batch_size=8192
-11)------------RepartitionExec: partitioning=Hash([p_partkey@0], 4),
input_partitions=4
-12)--------------CoalesceBatchesExec: target_batch_size=8192
-13)----------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23,
projection=[p_partkey@0]
-14)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-15)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]},
projection=[p_partkey, p_brand], file_type=csv, has_header=true
-16)------CoalesceBatchesExec: target_batch_size=8192
-17)--------RepartitionExec: partitioning=Hash([ps_partkey@0], 4),
input_partitions=1
-18)----------DataSourceExec: partitions=1, partition_sizes=[1]
+02)--ProjectionExec: expr=[l_extendedprice@1 as l_extendedprice, l_discount@2
as l_discount, p_partkey@3 as p_partkey, ps_suppkey@0 as ps_suppkey]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ps_partkey@0,
p_partkey@2)], projection=[ps_suppkey@1, l_extendedprice@2, l_discount@3,
p_partkey@4]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--------CoalesceBatchesExec: target_batch_size=8192
+07)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_partkey@0, p_partkey@0)], projection=[l_extendedprice@1, l_discount@2,
p_partkey@3]
+08)------------CoalesceBatchesExec: target_batch_size=8192
+09)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4),
input_partitions=4
+10)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/lineitem.csv]]},
projection=[l_partkey, l_extendedprice, l_discount], file_type=csv,
has_header=true
+12)------------CoalesceBatchesExec: target_batch_size=8192
+13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4),
input_partitions=4
+14)----------------CoalesceBatchesExec: target_batch_size=8192
+15)------------------FilterExec: p_brand@1 = Brand#12 OR p_brand@1 = Brand#23,
projection=[p_partkey@0]
+16)--------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+17)----------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/part.csv]]},
projection=[p_partkey, p_brand], file_type=csv, has_header=true
# Inlist simplification
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 4c1565c7f0..aaccaaa43c 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -202,18 +202,17 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as
t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0
as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum
from t1
@@ -238,18 +237,17 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int * Float64(1)) +
Int64(1)@0 as t2_sum]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int * Float64(1)) + Int64(1)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int * Float64(1))@1 + 1 as
sum(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int * Float64(1))]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int * Float64(1))]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int * Float64(1)) + Int64(1)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int * Float64(1))@1 + 1 as
sum(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int * Float64(1))]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int * Float64(1))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
query IR rowsort
SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id)
as t2_sum from t1
@@ -274,18 +272,17 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as
t2_id]
-05)--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-06)----------CoalesceBatchesExec: target_batch_size=2
-07)------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-08)--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-13)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-14)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0
as t2_id]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+07)------------CoalesceBatchesExec: target_batch_size=2
+08)--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------DataSourceExec: partitions=1, partition_sizes=[1]
+12)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)--------DataSourceExec: partitions=1, partition_sizes=[1]
query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id group by
t2_id, 'a') as t2_sum from t1
@@ -311,20 +308,19 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[t1_id@1 as t1_id, sum(t2.t2_int)@0 as t2_sum]
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
-04)------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0 as
t2_id]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------FilterExec: sum(t2.t2_int)@1 < 3
-07)------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-08)--------------CoalesceBatchesExec: target_batch_size=2
-09)----------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-12)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
-13)------CoalesceBatchesExec: target_batch_size=2
-14)--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
-15)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-16)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=Right, on=[(t2_id@1,
t1_id@0)], projection=[sum(t2.t2_int)@0, t1_id@2]
+04)------CoalescePartitionsExec
+05)--------ProjectionExec: expr=[sum(t2.t2_int)@1 as sum(t2.t2_int), t2_id@0
as t2_id]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------FilterExec: sum(t2.t2_int)@1 < 3
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[sum(t2.t2_int)]
+12)----------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+13)------------------------DataSourceExec: partitions=1, partition_sizes=[1]
+14)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+15)--------DataSourceExec: partitions=1, partition_sizes=[1]
query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id having
sum(t2_int) < 3) as t2_sum from t1
@@ -1156,15 +1152,10 @@ physical_plan
01)CoalesceBatchesExec: target_batch_size=2
02)--FilterExec: t1_id@0 > 40 OR NOT mark@3, projection=[t1_id@0, t1_name@1,
t1_int@2]
03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=Partitioned, join_type=LeftMark, on=[(t1_id@0,
t2_id@0)]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([t1_id@0], 4),
input_partitions=4
-07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-08)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-09)--------CoalesceBatchesExec: target_batch_size=2
-10)----------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
-11)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-12)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(t1_id@0,
t2_id@0)]
+05)--------DataSourceExec: partitions=1, partition_sizes=[1]
+06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)----------DataSourceExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.explain.logical_plan_only = true;
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
index e2bd651c4a..0636a033b2 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q15.slt.part
@@ -74,33 +74,29 @@ physical_plan
01)SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]
02)--SortExec: expr=[s_suppkey@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----CoalesceBatchesExec: target_batch_size=8192
-04)------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(total_revenue@4, max(revenue0.total_revenue)@0)], projection=[s_suppkey@0,
s_name@1, s_address@2, s_phone@3, total_revenue@4]
-05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------RepartitionExec: partitioning=Hash([total_revenue@4], 4),
input_partitions=4
-07)------------CoalesceBatchesExec: target_batch_size=8192
-08)--------------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(s_suppkey@0, supplier_no@0)], projection=[s_suppkey@0, s_name@1,
s_address@2, s_phone@3, total_revenue@5]
-09)----------------CoalesceBatchesExec: target_batch_size=8192
-10)------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4),
input_partitions=4
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-12)----------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_name, s_address, s_phone], file_type=csv,
has_header=false
-13)----------------ProjectionExec: expr=[l_suppkey@0 as supplier_no,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as
total_revenue]
-14)------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
-15)--------------------CoalesceBatchesExec: target_batch_size=8192
-16)----------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4),
input_partitions=4
-17)------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
-18)--------------------------CoalesceBatchesExec: target_batch_size=8192
-19)----------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1,
l_discount@2]
-20)------------------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_di [...]
-21)--------CoalesceBatchesExec: target_batch_size=8192
-22)----------RepartitionExec:
partitioning=Hash([max(revenue0.total_revenue)@0], 4), input_partitions=1
-23)------------AggregateExec: mode=Final, gby=[],
aggr=[max(revenue0.total_revenue)]
-24)--------------CoalescePartitionsExec
-25)----------------AggregateExec: mode=Partial, gby=[],
aggr=[max(revenue0.total_revenue)]
-26)------------------ProjectionExec: expr=[sum(lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount)@1 as total_revenue]
-27)--------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0
as l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
-28)----------------------CoalesceBatchesExec: target_batch_size=8192
-29)------------------------RepartitionExec: partitioning=Hash([l_suppkey@0],
4), input_partitions=4
-30)--------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
-31)----------------------------CoalesceBatchesExec: target_batch_size=8192
-32)------------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1,
l_discount@2]
-33)--------------------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_ [...]
+04)------HashJoinExec: mode=CollectLeft, join_type=Inner,
on=[(max(revenue0.total_revenue)@0, total_revenue@4)], projection=[s_suppkey@1,
s_name@2, s_address@3, s_phone@4, total_revenue@5]
+05)--------AggregateExec: mode=Final, gby=[],
aggr=[max(revenue0.total_revenue)]
+06)----------CoalescePartitionsExec
+07)------------AggregateExec: mode=Partial, gby=[],
aggr=[max(revenue0.total_revenue)]
+08)--------------ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1)
- lineitem.l_discount)@1 as total_revenue]
+09)----------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+10)------------------CoalesceBatchesExec: target_batch_size=8192
+11)--------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4),
input_partitions=4
+12)----------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+13)------------------------CoalesceBatchesExec: target_batch_size=8192
+14)--------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1,
l_discount@2]
+15)----------------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_disc [...]
+16)--------CoalesceBatchesExec: target_batch_size=8192
+17)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(s_suppkey@0, supplier_no@0)], projection=[s_suppkey@0, s_name@1,
s_address@2, s_phone@3, total_revenue@5]
+18)------------CoalesceBatchesExec: target_batch_size=8192
+19)--------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4),
input_partitions=4
+20)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+21)------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]},
projection=[s_suppkey, s_name, s_address, s_phone], file_type=csv,
has_header=false
+22)------------ProjectionExec: expr=[l_suppkey@0 as supplier_no,
sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@1 as
total_revenue]
+23)--------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+24)----------------CoalesceBatchesExec: target_batch_size=8192
+25)------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 4),
input_partitions=4
+26)--------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[sum(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+27)----------------------CoalesceBatchesExec: target_batch_size=8192
+28)------------------------FilterExec: l_shipdate@3 >= 1996-01-01 AND
l_shipdate@3 < 1996-04-01, projection=[l_suppkey@0, l_extendedprice@1,
l_discount@2]
+29)--------------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_discou [...]
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index 9e9a40c510..356f1598bc 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -308,34 +308,30 @@ logical_plan
physical_plan
01)UnionExec
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(id@0,
CAST(t2.id AS Int32)@2), (name@1, name@1)]
-04)------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as
name], aggr=[]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name],
aggr=[]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------CoalesceBatchesExec: target_batch_size=2
-11)--------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2,
name@1], 4), input_partitions=4
-12)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS
Int32) as CAST(t2.id AS Int32)]
-13)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-14)--------------DataSourceExec: partitions=1, partition_sizes=[1]
-15)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
-16)----CoalesceBatchesExec: target_batch_size=2
-17)------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id
AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
-18)--------CoalesceBatchesExec: target_batch_size=2
-19)----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2,
name@1], 4), input_partitions=4
-20)------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS
Int32) as CAST(t2.id AS Int32)]
-21)--------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1
as name], aggr=[]
-22)----------------CoalesceBatchesExec: target_batch_size=2
-23)------------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
-24)--------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as
name], aggr=[]
-25)----------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-26)------------------------DataSourceExec: partitions=1, partition_sizes=[1]
-27)--------CoalesceBatchesExec: target_batch_size=2
-28)----------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
-29)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-30)--------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0,
CAST(t2.id AS Int32)@2), (name@1, name@1)]
+04)------CoalescePartitionsExec
+05)--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as
name], aggr=[]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
+08)--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as
name], aggr=[]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS Int32)
as CAST(t2.id AS Int32)]
+12)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+13)----------DataSourceExec: partitions=1, partition_sizes=[1]
+14)--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
+15)----CoalesceBatchesExec: target_batch_size=2
+16)------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(CAST(t2.id
AS Int32)@2, id@0), (name@1, name@1)], projection=[id@0, name@1]
+17)--------CoalescePartitionsExec
+18)----------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS
Int32) as CAST(t2.id AS Int32)]
+19)------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1
as name], aggr=[]
+20)--------------CoalesceBatchesExec: target_batch_size=2
+21)----------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
+22)------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as
name], aggr=[]
+23)--------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+24)----------------------DataSourceExec: partitions=1, partition_sizes=[1]
+25)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+26)----------DataSourceExec: partitions=1, partition_sizes=[1]
query IT rowsort
@@ -380,31 +376,29 @@ logical_plan
08)------TableScan: t2 projection=[name]
09)----TableScan: t1 projection=[name]
physical_plan
-01)InterleaveExec
+01)UnionExec
02)--CoalesceBatchesExec: target_batch_size=2
-03)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0,
name@0)]
-04)------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([name@0], 4),
input_partitions=4
-07)------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-08)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-09)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-10)------CoalesceBatchesExec: target_batch_size=2
-11)--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-12)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-13)------------DataSourceExec: partitions=1, partition_sizes=[1]
-14)--CoalesceBatchesExec: target_batch_size=2
-15)----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0,
name@0)]
-16)------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
-17)--------CoalesceBatchesExec: target_batch_size=2
-18)----------RepartitionExec: partitioning=Hash([name@0], 4),
input_partitions=4
-19)------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
-20)--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-21)----------------DataSourceExec: partitions=1, partition_sizes=[1]
-22)------CoalesceBatchesExec: target_batch_size=2
-23)--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-24)----------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-25)------------DataSourceExec: partitions=1, partition_sizes=[1]
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0,
name@0)]
+04)------CoalescePartitionsExec
+05)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+06)----------CoalesceBatchesExec: target_batch_size=2
+07)------------RepartitionExec: partitioning=Hash([name@0], 4),
input_partitions=4
+08)--------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+09)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+10)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+11)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+12)--------DataSourceExec: partitions=1, partition_sizes=[1]
+13)--CoalesceBatchesExec: target_batch_size=2
+14)----HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(name@0,
name@0)]
+15)------CoalescePartitionsExec
+16)--------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
+17)----------CoalesceBatchesExec: target_batch_size=2
+18)------------RepartitionExec: partitioning=Hash([name@0], 4),
input_partitions=4
+19)--------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+20)----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+21)------------------DataSourceExec: partitions=1, partition_sizes=[1]
+22)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+23)--------DataSourceExec: partitions=1, partition_sizes=[1]
# union_upcast_types
query TT
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]