[
https://issues.apache.org/jira/browse/SPARK-57039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kent Yao updated SPARK-57039:
-----------------------------
Description:
h2. Background
{{ConvertToLocalRelation}} folds {{Project}} / {{Filter}} / {{Limit}} over
{{LocalRelation}}, but not {{InnerJoin(LocalRelation(single-row), other)}} or
{{InnerJoin(OneRowRelation(), other)}} — even when the single-row side has no
streaming / Unevaluable hazards and the user gave no join hint.
The redundant join node:
* Forces broadcast/shuffle planning for what is logically a constant attach.
* Blocks downstream constant-folding and column pruning of the single-row side.
* Mirrors the TODO at {{DecorrelateInnerQuery.scala:435}}: {{// TODO add a more general rule to optimize join with OneRowRelation}}.
h2. Proposal
Two rules in {{Optimizer.scala}}:
# {{ConvertToLocalRelation}} case 5/6 (left/right symmetric arms): fold {{Inner Join}} where one side is {{LocalRelation(out, [row], false, _)}} into {{Project(literals ++ other.output, other)}}, optionally wrapped in {{Filter(cond, ...)}}. The literals preserve the original {{ExprId}} via {{Alias(Literal.create(...))(exprId)}}.
# New {{FoldInnerJoinWithOneRowRelation}} (tree-pattern {{INNER_LIKE_JOIN}}): fold {{Inner Join}} where one side is {{OneRowRelation()}} into {{other}} or {{Filter(cond, other)}}. {{OneRowRelation}} does not publish the {{LOCAL_RELATION}} tree pattern, so a single combined rule using {{transformWithPruning(_.containsPattern(LOCAL_RELATION))}} would miss it; hence two rules.
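The shape of both rewrites can be sketched on a toy plan model. This is only an illustration under stated assumptions: {{Attr}}, {{Lit}}, {{Ref}}, {{LocalRel}}, {{OneRow}}, {{Table}} and {{FoldSketch}} are hypothetical stand-ins, not Catalyst classes, and the column order for the right-side arm ({{other.output ++ literals}}) is assumed rather than taken from the patch.

```scala
// Toy stand-ins for Catalyst nodes; none of these are Spark's real classes.
final case class Attr(name: String, exprId: Long)

sealed trait Expr
final case class Lit(value: Any) extends Expr
final case class Ref(attr: Attr) extends Expr
final case class Alias(child: Expr, name: String, exprId: Long) extends Expr
final case class EqualTo(left: Expr, right: Expr) extends Expr

sealed trait Plan { def output: Seq[Attr] }
final case class Table(output: Seq[Attr]) extends Plan
final case class LocalRel(output: Seq[Attr], rows: Seq[Seq[Any]]) extends Plan
case object OneRow extends Plan { val output: Seq[Attr] = Nil }
final case class Join(left: Plan, right: Plan, cond: Option[Expr]) extends Plan {
  def output: Seq[Attr] = left.output ++ right.output
}
final case class Project(exprs: Seq[Expr], child: Plan) extends Plan {
  // Only aliases and plain references are named outputs in this toy model.
  def output: Seq[Attr] = exprs.collect {
    case Alias(_, n, id) => Attr(n, id)
    case Ref(a)          => a
  }
}
final case class Filter(cond: Expr, child: Plan) extends Plan {
  def output: Seq[Attr] = child.output
}

object FoldSketch {
  // Wrap the folded plan in a Filter only when the join had a condition.
  private def withCond(cond: Option[Expr], plan: Plan): Plan =
    cond.fold(plan)(c => Filter(c, plan))

  // One aliased literal per column, reusing the column's original exprId
  // so that references above the join still resolve.
  private def literals(rel: LocalRel): Seq[Expr] =
    rel.output.zip(rel.rows.head).map { case (a, v) => Alias(Lit(v), a.name, a.exprId) }

  def fold(plan: Plan): Plan = plan match {
    case Join(l: LocalRel, other, cond) if l.rows.length == 1 =>
      withCond(cond, Project(literals(l) ++ other.output.map(Ref(_)), other))
    case Join(other, r: LocalRel, cond) if r.rows.length == 1 =>
      // Assumed ordering: keep the join's left-to-right output order.
      withCond(cond, Project(other.output.map(Ref(_)) ++ literals(r), other))
    case Join(OneRow, other, cond) => withCond(cond, other)
    case Join(other, OneRow, cond) => withCond(cond, other)
    case p => p // anything else (e.g. a multi-row LocalRel) is left untouched
  }
}

object FoldDemo {
  def main(args: Array[String]): Unit = {
    val t    = Table(Seq(Attr("id", 1L), Attr("name", 2L)))
    val one  = LocalRel(Seq(Attr("k", 10L)), Seq(Seq(42)))
    val cond = EqualTo(Ref(Attr("k", 10L)), Ref(Attr("id", 1L)))
    println(FoldSketch.fold(Join(one, t, Some(cond))))
  }
}
```

The {{Filter}} sits above the {{Project}} because the condition may reference the single-row side's columns, which exist only once the {{Project}} has produced them as literals.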
Four narrowings:
|| Dimension || Accepted || Rejected || Reason ||
| JoinType | {{Inner}} | Outer / Semi / Anti / Cross | Outer joins need null-padding; Semi/Anti joins need existence semantics; Cross has no condition. |
| JoinHint | {{NONE}} | BROADCAST / SHUFFLE_* | User hints take priority over the plan rewrite. |
| Row count | {{data.length == 1}} | 0 (covered by {{PropagateEmptyRelation}}) / >1 | Inlining >1 rows as literals requires Cartesian materialization. |
| Condition | {{!cond.exists(hasUnevaluableExpr)}} && {{!other.isStreaming}} | ScalarSubquery / DynamicPruning / streaming | Unevaluable expressions can't safely move into a Filter; streaming has state-store ordering constraints. |
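Read together, the four narrowings amount to a single conjunctive guard. The sketch below expresses that guard on a simplified record; {{Candidate}}, {{Narrowings.canFold}} and the trimmed {{JoinType}} / {{Hint}} enumerations are hypothetical names introduced here, not the types used in the patch.

```scala
// Simplified, hypothetical enumerations; Spark's real JoinType and JoinHint are richer.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object LeftSemi extends JoinType
case object Cross extends JoinType

sealed trait Hint
case object NoHint extends Hint
case object Broadcast extends Hint

// The facts the guard needs about one candidate join.
final case class Candidate(
    joinType: JoinType,
    hint: Hint,
    singleRowSideRows: Int,      // row count of the LocalRelation side
    condHasUnevaluable: Boolean, // e.g. a subquery or pruning expression in the condition
    otherIsStreaming: Boolean)

object Narrowings {
  // All four dimensions must pass; failing any one leaves the join untouched.
  def canFold(c: Candidate): Boolean =
    c.joinType == Inner &&
      c.hint == NoHint &&
      c.singleRowSideRows == 1 &&
      !c.condHasUnevaluable &&
      !c.otherIsStreaming
}
```

For instance, {{Narrowings.canFold(Candidate(Inner, NoHint, 1, false, false))}} is true, while changing any single field to a rejected value makes it false.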
h2. Tests
* New {{FoldInnerJoinWithOneRowRelationSuite}}: 9 tests (OneRow × table
left/right, no-cond, LeftOuter negative, ArrayType/MapType/nested StructType,
nested struct-of-array, Unevaluable-condition negative).
* {{ConvertToLocalRelationSuite}}: +4 tests, including a strong-assertion ExprId preservation test ({{collectFirst \{ case _: Join \}.isEmpty && output.length == 4}}).
* Plan-stability: TPCDSV1_4 + V1_4Stats + V2_7 + V2_7Stats + Modified +
ModifiedStats + TPCH = 322/322 PASS, 0 plan-diff.
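The strong assertion pairs two checks: no {{Join}} survives anywhere in the optimized plan, and the output width is unchanged. A minimal sketch of that pairing on a toy tree follows; {{Node}}, {{Scan}} and {{StrongAssertion.holds}} are hypothetical and only mimic the shape of the real test, which runs against Catalyst plans.

```scala
// Toy plan nodes for illustration only; these are not Spark's Catalyst classes.
sealed trait Node {
  def children: Seq[Node]
  def output: Seq[String]

  // Pre-order search for the first node matched by `pf`.
  def collectFirst[B](pf: PartialFunction[Node, B]): Option[B] =
    pf.lift(this).orElse(
      children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(b) => b })
}
final case class Scan(output: Seq[String]) extends Node {
  def children: Seq[Node] = Nil
}
final case class Join(left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def output: Seq[String] = left.output ++ right.output
}
final case class Project(output: Seq[String], child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
}

object StrongAssertion {
  // Both halves must hold: the Join is gone and no column was lost.
  def holds(optimized: Node, expectedWidth: Int): Boolean =
    optimized.collectFirst { case j: Join => j }.isEmpty &&
      optimized.output.length == expectedWidth
}
```

Checking only the absence of a {{Join}} would also pass for a rewrite that dropped the single-row side's columns, which is why the width check is part of the same assertion.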
h2. Backward compatibility
No user-facing change. Query results are bit-identical; only the optimized logical plan shape changes (one less Join, one more Project/Filter). No new SQLConf is added, because the four narrowings restrict the rewrite to cases where it is strictly safe.
was:
h2. Background
Currently {{ConvertToLocalRelation}} (Optimizer.scala) folds {{Project}} /
{{Filter}} / {{Limit}} over {{LocalRelation}}, but does *not* fold
{{InnerJoin(LocalRelation(single-row), other, Inner)}} or
{{InnerJoin(OneRowRelation(), other, Inner)}} — even when the single-row side
has no streaming / Unevaluable hazards and the user gave no join hint.
This leaves a redundant join node that:
* Forces broadcast/shuffle planning for what is logically a constant attach.
* Blocks downstream constant-folding and column pruning of the single-row
side's projected attributes.
* Is structurally identical to what {{DecorrelateInnerQuery.scala:435}} flags
as a TODO: {{// TODO add a more general rule to optimize join with
OneRowRelation}}.
h2. Proposal
Add two complementary rules in {{Optimizer.scala}}:
# *{{ConvertToLocalRelation}} case 5/6* (left/right symmetric arms) — fold
{{Inner Join}} where one side is {{LocalRelation(out, [row], false, _)}}
(single-row, ≥1 column) into {{Project(literals ++ other.output, other)}},
optionally wrapped in {{Filter(cond, ...)}}. Literal values preserve the
original {{ExprId}} via {{Alias(Literal.create(...))(exprId)}}.
# *{{FoldInnerJoinWithOneRowRelation}}* (new independent rule, tree-pattern
{{INNER_LIKE_JOIN}}) — fold {{Inner Join}} where one side is
{{OneRowRelation()}} (single-row, 0 columns) into {{other}} or {{Filter(cond,
other)}}. {{OneRowRelation}} does *not* publish {{LOCAL_RELATION}}
tree-pattern, so a single combined rule using
{{transformWithPruning(_.containsPattern(LOCAL_RELATION))}} would silently miss
it — hence two rules.
Four narrowings (all strictly conservative):
|| Dimension || Accepted || Rejected || Reason ||
| JoinType | {{Inner}} only | LeftOuter / RightOuter / FullOuter / LeftSemi /
LeftAnti / Cross | Outer needs null-pad; Semi/Anti needs existence semantics;
Cross has no condition (already handled). |
| JoinHint | {{JoinHint.NONE}} only | BROADCAST / SHUFFLE_HASH / SHUFFLE_MERGE
/ SHUFFLE_REPLICATE_NL | User hints take priority over plan rewrite. |
| Row count | {{data.length == 1}} | 0 (covered by {{PropagateEmptyRelation}})
/ >1 | Inlining >1 rows as Literals would LoC-explode and require Cartesian
materialization. |
| Condition | {{!cond.exists(hasUnevaluableExpr)}} && {{!other.isStreaming}} |
ScalarSubquery / DynamicPruning / streaming | Unevaluable expressions can't
safely move into Filter; streaming has state-store ordering constraints. |
h2. Test coverage
* New {{FoldInnerJoinWithOneRowRelationSuite}}: 9 tests (OneRow × table
left/right, no-cond, LeftOuter negative, ArrayType / MapType / nested
StructType columns preserved, nested struct-of-array, Unevaluable-condition
negative).
* Existing {{ConvertToLocalRelationSuite}}: +4 tests (T7-T10) including a
strong-assertion ExprId preservation test ({{collectFirst { case _: Join
}.isEmpty && output.length == 4}}).
* All 19 new unit tests verified RED-sensitive (fail under deliberately-broken
rule, pass under correct impl).
* Plan-stability regression: {{TPCDSV1_4 + TPCDSV1_4Stats + TPCDSV2_7 +
TPCDSV2_7Stats + TPCDSModified + TPCDSModifiedStats + TPCH}} = *322 / 322 PASS,
0 plan-diff*.
h2. Backward compatibility
No user-facing change. Query results are bit-identical; only the optimized
logical plan shape changes (one less Join, one more Project/Filter). No new
SQLConf knob — the four narrowings make this strictly safe.
h2. Related work
Trino: {{RemoveRedundantCrossJoin}} + {{EvaluateZeroLimit}}. DuckDB:
{{ConstantPropagation}} over cartesian. Doris: handled in FE constant-folding
pass.
> Fold InnerJoin with single-row LocalRelation/OneRowRelation into Project
> ------------------------------------------------------------------------
>
> Key: SPARK-57039
> URL: https://issues.apache.org/jira/browse/SPARK-57039
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 5.0.0
> Reporter: Kent Yao
> Priority: Major
> Labels: pull-request-available