[ 
https://issues.apache.org/jira/browse/SPARK-57091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anupam Yadav updated SPARK-57091:
---------------------------------
    Description: 
h3. Problem

The current NearestByJoin implementation (RewriteNearestByJoin, added in 
SPARK-56395) rewrites to cross-join + aggregate + generate. This materializes 
all N*M row pairs before the aggregate can bound them. At moderate scale 
(30Kx30K, k=5), this takes ~400s and 1.7GB. At 200Kx200K the current approach 
is infeasible (projected 5+ hours).
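
As a rough cross-check of the figures above, the pair counts can be worked out directly. The sketch below assumes the runtime of the rewrite scales linearly with the number of materialized pairs; that scaling is an assumption made for illustration, not something the benchmark states. It takes the 404s measurement for 30Kx30K from the results table as the baseline.

```python
# Number of row pairs the cross-join rewrite materializes for N x M inputs.
pairs_30k = 30_000 * 30_000      # 900,000,000 pairs
pairs_200k = 200_000 * 200_000   # 40,000,000,000 pairs

# Assumption: runtime grows linearly with the pair count.
ratio = pairs_200k / pairs_30k               # about 44.4x more pairs
projected_hours = 404 * ratio / 3600         # 404s is the measured 30Kx30K time

print(f"{pairs_200k:,} pairs, projected {projected_hours:.1f} h")
```

Under that assumption the projection lands at about five hours, in line with the extrapolated figure quoted for 200Kx200K.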

The SPIP (SPARK-56395) anticipated this: _"we may benefit from writing a 
dedicated fused physical operator that avoids materializing a full cartesian 
product for performance"_ but declared it out of scope for the initial 
implementation. This JIRA proposes that operator.
h3. Proposal

Add {{BroadcastNearestByJoinExec}}, a dedicated physical operator that 
broadcasts the right side and iterates per left row with a bounded priority 
queue of size k. This avoids materializing the full cross product entirely.
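
The per-left-row loop described above can be sketched in a few lines. This is a minimal illustration in plain Python, not the code from the PR: {{nearest_k}}, its parameters, and the use of a caller-supplied {{distance}} function are all hypothetical names chosen for the example. The heap holds at most k entries, so memory per left row is bounded by k regardless of the size of the right side.

```python
import heapq

def nearest_k(left_row, right_rows, k, distance):
    """Return up to k (distance, right_index) pairs for one left row, nearest first.

    Hypothetical helper: right_rows stands in for the broadcast right side.
    """
    if k <= 0:
        return []
    heap = []  # min-heap on negated distance, so heap[0] is the worst kept candidate
    for idx, right_row in enumerate(right_rows):
        d = distance(left_row, right_row)
        if len(heap) < k:
            heapq.heappush(heap, (-d, idx))
        elif -d > heap[0][0]:
            # Strictly closer than the current worst candidate: swap it in.
            heapq.heapreplace(heap, (-d, idx))
    return sorted((-neg_d, idx) for neg_d, idx in heap)

result = nearest_k(5.0, [1.0, 4.0, 9.0, 5.5, 6.0], 2, lambda a, b: abs(a - b))
print(result)
```

Each left row costs one pass over the right side plus O(log k) heap work per candidate, and no N*M intermediate result is ever built.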

The operator fires only when:
 * {{spark.sql.join.nearestBy.broadcast.enabled}} is true (default false)
 * The right side fits within {{autoBroadcastJoinThreshold}}

Otherwise the existing rewrite is used as fallback.
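
The selection rule above amounts to a two-condition check. The sketch below is illustrative only: {{choose_nearest_by_plan}} and its parameters are hypothetical names, and treating a negative threshold as "broadcast disabled" is an assumption based on how {{autoBroadcastJoinThreshold}} behaves for ordinary broadcast joins, not on the PR itself.

```python
def choose_nearest_by_plan(broadcast_enabled, right_size_bytes, auto_broadcast_threshold):
    """Pick the physical strategy for a NearestByJoin (hypothetical helper).

    broadcast_enabled        -- value of spark.sql.join.nearestBy.broadcast.enabled
    right_size_bytes         -- estimated size of the right side
    auto_broadcast_threshold -- value of autoBroadcastJoinThreshold (-1 assumed to disable)
    """
    fits = 0 <= right_size_bytes <= auto_broadcast_threshold
    if broadcast_enabled and fits:
        return "BroadcastNearestByJoinExec"
    # Any other case falls back to the existing cross-join rewrite.
    return "RewriteNearestByJoin"

ten_mb = 10 * 1024 * 1024
print(choose_nearest_by_plan(True, 1_000_000, ten_mb))
print(choose_nearest_by_plan(True, 50_000_000, ten_mb))
```

Because the flag defaults to false, existing queries keep the current plan unless the new operator is enabled explicitly.
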
h3. Benchmark Results
||Scale||Current (cross-product)||BroadcastNearestByJoin||Speedup||Memory (vs. current)||
|10Kx10K|4.2s|0.38s|11x|7x less|
|30Kx30K|404s|31s|13x|8.3x less|
|50Kx50K|1,158s|96s|12x|~8x less|
|200Kx200K|~5h (extrapolated)|23min|~13x|-|
h3. Design Notes

This follows the same pattern as SPARK-56887 (SortMergeAsOfJoinExec for AS-OF 
join, by [~sarutak]): a dedicated physical operator that replaces an expensive 
rewrite for a specialized join type. Key design decisions:
 * INNER join preserves the original nullability; LEFT OUTER makes the 
right-side columns nullable
 * The heap is hoisted outside the per-row loop and cleared on each iteration 
(reduces GC pressure)
 * The heap stores indices into the broadcast array, not row copies
 * Fallback to the existing rewrite is guaranteed when the right side exceeds 
the broadcast threshold
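
The first three decisions can be illustrated together. The sketch below is a plain-Python approximation under stated assumptions, not the operator's actual code: {{nearest_by_join}} and its parameters are hypothetical, and padding with {{None}} stands in for the null right-side columns of a LEFT OUTER result. The heap is allocated once and cleared per left row, it stores only indices into the right-side array, and rows are looked up only when output is emitted.

```python
import heapq

def nearest_by_join(left_rows, right_rows, k, distance, join_type="inner"):
    """Yield (left, right) pairs, nearest first per left row (hypothetical sketch)."""
    heap = []  # allocated once, reused for every left row
    for left in left_rows:
        heap.clear()
        for idx, right in enumerate(right_rows):
            d = distance(left, right)
            if len(heap) < k:
                heapq.heappush(heap, (-d, idx))   # store the index, not the row
            elif -d > heap[0][0]:
                heapq.heapreplace(heap, (-d, idx))
        if heap:
            # Order by ascending distance, then by index for determinism.
            for neg_d, idx in sorted(heap, key=lambda e: (-e[0], e[1])):
                yield (left, right_rows[idx])
        elif join_type == "left_outer":
            # No candidate on the right: keep the left row, pad the right side.
            yield (left, None)

dist = lambda a, b: abs(a - b)
matched = list(nearest_by_join([0.0, 10.0], [1.0, 9.0, 2.0], 2, dist))
unmatched_outer = list(nearest_by_join([0.0], [], 2, dist, join_type="left_outer"))
unmatched_inner = list(nearest_by_join([0.0], [], 2, dist))
print(matched, unmatched_outer, unmatched_inner)
```

In this sketch only the LEFT OUTER path ever emits a missing right side, which is why only that join type needs nullable right-side columns.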

h3. Initial Implementation

[PR #56101|https://github.com/apache/spark/pull/56101] (11 correctness unit 
tests passing)
h3. Seeking Feedback

Would appreciate thoughts from the NearestByJoin authors on:
 * Does this approach align with the planned evolution of the feature?
 * Any concerns about adding a dedicated physical operator vs. optimizing the 
existing rewrite?

Happy to collaborate and adjust the approach based on feedback. Thanks!

cc [~dkbiswal]  [~cloud_fan]  [~sarutak] 

  was:
h3. Problem

The current NearestByJoin implementation (RewriteNearestByJoin, added in 
SPARK-56395) rewrites to cross-join + aggregate + generate. This materializes 
all N*M row pairs before the aggregate can bound them. At moderate scale 
(30Kx30K, k=5), this takes ~400s and 1.7GB. At 200Kx200K the current approach 
is infeasible (projected 5+ hours).

The SPIP (SPARK-56395) anticipated this: _"we may benefit from writing a 
dedicated fused physical operator that avoids materializing a full cartesian 
product for performance"_ but declared it out of scope for the initial 
implementation. This JIRA proposes that operator.
h3. Proposal

Add {{BroadcastNearestByJoinExec}}, a dedicated physical operator that 
broadcasts the right side and iterates per left row with a bounded priority 
queue of size k. This avoids materializing the full cross product entirely.

The operator fires only when:
 * {{spark.sql.join.nearestBy.broadcast.enabled}} is true (default false)
 * The right side fits within {{autoBroadcastJoinThreshold}}

Otherwise the existing rewrite is used as fallback.
h3. Benchmark Results
||Scale||Current (cross-product)||BroadcastNearestByJoin||Speedup||Memory||
|10Kx10K|4.2s|0.38s|11x|7x less|
|30Kx30K|404s|31s|13x|8.3x less|
|50Kx50K|1,158s|96s|12x|~8x less|
|200Kx200K|~5h (extrapolated)|23min|~13x|-|
h3. Design Notes

This follows the same pattern as SPARK-56887 (SortMergeAsOfJoinExec for AS-OF 
join by [~sarutak] ) – a dedicated physical operator to replace an expensive 
rewrite for a specialized join type. Key design decisions:
 * INNER join preserves original nullability; LEFT OUTER makes right columns 
nullable
 * Heap hoisted outside per-row loop and cleared per iteration (reduces GC 
pressure)
 * Stores indices into broadcast array, not row copies
 * Fallback guaranteed when right exceeds broadcast threshold

h3. Initial Implementation

[PR #56101|https://github.com/apache/spark/pull/56101] (draft, 11 unit tests 
passing)
h3. Seeking Feedback

Would appreciate thoughts from the NearestByJoin authors on:
 * Does this approach align with the planned evolution of the feature?
 * Any concerns about adding a dedicated physical operator vs. optimizing the 
existing rewrite?

Happy to collaborate and adjust the approach based on feedback. Thanks!

cc [~dkbiswal]  [~cloud_fan]  [~sarutak] 


> [SQL] Add BroadcastNearestByJoinExec to avoid cross-product materialization 
> for NearestByJoin
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-57091
>                 URL: https://issues.apache.org/jira/browse/SPARK-57091
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Anupam Yadav
>            Priority: Major


