This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9e61043 [SPARK-34681][SQL] Fix bug for full outer shuffled hash join
when building left side with non-equal condition
9e61043 is described below
commit 9e610438a6211aa8629637644c512a41332d12a5
Author: Cheng Su <[email protected]>
AuthorDate: Tue Mar 9 22:55:27 2021 -0800
[SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building
left side with non-equal condition
### What changes were proposed in this pull request?
For a full outer shuffled hash join that builds the hash map on the left side
and has a non-equal condition, the join can produce wrong results.
The root cause is `boundCondition` in `HashJoin.scala` always assumes the
left side row is `streamedPlan` and right side row is `buildPlan`
([streamedPlan.output ++
buildPlan.output](https://github.com/apache/spark/blob/branch-3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L141)).
This is a valid assumption, except for the full outer + build left case.
The fix is to correct `boundCondition` in `HashJoin.scala` to handle the full
outer + build left case properly. See the reproduction in
https://issues.apache.org/jira/browse/SPARK-32399?focusedCommentId=17298414&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17298414
.
### Why are the changes needed?
Fix data correctness bug.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Changed the test in `OuterJoinSuite.scala` to cover full outer shuffled
hash join.
Before this change, the unit test `basic full outer join using
ShuffledHashJoin` in `OuterJoinSuite.scala` failed.
Closes #31792 from c21/join-bugfix.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit a916690dd9aac40df38922dbea233785354a2f2a)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/joins/HashJoin.scala | 8 +++++++-
.../spark/sql/execution/joins/OuterJoinSuite.scala | 22 ++++++++++------------
2 files changed, 17 insertions(+), 13 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 53bd591..42219ee 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -138,7 +138,13 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
UnsafeProjection.create(streamedBoundKeys)
@transient protected[this] lazy val boundCondition = if
(condition.isDefined) {
- Predicate.create(condition.get, streamedPlan.output ++
buildPlan.output).eval _
+ if (joinType == FullOuter && buildSide == BuildLeft) {
+ // Put join left side before right side. This is to be consistent with
+ // `ShuffledHashJoinExec.fullOuterJoin`.
+ Predicate.create(condition.get, buildPlan.output ++
streamedPlan.output).eval _
+ } else {
+ Predicate.create(condition.get, streamedPlan.output ++
buildPlan.output).eval _
+ }
} else {
(r: InternalRow) => true
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 9f7e0a1..238d37a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -104,18 +104,16 @@ class OuterJoinSuite extends SparkPlanTest with
SharedSparkSession {
ExtractEquiJoinKeys.unapply(join)
}
- if (joinType != FullOuter) {
- test(s"$testName using ShuffledHashJoin") {
- extractJoinParts().foreach { case (_, leftKeys, rightKeys,
boundCondition, _, _, _) =>
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
- val buildSide = if (joinType == LeftOuter) BuildRight else
BuildLeft
- checkAnswer2(leftRows, rightRows, (left: SparkPlan, right:
SparkPlan) =>
- EnsureRequirements.apply(
- ShuffledHashJoinExec(
- leftKeys, rightKeys, joinType, buildSide, boundCondition,
left, right)),
- expectedAnswer.map(Row.fromTuple),
- sortAnswers = true)
- }
+ test(s"$testName using ShuffledHashJoin") {
+ extractJoinParts().foreach { case (_, leftKeys, rightKeys,
boundCondition, _, _, _) =>
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+ val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft
+ checkAnswer2(leftRows, rightRows, (left: SparkPlan, right:
SparkPlan) =>
+ EnsureRequirements.apply(
+ ShuffledHashJoinExec(
+ leftKeys, rightKeys, joinType, buildSide, boundCondition,
left, right)),
+ expectedAnswer.map(Row.fromTuple),
+ sortAnswers = true)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]