This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1bf46c1  [SPARK-37064][SQL] Fix outer join return the wrong max rows 
if other side is empty
1bf46c1 is described below

commit 1bf46c161a555506b2935dcc534d06fc4fa77ae4
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Wed Oct 20 16:04:33 2021 +0800

    [SPARK-37064][SQL] Fix outer join return the wrong max rows if other side 
is empty
    
    ### What changes were proposed in this pull request?
    
    Add min rows check in `Join.maxRows` if the join type is outer join.
    
    ### Why are the changes needed?
    
    Outer join should return at least the number of rows of its outer side, i.e. a left 
outer join with its left side, a right outer join with its right side, and a full outer 
join with both of its sides.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    add test
    
    Closes #34336 from ulysses-you/SPARK-37064.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 618072c0d0881ab240c14bcc1ff1620504c29714)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../plans/logical/basicLogicalOperators.scala      | 12 +++++-
 .../catalyst/optimizer/CombiningLimitsSuite.scala  | 43 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 50e8d64..9a1b641 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -383,8 +383,16 @@ case class Join(
   override def maxRows: Option[Long] = {
     joinType match {
       case Inner | Cross | FullOuter | LeftOuter | RightOuter
-        if left.maxRows.isDefined && right.maxRows.isDefined =>
-        val maxRows = BigInt(left.maxRows.get) * BigInt(right.maxRows.get)
+          if left.maxRows.isDefined && right.maxRows.isDefined =>
+        val leftMaxRows = BigInt(left.maxRows.get)
+        val rightMaxRows = BigInt(right.maxRows.get)
+        val minRows = joinType match {
+          case LeftOuter => leftMaxRows
+          case RightOuter => rightMaxRows
+          case FullOuter => leftMaxRows + rightMaxRows
+          case _ => BigInt(0)
+        }
+        val maxRows = (leftMaxRows * rightMaxRows).max(minRows)
         if (maxRows.isValidLong) {
           Some(maxRows.toLong)
         } else {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 423ff81..46e9dea 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -51,6 +51,7 @@ class CombiningLimitsSuite extends PlanTest {
   )
   val testRelation3 = RelationWithoutMaxRows(Seq("i".attr.int))
   val testRelation4 = LongMaxRelation(Seq("j".attr.int))
+  val testRelation5 = EmptyRelation(Seq("k".attr.int))
 
   test("limits: combines two limits") {
     val originalQuery =
@@ -235,6 +236,44 @@ class CombiningLimitsSuite extends PlanTest {
     comparePlans(optimized, testRelation)
   }
 
+  test("SPARK-37064: Fix outer join return the wrong max rows if other side is 
empty") {
+    Seq(LeftOuter, FullOuter).foreach { joinType =>
+      checkPlanAndMaxRow(
+        testRelation.join(testRelation5, joinType).limit(9),
+        testRelation.join(testRelation5, joinType).limit(9),
+        9
+      )
+
+      checkPlanAndMaxRow(
+        testRelation.join(testRelation5, joinType).limit(10),
+        testRelation.join(testRelation5, joinType),
+        10
+      )
+    }
+
+    Seq(RightOuter, FullOuter).foreach { joinType =>
+      checkPlanAndMaxRow(
+        testRelation5.join(testRelation, joinType).limit(9),
+        testRelation5.join(testRelation, joinType).limit(9),
+        9
+      )
+
+      checkPlanAndMaxRow(
+        testRelation5.join(testRelation, joinType).limit(10),
+        testRelation5.join(testRelation, joinType),
+        10
+      )
+    }
+
+    Seq(Inner, Cross).foreach { joinType =>
+      checkPlanAndMaxRow(
+        testRelation.join(testRelation5, joinType).limit(9),
+        testRelation.join(testRelation5, joinType),
+        0
+      )
+    }
+  }
+
   private def checkPlanAndMaxRow(
       optimized: LogicalPlan, expected: LogicalPlan, expectedMaxRow: Long): 
Unit = {
     comparePlans(Optimize.execute(optimized.analyze), expected.analyze)
@@ -249,3 +288,7 @@ case class RelationWithoutMaxRows(output: Seq[Attribute]) 
extends LeafNode {
 case class LongMaxRelation(output: Seq[Attribute]) extends LeafNode {
   override def maxRows: Option[Long] = Some(Long.MaxValue)
 }
+
+case class EmptyRelation(output: Seq[Attribute]) extends LeafNode {
+  override def maxRows: Option[Long] = Some(0)
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to