[
https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328116#comment-16328116
]
Juhong Jung commented on SPARK-22923:
-------------------------------------
I realized that sort merge join cannot be applied to every theta condition, but
greater-than/less-than conditions are OK. (Thanks [~hvanhovell])
However, Spark currently doesn't use sort merge join for theta joins with
greater-than/less-than conditions.
Using sort merge join for such conditions would improve performance
significantly.
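To illustrate why a greater/smaller condition suits a sort-based join, here is a
minimal plain-Scala sketch. It is not Spark code and not how SortMergeJoinExec
is implemented; the names SortedInequalityJoin and join are hypothetical. It
assumes the condition is left < right. Once both sides are sorted, the matches
for each left value form a suffix of the right side, and the start of that
suffix only moves forward.

```scala
// Hypothetical sketch, not Spark internals: join condition is `left < right`.
object SortedInequalityJoin {
  /** For each left value (ascending), return the right values strictly greater than it. */
  def join(left: Seq[Int], right: Seq[Int]): Vector[(Int, Vector[Int])] = {
    val l = left.sorted.toVector
    val r = right.sorted.toVector
    var start = 0 // index of the first right value greater than the current left value
    l.map { x =>
      // l is ascending, so `start` never has to move backwards.
      while (start < r.length && r(start) <= x) start += 1
      (x, r.drop(start)) // every remaining right value satisfies x < value
    }
  }
}
```

The output can still be large when many pairs match, but non-matching pairs are
never compared and neither side has to be held entirely in one place.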
> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
> Key: SPARK-22923
> URL: https://issues.apache.org/jira/browse/SPARK-22923
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, SQL
> Affects Versions: 2.2.1
> Reporter: Juhong Jung
> Priority: Major
>
> I found this issue while doing a self join to retrieve the last record for
> each key. (similar to
> https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group)
> Currently, SortMergeJoinExec is chosen only if the join expression includes an
> equality expression, because the SparkStrategies pattern matcher uses
> ExtractEquiJoinKeys. (See the apply method in SparkStrategies.scala.)
> When the join expression contains only non-equi conditions, it is not
> matched by ExtractEquiJoinKeys and falls through to the last case, so
> BroadcastNestedLoopJoinExec is chosen even if the data size is larger than
> spark.sql.autoBroadcastJoinThreshold.
> For example, when the DataFrame is about 50 GB and
> spark.sql.autoBroadcastJoinThreshold is 10 MB, BroadcastNestedLoopJoinExec is
> still chosen and the large DataFrame is sent to the driver to be broadcast.
> The job is then aborted because of the spark.driver.maxResultSize option, or
> the driver container dies with an OutOfMemory error.
> I think sort merge join is a good join strategy for non-equi joins, so
> modifying the pattern matcher may be one way to make the Spark planner choose
> sort merge join for non-equi joins.
> For now, I have just added a trivial equality condition to the join
> expression so that sort merge join is used.
> The code below is a sample.
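> {code:java}
> // Build 10,000 (id, number) rows, keeping one row per distinct number.
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
> // Same rows with every column renamed to "later_<name>" for the self join.
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
> // Left outer join on a non-equi condition only; rows with no larger number
> // on the right side are the ones holding the maximum number.
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
> // Print the physical plan to see which join strategy was chosen.
> latestData.explain
> {code}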
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)