[ 
https://issues.apache.org/jira/browse/SPARK-20010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931732#comment-15931732
 ] 

Takeshi Yamamuro commented on SPARK-20010:
------------------------------------------

As you explained, outputOrdering in SortMergeJoinExec certainly does not have 
rightKeys as sorted (But, I'm not sure why... cc: [~cloud_fan])
https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L83

{code}
scala> sql("SET spark.sql.join.preferSortMergeJoin=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=1024")
scala> val leftDf1 = spark.range(1000000).selectExpr("id * 2 AS key", "0")
scala> val rightDf1 = spark.range(1000000).selectExpr("id * 3 AS key", "0")
scala> val leftDf2 = leftDf1.join(rightDf1, leftDf1("key") === rightDf1("key"), 
"INNER")
scala> val rightDf2 = spark.range(1000000).selectExpr("id * 4 AS key", "0")
scala> leftDf2.join(rightDf2, leftDf1("key") === rightDf2("key"), 
"INNER").explain
== Physical Plan ==
*SortMergeJoin [key#207L], [key#239L], Inner
:- *SortMergeJoin [key#207L], [key#215L], Inner
:  :- *Sort [key#207L ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(key#207L, 200)
:  :     +- *Project [(id#204L * 2) AS key#207L, 0 AS 0#208]
:  :        +- *Range (0, 1000000, step=1, splits=Some(4))
:  +- *Sort [key#215L ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(key#215L, 200)
:        +- *Project [(id#212L * 3) AS key#215L, 0 AS 0#216]
:           +- *Range (0, 1000000, step=1, splits=Some(4))
+- *Sort [key#239L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#239L, 200)
      +- *Project [(id#236L * 4) AS key#239L, 0 AS 0#240]
         +- *Range (0, 1000000, step=1, splits=Some(4))

scala> leftDf2.join(rightDf2, rightDf1("key") === rightDf2("key"), 
"INNER").explain
== Physical Plan ==
*SortMergeJoin [key#215L], [key#239L], Inner
:- *Sort [key#215L ASC NULLS FIRST], false, 0        <----- Unnecessary sort?
:  +- *SortMergeJoin [key#207L], [key#215L], Inner
:     :- *Sort [key#207L ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(key#207L, 200)
:     :     +- *Project [(id#204L * 2) AS key#207L, 0 AS 0#208]
:     :        +- *Range (0, 1000000, step=1, splits=Some(4))
:     +- *Sort [key#215L ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(key#215L, 200)
:           +- *Project [(id#212L * 3) AS key#215L, 0 AS 0#216]
:              +- *Range (0, 1000000, step=1, splits=Some(4))
+- *Sort [key#239L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#239L, 200)
      +- *Project [(id#236L * 4) AS key#239L, 0 AS 0#240]
         +- *Range (0, 1000000, step=1, splits=Some(4))
{code}

> Sort information is lost after sort merge join
> ----------------------------------------------
>
>                 Key: SPARK-20010
>                 URL: https://issues.apache.org/jira/browse/SPARK-20010
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Zhenhua Wang
>
> After sort merge join for inner join, now we only keep left key ordering. 
> However, after inner join, right key has the same value and order as left 
> key. So if we need another smj on right key, we will unnecessarily add a sort 
> which causes additional cost.
> As a more complicated example, A join B on A.key = B.key join C on B.key = 
> C.key join D on A.key = D.key. We will unnecessarily add a sort on B.key when 
> join \{A, B\} and C, and add a sort on A.key when join \{A, B, C\} and D.
> To fix this, we need to propagate all sorted information (equivalent 
> expressions) from bottom up through `outputOrdering` and `SortOrder`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to