Repository: spark
Updated Branches:
refs/heads/master 3f67382e7 -> 67bd8e3c2
[SQL] Set outputPartitioning of BroadcastHashJoin correctly.
I think we will not generate the plan triggering this bug at this moment. But,
let me explain it...
Right now, we are using `left.outputPartitioning` as the `outputPartitioning`
of a `BroadcastHashJoin`. We may have a wrong physical plan for cases like...
```sql
SELECT l.key, count(*)
FROM (SELECT key, count(*) as cnt
FROM src
GROUP BY key) l // This is buildPlan
JOIN r // This is the streamedPlan
ON (l.cnt = r.value)
GROUP BY l.key
```
Let's say we have a `BroadcastHashJoin` on `l` and `r`. For this case, we will
pick `l`'s `outputPartitioning` for the `outputPartitioning`of the
`BroadcastHashJoin` on `l` and `r`. Also, because the last `GROUP BY` is using
`l.key` as the key, we will not introduce an `Exchange` for this aggregation.
However, `r`'s outputPartitioning may not match the required distribution of
the last `GROUP BY` and we fail to group data correctly.
JIRA is being reindexed. I will create a JIRA ticket once it is back online.
Author: Yin Huai <[email protected]>
Closes #1735 from yhuai/BroadcastHashJoin and squashes the following commits:
96d9cb3 [Yin Huai] Set outputPartitioning correctly.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bd8e3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bd8e3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bd8e3c
Branch: refs/heads/master
Commit: 67bd8e3c217a80c3117a6e3853aa60fe13d08c91
Parents: 3f67382
Author: Yin Huai <[email protected]>
Authored: Sat Aug 2 13:16:41 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Sat Aug 2 13:16:41 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/org/apache/spark/sql/execution/joins.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/67bd8e3c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index cc138c7..51bb615 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -405,8 +405,7 @@ case class BroadcastHashJoin(
left: SparkPlan,
right: SparkPlan) extends BinaryNode with HashJoin {
-
- override def outputPartitioning: Partitioning = left.outputPartitioning
+ override def outputPartitioning: Partitioning =
streamedPlan.outputPartitioning
override def requiredChildDistribution =
UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]