[
https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325074#comment-16325074
]
Marco Gaido commented on SPARK-22923:
-------------------------------------
I don't think SortMergeJoinExec can be used, since the condition in a theta join
may be of any kind, so all rows from both sides must always be compared. I think
a more interesting question is why CartesianProductExec is used only for
InnerLike operations.
> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
> Key: SPARK-22923
> URL: https://issues.apache.org/jira/browse/SPARK-22923
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, SQL
> Affects Versions: 2.2.1
> Reporter: Juhong Jung
>
> I found this issue while using a self join to retrieve the last record per
> key (similar to
> https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).
> Currently, SortMergeJoinExec is chosen only if the join expression includes
> an equality expression, because the SparkStrategies pattern matcher uses
> ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
> When the join condition contains only non-equi expressions, it does not match
> ExtractEquiJoinKeys and falls through to the last case, so
> BroadcastNestedLoopJoinExec is chosen even if the data size is larger than
> spark.sql.autoBroadcastJoinThreshold.
> For example, with a DataFrame of about 50 GB and
> spark.sql.autoBroadcastJoinThreshold set to 10 MB,
> BroadcastNestedLoopJoinExec is still chosen and the large DataFrame is sent
> to the driver to be broadcast. The job is then aborted because of the
> spark.driver.maxResultSize limit, or the driver container dies with an
> OutOfMemoryError.
> I think sort merge join is a good strategy for non-equi joins, so modifying
> the pattern matcher is one way to make the Spark planner choose sort merge
> join for them.
> As a workaround, I have added a trivial equality condition to the join
> expression to force sort merge join.
> The following code is a sample that reproduces the problem.
> {code:java}
> import org.apache.spark.sql.functions.col
> import spark.implicits._
>
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
>
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
>
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
>
> latestData.explain
> {code}
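> The workaround mentioned above, adding a trivial equality condition so that
> ExtractEquiJoinKeys matches, can be sketched as follows. This is a minimal
> sketch, not the exact code from the report; the constant column names
> ("one", "later_one") are illustrative choices.
> {code:java}
> import org.apache.spark.sql.functions.lit
>
> // Add the same constant column to both sides so the join expression
> // contains an equality key; ExtractEquiJoinKeys then matches and the
> // planner can pick SortMergeJoinExec instead of BroadcastNestedLoopJoinExec.
> val left  = data.withColumn("one", lit(1))
> val right = laterData.withColumn("later_one", lit(1))
>
> val latestData = left.
>   join(
>     right,
>     'one === 'later_one && 'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
>
> latestData.explain
> {code}
> Note that joining on a constant key hashes every row to the same partition,
> so while this avoids broadcasting the large side to the driver, it can turn
> the join into a single skewed task.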
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)