This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca044e62536 [SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate agnostic on streamed side
ca044e62536 is described below

commit ca044e62536b4c80acbbbab538a5f61ce074a684
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Mon Apr 18 23:22:00 2022 +0800

    [SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate agnostic on streamed side
    
    ### What changes were proposed in this pull request?
    
    Enhance `EliminateOuterJoin` to remove an outer join when two conditions are met:
    - all aggregate functions are duplicate agnostic (a rough sketch of what this means is shown below)
    - all aggregate references come from the streamed side
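
    For reference, a minimal sketch of what "duplicate agnostic" means for an aggregate function. This is not the actual implementation (the real predicate is `EliminateDistinct.isDuplicateAgnostic`); the matched functions here are an illustrative subset only:

    ```scala
    import org.apache.spark.sql.catalyst.expressions.aggregate._

    // A duplicate-agnostic aggregate returns the same result whether or not
    // its input contains duplicate rows (e.g. max/min). sum and count are
    // duplicate sensitive: extra join matches would change their result.
    def isDuplicateAgnosticSketch(af: AggregateFunction): Boolean = af match {
      case _: Max | _: Min => true // illustrative subset, not exhaustive
      case _ => false
    }
    ```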
    
    ### Why are the changes needed?
    
    If the child of an aggregate is an outer join, all aggregate references come from the streamed side, and all aggregate functions are duplicate agnostic, we can remove the outer join: the join can only duplicate streamed-side rows (or append nulls on the other side), and neither affects a duplicate-agnostic aggregate over streamed-side columns.
    
    For example:
    ```sql
    SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1
    ==>
    SELECT t1.c1, min(t1.c2) FROM t1 GROUP BY t1.c1
    ```
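
    Conversely, a duplicate-sensitive aggregate such as `sum` blocks the rewrite, because multiple join matches can duplicate streamed-side rows. A sketch in the catalyst test DSL, mirroring the negative test case added in this patch:

    ```scala
    // Not rewritten: sum would be inflated by extra matches from y, so the
    // optimizer must leave the outer join in place.
    val p = x.join(y, LeftOuter, Some($"x.a" === $"y.a"))
      .groupBy($"x.a")(sum($"x.a")).analyze
    comparePlans(Optimize.execute(p), p)
    ```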
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this only improves query performance.
    
    ### How was this patch tested?
    
    Added unit tests in `AggregateOptimizeSuite`.
    
    Closes #36177 from ulysses-you/SPARK-38886.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/joins.scala       | 35 +++++++++++++-----
 .../optimizer/AggregateOptimizeSuite.scala         | 42 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 45d8c54ea19..b21594deb70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -126,11 +127,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
  * - full outer -> left outer if only the left side has such predicates
  * - full outer -> right outer if only the right side has such predicates
  *
- * 2. Removes outer join if it only has distinct on streamed side
+ * 2. Removes outer join if aggregate is from streamed side and duplicate agnostic
+ *
  * {{{
 *   SELECT DISTINCT f1 FROM t1 LEFT JOIN t2 ON t1.id = t2.id  ==>  SELECT DISTINCT f1 FROM t1
  * }}}
  *
+ * {{{
+ *   SELECT t1.c1, max(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1  ==>
+ *   SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1
+ * }}}
+ *
  * This rule should be executed before pushing down the Filter
  */
 object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
@@ -166,23 +173,33 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  private def allDuplicateAgnostic(
+      aggregateExpressions: Seq[NamedExpression]): Boolean = {
+    !aggregateExpressions.exists(_.exists {
+      case agg: AggregateFunction => !EliminateDistinct.isDuplicateAgnostic(agg)
+      case _ => false
+    })
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
     _.containsPattern(OUTER_JOIN), ruleId) {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
       val newJoinType = buildNewJoinType(f, j)
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
 
-    case a @ Aggregate(_, _, Join(left, _, LeftOuter, _, _))
-        if a.groupOnly && a.references.subsetOf(left.outputSet) =>
+    case a @ Aggregate(_, aggExprs, Join(left, _, LeftOuter, _, _))
+        if a.references.subsetOf(left.outputSet) && allDuplicateAgnostic(aggExprs) =>
       a.copy(child = left)
-    case a @ Aggregate(_, _, Join(_, right, RightOuter, _, _))
-        if a.groupOnly && a.references.subsetOf(right.outputSet) =>
+    case a @ Aggregate(_, aggExprs, Join(_, right, RightOuter, _, _))
+        if a.references.subsetOf(right.outputSet) && allDuplicateAgnostic(aggExprs) =>
       a.copy(child = right)
-    case a @ Aggregate(_, _, p @ Project(_, Join(left, _, LeftOuter, _, _)))
-        if a.groupOnly && p.references.subsetOf(left.outputSet) =>
+    case a @ Aggregate(_, aggExprs, p @ Project(projectList, Join(left, _, LeftOuter, _, _)))
+        if projectList.forall(_.deterministic) && p.references.subsetOf(left.outputSet) &&
+          allDuplicateAgnostic(aggExprs) =>
       a.copy(child = p.copy(child = left))
-    case a @ Aggregate(_, _, p @ Project(_, Join(_, right, RightOuter, _, _)))
-        if a.groupOnly && p.references.subsetOf(right.outputSet) =>
+    case a @ Aggregate(_, aggExprs, p @ Project(projectList, Join(_, right, RightOuter, _, _)))
+        if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) &&
+          allDuplicateAgnostic(aggExprs) =>
       a.copy(child = p.copy(child = right))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index cef307fcba0..915878f4338 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -164,4 +164,46 @@ class AggregateOptimizeSuite extends AnalysisTest {
         .groupBy("x.b".attr)("x.b".attr, TrueLiteral, 
FalseLiteral.as("newAlias"))
         .analyze)
   }
+
+  test("SPARK-38886: Remove outer join if aggregate functions are duplicate 
agnostic on " +
+    "streamed side") {
+    val x = testRelation.subquery(Symbol("x"))
+    val y = testRelation.subquery(Symbol("y"))
+
+    Seq((LeftOuter, "x", x), (RightOuter, "y", y)).foreach { case (joinType, t, streamed) =>
+      comparePlans(Optimize.execute(
+        x.join(y, joinType, Some($"x.a" === $"y.a"))
+          .groupBy($"$t.a")($"$t.a", max($"$t.b")).analyze),
+        streamed.groupBy($"$t.a")($"$t.a", max($"$t.b")).analyze)
+
+      // with project
+      comparePlans(Optimize.execute(
+        x.join(y, joinType, Some($"x.a" === $"y.a")).select($"$t.a" as "a1", $"$t.b" as "b1")
+          .groupBy($"a1")($"a1", max($"b1")).analyze),
+        streamed.select($"$t.a" as "a1", $"$t.b" as "b1")
+          .groupBy($"a1")($"a1", max($"b1")).analyze)
+
+      // global aggregate
+      comparePlans(Optimize.execute(
+        x.join(y, joinType, Some($"x.a" === $"y.a"))
+          .groupBy()(max($"$t.b"), min($"$t.c")).analyze),
+        streamed.groupBy()(max($"$t.b"), min($"$t.c")).analyze)
+
+      // negative cases
+      // with non-deterministic project
+      val p1 = x.join(y, joinType, Some($"x.a" === $"y.a")).select($"$t.a" as "a1", rand(1) as "b1")
+        .groupBy($"b1")($"b1", max($"a1")).analyze
+      comparePlans(Optimize.execute(p1), p1)
+
+      // not from streamed side
+      val p2 = x.join(y, joinType, Some($"x.a" === $"y.a"))
+        .groupBy($"x.a", $"y.b")(min($"x.b"), max($"y.a")).analyze
+      comparePlans(Optimize.execute(p2), p2)
+
+      // not duplicate agnostic
+      val p3 = x.join(y, joinType, Some($"x.a" === $"y.a"))
+        .groupBy($"$t.a")(sum($"$t.a")).analyze
+      comparePlans(Optimize.execute(p3), p3)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
