[ https://issues.apache.org/jira/browse/SPARK-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956285#comment-15956285 ]
yucai commented on SPARK-12704: ------------------------------- When sort-merge-join and shuffled-hash-join, the sparkPlan looks like containing *equivalent class* info in _leftKeys_ and _rightKeys_ For example: {code} scala> sql("select * from t1 join t2 on t1.f1 = t2.x and t2.y = t1.f2").explain 17/04/05 12:19:42 INFO SparkSqlParser: Parsing command: select * from t1 join t2 on t1.f1 = t2.x and t2.y = t1.f2 == Physical Plan == *SortMergeJoin [f1#80L, f2#81L], [x#88L, y#87L], Inner {code} It knows *f1* is equivalent to *x* and *f2* is equivalent to *y* (ExtractEquiJoinKeys.unapply), is it possible we use this info to improve this feature? > we may repartition a relation even it's not needed > -------------------------------------------------- > > Key: SPARK-12704 > URL: https://issues.apache.org/jira/browse/SPARK-12704 > Project: Spark > Issue Type: Improvement > Components: SQL > Reporter: Wenchen Fan > > The implementation of {{HashPartitioning.compatibleWith}} has been > sub-optimal for a while. Think of the following case: > if {{table_a}} is hash partitioned by int column `i`, and {{table_b}} is also > partitioned by int column `i`, logically these 2 partitionings are > compatible. However, {{HashPartitioning.compatibleWith}} will return false > for this case as the {{AttributeReference}} of column `i` between these 2 > tables have different expr ids. > With this wrong result of {{HashPartitioning.compatibleWith}}, we will go > into [this > branch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L390] > and may add unnecessary shuffle. > This won't impact correctness if the join keys are exactly the same with hash > partitioning keys, as there’s still an opportunity to not partition that > child in that branch: > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L428 > However, if the join keys are a super-set of hash partitioning keys, for > example, {{table_a}} and {{table_b}} are both hash partitioned by column `i`, > and we wanna join them using column `i, j`, logically we don't need shuffle > but in fact the 2 tables start out as partitioned only by `i` and redundantly > be repartitioned by `i, j`. > A quick fix is just set the expr id of {{AttributeReference}} to 0 before we > call {{this.semanticEquals(o)}} in {{HashPartitioning.compatibleWith}}, but > for long term, I think we need a better design than the `compatibleWith`, > `guarantees`, and `satisfies` mechanism, as it's quite complex -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org