[ https://issues.apache.org/jira/browse/SPARK-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan updated SPARK-12704:
--------------------------------

Description:

The implementation of {{HashPartitioning.compatibleWith}} has been wrong for a while. Consider the following case: if {{table_a}} is hash partitioned by int column {{i}}, and {{table_b}} is also hash partitioned by int column {{i}}, these two partitionings are logically compatible. However, {{HashPartitioning.compatibleWith}} returns false for this case, because the {{AttributeReference}}s of column {{i}} in the two tables have different expr ids.

With this wrong result from {{HashPartitioning.compatibleWith}}, we go into [this branch|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L390] and may add an unnecessary shuffle. This does not affect correctness when the join keys are exactly the same as the hash partitioning keys, because that branch still has an opportunity to skip partitioning the child: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala#L428

However, if the join keys are a super-set of the hash partitioning keys (for example, {{table_a}} and {{table_b}} are both hash partitioned by column {{i}} and we join them on columns {{i, j}}), we logically need no shuffle, yet the two tables, which start out partitioned only by {{i}}, are redundantly repartitioned by {{i, j}}.

A quick fix is to set the expr id of each {{AttributeReference}} to 0 before calling {{this.semanticEquals(o)}} in {{HashPartitioning.compatibleWith}}. In the long term, though, I think we need a better design than the {{compatibleWith}}, {{guarantees}}, and {{satisfies}} mechanism, as it is quite complex.

> we may repartition a relation even if it's not needed
> --------------------------------------------------
>
> Key: SPARK-12704
> URL: https://issues.apache.org/jira/browse/SPARK-12704
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Wenchen Fan
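To make the mismatch concrete, here is a toy Python model, not Spark code: {{Attribute}}, {{HashPartitioning}}, {{compatible_with}}, and {{compatible_with_fixed}} are illustrative names only. It shows how fresh expr ids defeat a strict-equality compatibility check, and how normalizing expr ids to 0 before comparing, in the spirit of the quick fix described above, restores the logically correct answer:

```python
# Toy model (NOT Spark code) of the expr-id problem in
# HashPartitioning.compatibleWith. In Catalyst, each AttributeReference
# carries a unique expr id, so the "same" column read from two tables
# compares as unequal.
from dataclasses import dataclass, replace as dc_replace


@dataclass(frozen=True)
class Attribute:
    name: str
    expr_id: int  # unique per attribute instance, like Catalyst's exprId


@dataclass(frozen=True)
class HashPartitioning:
    keys: tuple  # tuple of Attribute

    def compatible_with(self, other):
        # Mirrors the buggy behavior: equality includes expr_id, so two
        # logically identical partitionings compare as incompatible.
        return self.keys == other.keys

    def compatible_with_fixed(self, other):
        # Sketch of the quick fix: zero out expr ids before comparing,
        # analogous to normalizing before this.semanticEquals(o).
        def norm(p):
            return tuple(dc_replace(a, expr_id=0) for a in p.keys)
        return norm(self) == norm(other)


# table_a and table_b are both hash partitioned by int column `i`,
# but each table's scan assigns column `i` a fresh expr id.
part_a = HashPartitioning((Attribute("i", expr_id=1),))
part_b = HashPartitioning((Attribute("i", expr_id=2),))

print(part_a.compatible_with(part_b))        # False: triggers a spurious shuffle
print(part_a.compatible_with_fixed(part_b))  # True: logically compatible
```

Under this model, the redundant exchange comes purely from identity-based comparison; any comparison that is semantic (name and type rather than expr id) would let the planner reuse the existing partitioning.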
-- This message was sent by Atlassian JIRA (v6.3.4#6332)