[ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328116#comment-16328116 ]
Juhong Jung commented on SPARK-22923:
-------------------------------------

I realized sort merge join cannot be applied to every theta condition, but greater-than/less-than conditions are fine (thanks [~hvanhovell]). However, Spark currently does not use sort merge join for greater-than/less-than theta joins. Using sort merge join for such conditions would improve performance significantly.

> Non-equi join (theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
>            Priority: Major
>
> I found this issue while doing a self join to retrieve the last record per key
> (similar to
> https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).
> Currently, SortMergeJoinExec is chosen only if the join expression includes an
> equality expression, because the SparkStrategies pattern matcher uses
> ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
> When the join condition contains only non-equi expressions, it does not match
> ExtractEquiJoinKeys and falls through to the last case, so
> BroadcastNestedLoopJoinExec is chosen even if the data size is larger than
> spark.sql.autoBroadcastJoinThreshold.
> For example, with a DataFrame of about 50 GB and
> spark.sql.autoBroadcastJoinThreshold set to 10 MB, BroadcastNestedLoopJoinExec
> is still chosen and the large DataFrame is sent to the driver to be broadcast.
> The job is then aborted because of the spark.driver.maxResultSize option, or
> the driver container dies with an OutOfMemoryError.
> I think sort merge join is a good join strategy for non-equi joins, so
> modifying the pattern matcher is one way to make the Spark planner choose
> sort merge join for non-equi joins.
> As a workaround, I have added a trivial equality condition to the join
> expression to force sort merge join.
> Sample code is below:
> {code:scala}
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
>
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
>
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
>
> latestData.explain
> {code}
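A minimal sketch of the trivial-equality workaround described above. The `dummy` constant column is a hypothetical name introduced for illustration; the equality on it matches ExtractEquiJoinKeys, so the planner can pick SortMergeJoinExec while the theta condition is carried along as an extra join condition. Broadcast joins are disabled here to keep the planner from choosing a broadcast strategy on this small sample:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").appName("smj-theta").getOrCreate()
import spark.implicits._

// Disable broadcast joins so the planner cannot fall back to a broadcast strategy.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val data = (1 to 1000).
  map(t => (t, scala.util.Random.nextInt(1000))).
  toDF("id", "number").
  dropDuplicates("number")

val laterData = data.
  select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)

// Hypothetical constant key: equal on every row, so the join result is unchanged,
// but the equality lets ExtractEquiJoinKeys match and sort merge join be planned.
val left  = data.withColumn("dummy", lit(1))
val right = laterData.withColumn("dummy", lit(1))

val latestData = left.
  join(right, (left("dummy") === right("dummy")) && ($"number" < $"later_number"), "leftouter").
  filter($"later_id".isNull)

latestData.explain()
```

Note the caveat: because every row shares the same constant key, the join hashes to a single partition, so this trades the driver OOM for join-side skew; it avoids broadcasting a large DataFrame but does not parallelize the join itself.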