This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af5e0a267e5a [SPARK-46037][SQL] Correctness fix for Shuffled Hash Join 
build left without codegen
af5e0a267e5a is described below

commit af5e0a267e5a37adc25bdf9c78b6fe207ef7bfb5
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Aug 29 12:27:41 2024 +0800

    [SPARK-46037][SQL] Correctness fix for Shuffled Hash Join build left 
without codegen
    
    ### What changes were proposed in this pull request?
    
    This is a resubmission of https://github.com/apache/spark/pull/43938 to 
fix a join correctness bug caused by https://github.com/apache/spark/pull/41398 
(credit goes to mcdull-zhang).
    
    ### Why are the changes needed?
    
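    Correctness fix: for a left outer Shuffled Hash Join that builds the left
    side, the non-codegen path bound the join condition with the streamed
    (right) side's output before the build (left) side's output. The condition
    is now bound with the left side first, as was already done for full outer
    joins.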
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the query result will be corrected.
    
    ### How was this patch tested?
    
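    A new test in OuterJoinSuite runs a left outer join with the
    SHUFFLE_HASH(t1) hint and checks that its result matches the same query
    with the SHUFFLE_MERGE(t1) hint, with whole-stage codegen both on and off.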
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #47905 from cloud-fan/join.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/execution/joins/HashJoin.scala       |  5 ++---
 .../spark/sql/execution/joins/OuterJoinSuite.scala | 22 ++++++++++++++++++++--
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 3ae76a1db22b..5d59a48d544a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
     UnsafeProjection.create(streamedBoundKeys)
 
   @transient protected[this] lazy val boundCondition = if 
(condition.isDefined) {
-    if (joinType == FullOuter && buildSide == BuildLeft) {
-      // Put join left side before right side. This is to be consistent with
-      // `ShuffledHashJoinExec.fullOuterJoin`.
+    if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == 
BuildLeft) {
+      // Put join left side before right side.
       Predicate.create(condition.get, buildPlan.output ++ 
streamedPlan.output).eval _
     } else {
       Predicate.create(condition.get, streamedPlan.output ++ 
buildPlan.output).eval _
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index e4ea88067c7c..7ba93ee13e18 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, 
JoinHint}
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
 
-class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
+class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with 
SQLTestData {
   import testImplicits.toRichColumn
+  setupTestData()
 
   private val EnsureRequirements = new EnsureRequirements()
 
@@ -326,4 +327,21 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSparkSession {
       (null, null, 7, 7.0)
     )
   )
+
+  testWithWholeStageCodegenOnAndOff(
+    "SPARK-46037: ShuffledHashJoin build left with left outer join, codegen 
off") { _ =>
+    def join(hint: String): DataFrame = {
+      sql(
+        s"""
+          |SELECT /*+ $hint */ *
+          |FROM testData t1
+          |LEFT OUTER JOIN
+          |testData2 t2
+          |ON key = a AND concat(value, b) = '12'
+          |""".stripMargin)
+    }
+    val df1 = join("SHUFFLE_HASH(t1)")
+    val df2 = join("SHUFFLE_MERGE(t1)")
+    checkAnswer(df1, identity, df2.collect().toSeq)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to