[https://issues.apache.org/jira/browse/SPARK-57039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Kent Yao updated SPARK-57039:
-----------------------------
    Description: 
h2. Background

{{ConvertToLocalRelation}} folds {{Project}} / {{Filter}} / {{Limit}} over 
{{LocalRelation}}, but not {{InnerJoin(LocalRelation(single-row), other)}} or 
{{InnerJoin(OneRowRelation(), other)}} — even when the single-row side has no 
streaming / Unevaluable hazards and the user gave no join hint.

The redundant join node:
* Forces broadcast/shuffle planning for what is logically a constant attach.
* Blocks downstream constant-folding and column pruning of the single-row side.
* Mirrors the TODO at {{DecorrelateInnerQuery.scala:435}}: {{// TODO add a more 
general rule to optimize join with OneRowRelation}}.
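The "constant attach" claim can be checked with plain Scala collections. The sketch below is a toy model only (rows are {{Map[String, Any]}}, the join is a nested loop, no Spark classes are involved). It shows that an inner join against exactly one row yields the same rows as attaching that row's values to every row of {{other}} and then filtering by the join condition.

```scala
// Toy model for illustration: a "row" is a column-name -> value map, not Spark's InternalRow.
object ConstantAttachSketch {
  type Row = Map[String, Any]

  // Nested-loop inner join: every pair of rows, kept only if the condition holds.
  def innerJoin(left: Seq[Row], right: Seq[Row], cond: Row => Boolean): Seq[Row] =
    for {
      l <- left
      r <- right
      joined = l ++ r
      if cond(joined)
    } yield joined

  // The proposed rewrite: attach the single row's values to each row, then filter.
  def attachThenFilter(single: Row, other: Seq[Row], cond: Row => Boolean): Seq[Row] =
    other.map(o => single ++ o).filter(cond)
}
```

For example, with {{single = Map("a" -> 5)}}, {{other}} holding ids 1 to 10 and the condition {{id < a}}, both functions return the same four rows. With two or more rows on the "single" side the join would instead multiply {{other}}, which is why the rewrite is limited to exactly one row.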

h2. Proposal

Two rules in {{Optimizer.scala}}:

# {{ConvertToLocalRelation}} cases 5 and 6 (left/right symmetric arms): fold an 
{{Inner}} join where one side is {{LocalRelation(out, Seq(row), false, _)}} into 
{{Project(literals ++ other.output, other)}}, optionally wrapped in 
{{Filter(cond, ...)}}. The literals preserve the original {{ExprId}} via 
{{Alias(Literal.create(...), name)(exprId)}}.
# New {{FoldInnerJoinWithOneRowRelation}} (tree pattern {{INNER_LIKE_JOIN}}): 
fold an {{Inner}} join where one side is {{OneRowRelation()}}. {{OneRowRelation}} 
does not publish {{LOCAL_RELATION}}, so a single combined rule using 
{{transformWithPruning(_.containsPattern(LOCAL_RELATION))}} would silently miss it.
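A minimal sketch of the {{LocalRelation}} arms, assuming a hypothetical miniature plan model: {{Attr}}, {{Ref}}, {{Lit}}, {{Table}}, the three-field {{LocalRelation}} and the string join types are illustrative stand-ins, not Catalyst classes. It shows why keeping each attribute's {{ExprId}} matters: the {{Filter}} placed above the {{Project}} can still resolve references to the single-row side, and the folded plan exposes the same output as the original join. In the right-hand arm the sketch projects {{other.output}} first so the join's column order is unchanged; that ordering is a choice of this sketch.

```scala
// Hypothetical miniature of Catalyst plan nodes; names echo Spark's, but these are NOT Spark classes.
object FoldLocalRelationSketch {
  final case class ExprId(id: Long)
  final case class Attr(name: String, exprId: ExprId)

  sealed trait Expr
  final case class Lit(value: Any) extends Expr
  final case class Ref(attr: Attr) extends Expr
  // Mirrors Alias(child, name)(exprId): the new column reuses the original ExprId.
  final case class Alias(child: Expr, name: String, exprId: ExprId) extends Expr

  sealed trait Plan { def output: Seq[Attr] }
  final case class Table(output: Seq[Attr]) extends Plan
  final case class LocalRelation(output: Seq[Attr], data: Seq[Seq[Any]], isStreaming: Boolean = false) extends Plan
  final case class Join(left: Plan, right: Plan, joinType: String, cond: Option[Expr]) extends Plan {
    def output: Seq[Attr] = left.output ++ right.output
  }
  final case class Project(projectList: Seq[Expr], child: Plan) extends Plan {
    def output: Seq[Attr] = projectList.collect {
      case Alias(_, name, id) => Attr(name, id)
      case Ref(a)             => a
    }
  }
  final case class Filter(cond: Expr, child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  // One aliased literal per column of the single row, keeping that column's ExprId.
  private def literals(rel: LocalRelation): Seq[Expr] =
    rel.output.zip(rel.data.head).map { case (a, v) => Alias(Lit(v), a.name, a.exprId) }

  private def refs(p: Plan): Seq[Expr] = p.output.map(a => Ref(a))

  private def withCond(p: Plan, cond: Option[Expr]): Plan = cond.fold(p)(c => Filter(c, p))

  // Fold Inner joins against a non-streaming, exactly-one-row LocalRelation on either side.
  def fold(plan: Plan): Plan = plan match {
    case Join(lr @ LocalRelation(_, Seq(_), false), other, "Inner", cond) =>
      withCond(Project(literals(lr) ++ refs(other), other), cond)
    case Join(other, lr @ LocalRelation(_, Seq(_), false), "Inner", cond) =>
      withCond(Project(refs(other) ++ literals(lr), other), cond)
    case p => p
  }
}
```

Outer joins and relations with zero or several rows fall through to the last case and are returned unchanged.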

Four narrowings:

|| Dimension || Accepted || Rejected || Reason ||
| JoinType | {{Inner}} | Outer / Semi / Anti / Cross | Outer needs null padding; Semi/Anti needs existence semantics; Cross has no condition. |
| JoinHint | {{NONE}} | BROADCAST / SHUFFLE_* | User hints take priority. |
| Row count | {{data.length == 1}} | 0 (handled by {{PropagateEmptyRelation}}) / >1 | Inlining >1 rows requires Cartesian materialization. |
| Condition | {{!cond.exists(hasUnevaluableExpr)}} && {{!other.isStreaming}} | ScalarSubquery / DynamicPruning / streaming | Unevaluable expressions can't move into a Filter; streaming has state-store ordering constraints. |
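The four narrowings amount to a single conjunctive guard. A hedged sketch with hypothetical stand-in types follows; {{Unevaluable}} here represents expressions such as {{ScalarSubquery}} or {{DynamicPruning}}, and none of these types are Spark's {{JoinType}}, {{JoinHint}} or {{Expression}} classes.

```scala
// Hypothetical, self-contained model of the four narrowings; not Spark's classes.
object FoldGuardSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object LeftOuter extends JoinType
  case object LeftSemi extends JoinType

  sealed trait Hint
  case object NoHint extends Hint
  case object Broadcast extends Hint

  // Minimal expression tree; Unevaluable stands in for ScalarSubquery / DynamicPruning.
  sealed trait Expr { def children: Seq[Expr] }
  final case class Leaf(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class And(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
  final case class Unevaluable(name: String) extends Expr { def children: Seq[Expr] = Nil }

  // True if any node in the condition tree is unevaluable.
  def hasUnevaluable(e: Expr): Boolean = e match {
    case _: Unevaluable => true
    case other          => other.children.exists(hasUnevaluable)
  }

  // All four narrowings must hold before the join may be folded.
  def canFold(joinType: JoinType, hint: Hint, singleSideRows: Int,
              cond: Option[Expr], otherIsStreaming: Boolean): Boolean =
    joinType == Inner &&
      hint == NoHint &&
      singleSideRows == 1 &&
      !cond.exists(hasUnevaluable) &&
      !otherIsStreaming
}
```

Because every check must pass, a hinted join, a non-{{Inner}} join type, an empty or multi-row side, an unevaluable condition or a streaming input each leaves the plan untouched.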

h2. Tests

* New {{FoldInnerJoinWithOneRowRelationSuite}}: 9 tests (OneRow × table 
left/right, no-cond, LeftOuter negative, ArrayType/MapType/nested StructType, 
nested struct-of-array, Unevaluable-condition negative).
* {{ConvertToLocalRelationSuite}}: +4 tests, including a strong ExprId-preservation 
assertion ({{collectFirst \{ case j: Join => j \}.isEmpty && output.length == 4}}).
* Plan stability: TPCDSV1_4, TPCDSV1_4Stats, TPCDSV2_7, TPCDSV2_7Stats, 
TPCDSModified, TPCDSModifiedStats and TPCH: 322/322 PASS, 0 plan diffs.

h2. Backward compatibility

No user-facing change. Query results are bit-identical; only the shape of the 
optimized logical plan changes. No new SQLConf: the four narrowings make this strictly safe.


  was:
h2. Background

Currently {{ConvertToLocalRelation}} (Optimizer.scala) folds {{Project}} / 
{{Filter}} / {{Limit}} over {{LocalRelation}}, but does *not* fold 
{{InnerJoin(LocalRelation(single-row), other, Inner)}} or 
{{InnerJoin(OneRowRelation(), other, Inner)}} — even when the single-row side 
has no streaming / Unevaluable hazards and the user gave no join hint.

This leaves a redundant join node that:

* Forces broadcast/shuffle planning for what is logically a constant attach.
* Blocks downstream constant-folding and column pruning of the single-row 
side's projected attributes.
* Is structurally identical to what {{DecorrelateInnerQuery.scala:435}} flags 
as a TODO: {{// TODO add a more general rule to optimize join with 
OneRowRelation}}.

h2. Proposal

Add two complementary rules in {{Optimizer.scala}}:

# *{{ConvertToLocalRelation}} cases 5 and 6* (left/right symmetric arms): fold an 
{{Inner}} join where one side is {{LocalRelation(out, Seq(row), false, _)}} 
(single row, ≥1 column) into {{Project(literals ++ other.output, other)}}, 
optionally wrapped in {{Filter(cond, ...)}}. The literal values preserve the 
original {{ExprId}} via {{Alias(Literal.create(...), name)(exprId)}}.
# *{{FoldInnerJoinWithOneRowRelation}}* (new independent rule, tree pattern 
{{INNER_LIKE_JOIN}}): fold an {{Inner}} join where one side is 
{{OneRowRelation()}} (single row, 0 columns) into {{other}} or 
{{Filter(cond, other)}}. {{OneRowRelation}} does *not* publish the 
{{LOCAL_RELATION}} tree pattern, so a single combined rule using 
{{transformWithPruning(_.containsPattern(LOCAL_RELATION))}} would silently miss 
it; hence two rules.
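Why two rules are needed can be illustrated with a toy model of tree-pattern pruning. In this hypothetical sketch each node publishes a set of patterns and {{OneRowRelation}} publishes none, so a rule gated on {{LOCAL_RELATION}} never runs on {{Join(OneRowRelation, t)}}, while a rule gated on {{INNER_LIKE_JOIN}} does and folds it into {{other}} or {{Filter(cond, other)}}. The pattern names mirror the text above; the classes are not Spark's, and the gate here only tests the whole tree once and rewrites the root, a simplification of {{transformWithPruning}}.

```scala
// Hypothetical toy of tree-pattern pruning; classes are NOT Spark's.
object OneRowFoldSketch {
  sealed trait Pattern
  case object LOCAL_RELATION extends Pattern
  case object INNER_LIKE_JOIN extends Pattern

  sealed trait Plan {
    def children: Seq[Plan]
    def nodePatterns: Set[Pattern] = Set.empty
    // Union of the patterns published by this node and its whole subtree.
    lazy val treePatterns: Set[Pattern] = children.foldLeft(nodePatterns)(_ ++ _.treePatterns)
    def containsPattern(p: Pattern): Boolean = treePatterns.contains(p)
  }
  // Publishes no pattern at all, in particular not LOCAL_RELATION.
  case object OneRowRelation extends Plan { def children: Seq[Plan] = Nil }
  final case class Table(name: String) extends Plan { def children: Seq[Plan] = Nil }
  final case class Filter(cond: String, child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Join(left: Plan, right: Plan, joinType: String, cond: Option[String]) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    override def nodePatterns: Set[Pattern] =
      if (joinType == "Inner" || joinType == "Cross") Set(INNER_LIKE_JOIN) else Set.empty
  }

  // OneRowRelation has no columns, so the fold is `other` or `Filter(cond, other)`.
  private val foldOneRow: Plan => Plan = {
    case Join(OneRowRelation, other, "Inner", cond) => cond.fold(other)(Filter(_, other))
    case Join(other, OneRowRelation, "Inner", cond) => cond.fold(other)(Filter(_, other))
    case p                                          => p
  }

  // Run the rule only if the plan publishes `gate` somewhere in its tree.
  def foldWithGate(gate: Pattern, plan: Plan): Plan =
    if (plan.containsPattern(gate)) foldOneRow(plan) else plan
}
```

Gated on {{LOCAL_RELATION}}, {{Join(OneRowRelation, Table("t"), "Inner", Some("t.id > 0"))}} comes back unchanged; gated on {{INNER_LIKE_JOIN}}, it becomes {{Filter("t.id > 0", Table("t"))}}.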

Four narrowings (all strictly conservative):

|| Dimension || Accepted || Rejected || Reason ||
| JoinType | {{Inner}} only | LeftOuter / RightOuter / FullOuter / LeftSemi / LeftAnti / Cross | Outer needs null padding; Semi/Anti needs existence semantics; Cross has no condition (already handled). |
| JoinHint | {{JoinHint.NONE}} only | BROADCAST / SHUFFLE_HASH / SHUFFLE_MERGE / SHUFFLE_REPLICATE_NL | User hints take priority over plan rewrites. |
| Row count | {{data.length == 1}} | 0 (covered by {{PropagateEmptyRelation}}) / >1 | Inlining >1 rows as literals would bloat the plan and require Cartesian materialization. |
| Condition | {{!cond.exists(hasUnevaluableExpr)}} && {{!other.isStreaming}} | ScalarSubquery / DynamicPruning / streaming | Unevaluable expressions can't safely move into a Filter; streaming has state-store ordering constraints. |

h2. Test coverage

* New {{FoldInnerJoinWithOneRowRelationSuite}}: 9 tests (OneRow × table 
left/right, no-cond, LeftOuter negative, ArrayType / MapType / nested 
StructType columns preserved, nested struct-of-array, Unevaluable-condition 
negative).
* Existing {{ConvertToLocalRelationSuite}}: +4 tests (T7-T10), including a 
strong-assertion ExprId preservation test ({{collectFirst \{ case j: Join => j \}.isEmpty && output.length == 4}}).
* All 19 new unit tests verified RED-sensitive (fail under deliberately-broken 
rule, pass under correct impl).
* Plan-stability regression: {{TPCDSV1_4 + TPCDSV1_4Stats + TPCDSV2_7 + 
TPCDSV2_7Stats + TPCDSModified + TPCDSModifiedStats + TPCH}} = *322 / 322 PASS, 
0 plan-diff*.
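The strong assertion and the RED-sensitivity check can be sketched together. Assuming a hypothetical toy plan tree with a Catalyst-style {{collectFirst}} (not Spark's {{TreeNode}}), the assertion accepts a correct fold and rejects a deliberately broken no-op rule that leaves the {{Join}} in place. {{correctFold}} and {{brokenFold}} are illustrative names, not the actual test helpers.

```scala
// Hypothetical toy plan tree with a Catalyst-style collectFirst; NOT Spark's TreeNode.
object JoinFreeAssertionSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def output: Seq[String]
    // Pre-order search for the first node matched by `pf`.
    def collectFirst[B](pf: PartialFunction[Plan, B]): Option[B] =
      pf.lift(this).orElse(children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(b) => b })
  }
  final case class Scan(output: Seq[String]) extends Plan { def children: Seq[Plan] = Nil }
  final case class Project(output: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def output: Seq[String] = left.output ++ right.output
  }

  // The strong assertion: no Join survives and all four columns remain.
  def foldedCorrectly(plan: Plan): Boolean =
    plan.collectFirst { case j: Join => j }.isEmpty && plan.output.length == 4

  // A correct fold of Join(single-row, table) and a deliberately broken no-op rule.
  def correctFold(p: Plan): Plan = p match {
    case Join(l, r) => Project(l.output ++ r.output, r)
    case other      => other
  }
  def brokenFold(p: Plan): Plan = p
}
```

A test is RED-sensitive in this sense when {{foldedCorrectly}} holds for the output of the correct rule and fails for the output of the broken one.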

h2. Backward compatibility

No user-facing change. Query results are bit-identical; only the optimized 
logical plan shape changes (one fewer Join, one more Project/Filter). No new 
SQLConf knob: the four narrowings make this strictly safe.

h2. Related work

Trino: {{RemoveRedundantCrossJoin}} + {{EvaluateZeroLimit}}. DuckDB: 
{{ConstantPropagation}} over cartesian. Doris: handled in FE constant-folding 
pass.


> Fold InnerJoin with single-row LocalRelation/OneRowRelation into Project
> ------------------------------------------------------------------------
>
>                 Key: SPARK-57039
>                 URL: https://issues.apache.org/jira/browse/SPARK-57039
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 5.0.0
>            Reporter: Kent Yao
>            Priority: Major
>              Labels: pull-request-available
>



