[
https://issues.apache.org/jira/browse/SPARK-20010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931732#comment-15931732
]
Takeshi Yamamuro commented on SPARK-20010:
------------------------------------------
As you explained, outputOrdering in SortMergeJoinExec certainly does not have
rightKeys as sorted (But, I'm not sure why... cc: [~cloud_fan])
https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L83
{code}
scala> sql("SET spark.sql.join.preferSortMergeJoin=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=1024")
scala> val leftDf1 = spark.range(1000000).selectExpr("id * 2 AS key", "0")
scala> val rightDf1 = spark.range(1000000).selectExpr("id * 3 AS key", "0")
scala> val leftDf2 = leftDf1.join(rightDf1, leftDf1("key") === rightDf1("key"),
"INNER")
scala> val rightDf2 = spark.range(1000000).selectExpr("id * 4 AS key", "0")
scala> leftDf2.join(rightDf2, leftDf1("key") === rightDf2("key"),
"INNER").explain
== Physical Plan ==
*SortMergeJoin [key#207L], [key#239L], Inner
:- *SortMergeJoin [key#207L], [key#215L], Inner
: :- *Sort [key#207L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(key#207L, 200)
: : +- *Project [(id#204L * 2) AS key#207L, 0 AS 0#208]
: : +- *Range (0, 1000000, step=1, splits=Some(4))
: +- *Sort [key#215L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#215L, 200)
: +- *Project [(id#212L * 3) AS key#215L, 0 AS 0#216]
: +- *Range (0, 1000000, step=1, splits=Some(4))
+- *Sort [key#239L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#239L, 200)
+- *Project [(id#236L * 4) AS key#239L, 0 AS 0#240]
+- *Range (0, 1000000, step=1, splits=Some(4))
scala> leftDf2.join(rightDf2, rightDf1("key") === rightDf2("key"),
"INNER").explain
== Physical Plan ==
*SortMergeJoin [key#215L], [key#239L], Inner
:- *Sort [key#215L ASC NULLS FIRST], false, 0 <----- Unnecessary sort?
: +- *SortMergeJoin [key#207L], [key#215L], Inner
: :- *Sort [key#207L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(key#207L, 200)
: : +- *Project [(id#204L * 2) AS key#207L, 0 AS 0#208]
: : +- *Range (0, 1000000, step=1, splits=Some(4))
: +- *Sort [key#215L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key#215L, 200)
: +- *Project [(id#212L * 3) AS key#215L, 0 AS 0#216]
: +- *Range (0, 1000000, step=1, splits=Some(4))
+- *Sort [key#239L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key#239L, 200)
+- *Project [(id#236L * 4) AS key#239L, 0 AS 0#240]
+- *Range (0, 1000000, step=1, splits=Some(4))
{code}
> Sort information is lost after sort merge join
> ----------------------------------------------
>
> Key: SPARK-20010
> URL: https://issues.apache.org/jira/browse/SPARK-20010
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Zhenhua Wang
>
> After sort merge join for inner join, now we only keep left key ordering.
> However, after inner join, right key has the same value and order as left
> key. So if we need another smj on right key, we will unnecessarily add a sort
> which causes additional cost.
> As a more complicated example, A join B on A.key = B.key join C on B.key =
> C.key join D on A.key = D.key. We will unnecessarily add a sort on B.key when
> join \{A, B\} and C, and add a sort on A.key when join \{A, B, C\} and D.
> To fix this, we need to propagate all sorted information (equivalent
> expressions) from bottom up through `outputOrdering` and `SortOrder`.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]