blaginin commented on code in PR #18185:
URL: https://github.com/apache/datafusion/pull/18185#discussion_r2447705927
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -1386,235 +1433,246 @@ fn multi_smj_joins() -> Result<()> {
)];
let top_join =
sort_merge_join_exec(join.clone(), parquet_exec(), &top_join_on,
&join_type);
- let top_join_plan =
- format!("SortMergeJoin: join_type={join_type}, on=[(a@0, c@2)]");
-
- let expected = match join_type {
- // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
- JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
- vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4
SortExecs
- // Since ordering of the left child is not preserved after
SortMergeJoin
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional SortExec after SortMergeJoin in
contrast the test cases
- // when mode is Inner, Left, LeftSemi, LeftAnti
- // Similarly, since partitioning of the left side is not preserved
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional Hash Repartition after
SortMergeJoin in contrast the test
- // cases when mode is Inner, Left, LeftSemi, LeftAnti
- _ => vec![
- top_join_plan.as_str(),
- // Below 2 operators are differences introduced, when join
mode is changed
- " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- &join_plan_indent6,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC], preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- };
- // TODO(wiedld): show different test result if enforce sorting first.
- test_config.run(&expected, top_join.clone(), &DISTRIB_DISTRIB_SORT)?;
-
- let expected_first_sort_enforcement = match join_type {
- // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3
SortExecs
- JoinType::Inner | JoinType::Left | JoinType::LeftSemi |
JoinType::LeftAnti =>
- vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2
as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 8 RepartitionExecs (4 hash, 8 round-robin), 4
SortExecs
- // Since ordering of the left child is not preserved after
SortMergeJoin
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional SortExec after SortMergeJoin in
contrast the test cases
- // when mode is Inner, Left, LeftSemi, LeftAnti
- // Similarly, since partitioning of the left side is not preserved
- // when mode is Right, RightSemi, RightAnti, Full
- // - We need to add one additional Hash Repartition and Roundrobin
repartition after
- // SortMergeJoin in contrast the test cases when mode is Inner,
Left, LeftSemi, LeftAnti
- _ => vec![
- top_join_plan.as_str(),
- // Below 4 operators are differences introduced, when join
mode is changed
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " CoalescePartitionsExec",
- &join_plan_indent10,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- };
- // TODO(wiedld): show different test result if enforce distribution
first.
- test_config.run(
- &expected_first_sort_enforcement,
- top_join,
- &SORT_DISTRIB_DISTRIB,
- )?;
- match join_type {
- JoinType::Inner | JoinType::Left | JoinType::Right |
JoinType::Full => {
- // This time we use (b1 == c) for top join
- // Join on (b1 == c)
- let top_join_on = vec![(
- Arc::new(Column::new_with_schema("b1",
&join.schema()).unwrap()) as _,
- Arc::new(Column::new_with_schema("c", &schema()).unwrap())
as _,
- )];
- let top_join =
- sort_merge_join_exec(join, parquet_exec(), &top_join_on,
&join_type);
- let top_join_plan =
- format!("SortMergeJoin: join_type={join_type}, on=[(b1@6,
c@2)]");
-
- let expected = match join_type {
- // Should include 6 RepartitionExecs(3 hash, 3
round-robin) and 3 SortExecs
- JoinType::Inner | JoinType::Right => vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 7 RepartitionExecs (4 hash, 3
round-robin) and 4 SortExecs
- JoinType::Left | JoinType::Full => vec![
- top_join_plan.as_str(),
- " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10",
- &join_plan_indent6,
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([b1@1],
10), input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " ProjectionExec: expr=[a@0 as a1, b@1 as
b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[true]",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // this match arm cannot be reached
- _ => unreachable!()
- };
- // TODO(wiedld): show different test result if enforce sorting
first.
- test_config.run(&expected, top_join.clone(),
&DISTRIB_DISTRIB_SORT)?;
-
- let expected_first_sort_enforcement = match join_type {
- // Should include 6 RepartitionExecs (3 of them preserves
order) and 3 SortExecs
- JoinType::Inner | JoinType::Right => vec![
- top_join_plan.as_str(),
- &join_plan_indent2,
- " RepartitionExec: partitioning=Hash([a@0], 10),
input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([b1@1], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1, b@1 as b1,
c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1 group:
[[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // Should include 8 RepartitionExecs (4 of them preserves
order) and 4 SortExecs
- JoinType::Left | JoinType::Full => vec![
- top_join_plan.as_str(),
- " RepartitionExec: partitioning=Hash([b1@6], 10),
input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@6 ASC],
preserve_partitioning=[false]",
- " CoalescePartitionsExec",
- &join_plan_indent10,
- " RepartitionExec: partitioning=Hash([a@0],
10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[a@0 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec:
partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true,
sort_exprs=b1@1 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[b1@1 ASC],
preserve_partitioning=[false]",
- " ProjectionExec: expr=[a@0 as a1,
b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
- " DataSourceExec: file_groups={1
group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
- " RepartitionExec: partitioning=Hash([c@2], 10),
input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC",
- " RepartitionExec:
partitioning=RoundRobinBatch(10), input_partitions=1",
- " SortExec: expr=[c@2 ASC],
preserve_partitioning=[false]",
- " DataSourceExec: file_groups={1 group: [[x]]},
projection=[a, b, c, d, e], file_type=parquet",
- ],
- // this match arm cannot be reached
- _ => unreachable!()
- };
+ let mut settings = Settings::clone_current();
+ settings.add_filter(&format!("join_type={join_type}"),
"join_type=...");
+
+ #[rustfmt::skip]
Review Comment:
https://github.com/user-attachments/assets/5f11a60b-31b0-4d9c-87ca-3c6aa32e8144
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]