[ 
https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328116#comment-16328116
 ] 

Juhong Jung commented on SPARK-22923:
-------------------------------------

 I realized sort merge join cannot to be applied every theta condition, but 
greater/smaller condition is ok. (Thanks [~hvanhovell])

But currently Spark doesn't use sort merge join for greater/smaller condition 
theta join.

Using sort merge join for such conditions will improve performance really much.

> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
>            Priority: Major
>
> I found this issue during self join for retrieving same key last record 
> problem. (similar with 
> https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group)
> Currently, SortMergeJoinExec is chosen only if join expression include 
> equality expression cause SparkStrategies pattern matcher use 
> ExtractEquiJoinKeys. (See SparkStrategies.scala apply method).
> When join with non-equi condition only expression, that expression is not 
> matched with ExtractEquiJoinKeys and go to last case, so 
> BroadcastNestedLoopJoinExec is chosen even if data size is larger than 
> spark.sql.autoBroadcastJoinThreshold.
> For example, Dataframe is about 50G and spark.sql.autoBroadcastJoinThreshold 
> is 10MB, but BroadcastNestedLoopJoinExec is chosen and large size dataframe 
> is sent to driver to broadcast.
> Now the job is aborted because of spark.driver.maxResultSize option or driver 
> container is dead because of OutOfMemory.
> I think sort merge join is good join strategy for non-equi join, so maybe 
> modifying pattern matcher is one of way to being spark planner chose sort 
> merge join for non-equi join.
> And, I have just added trivial equal condition to join expression for using 
> sort merge join.
> Below code is sample.
> {code:java}
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
> latestData.explain
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to