This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fbe82fb8ffa [SPARK-36194][SQL][FOLLOWUP] Propagate distinct keys more 
precisely
fbe82fb8ffa is described below

commit fbe82fb8ffaa0243c4085627e6e9a2813dc93e57
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Apr 8 10:29:39 2022 +0800

    [SPARK-36194][SQL][FOLLOWUP] Propagate distinct keys more precisely
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up to https://github.com/apache/spark/pull/35779 that 
propagates distinct keys more precisely in two cases:
    1. For `LIMIT 1`, each output attribute is a distinct key, not the entire 
tuple.
    2. For aggregate, we can still propagate distinct keys from child.
    
    ### Why are the changes needed?
    
    Make the distinct-key optimization cover more cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    Closes #36100 from cloud-fan/followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Yuming Wang <[email protected]>
---
 .../catalyst/plans/logical/DistinctKeyVisitor.scala  | 20 ++++++++++++++++++--
 .../plans/logical/DistinctKeyVisitorSuite.scala      |  7 ++++++-
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
index bb2bc4e3d2f..726c5259288 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
@@ -50,11 +50,27 @@ object DistinctKeyVisitor extends 
LogicalPlanVisitor[Set[ExpressionSet]] {
     }.filter(_.nonEmpty)
   }
 
+  /**
+   * Add a new ExpressionSet S into distinctKeys D.
+   * To minimize the size of D:
+   * 1. If there is a subset of S in D, return D.
+   * 2. Otherwise, remove every ExpressionSet in D that is a superset of S, then add 
S to D.
+   */
+  private def addDistinctKey(
+      keys: Set[ExpressionSet],
+      newExpressionSet: ExpressionSet): Set[ExpressionSet] = {
+    if (keys.exists(_.subsetOf(newExpressionSet))) {
+      keys
+    } else {
+      keys.filterNot(s => newExpressionSet.subsetOf(s)) + newExpressionSet
+    }
+  }
+
   override def default(p: LogicalPlan): Set[ExpressionSet] = 
Set.empty[ExpressionSet]
 
   override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
     val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by 
a, a
-    projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+    projectDistinctKeys(addDistinctKey(p.child.distinctKeys, groupingExps), 
p.aggregateExpressions)
   }
 
   override def visitDistinct(p: Distinct): Set[ExpressionSet] = 
Set(ExpressionSet(p.output))
@@ -70,7 +86,7 @@ object DistinctKeyVisitor extends 
LogicalPlanVisitor[Set[ExpressionSet]] {
 
   override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
     p.maxRows match {
-      case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+      case Some(value) if value <= 1 => p.output.map(attr => 
ExpressionSet(Seq(attr))).toSet
       case _ => p.child.distinctKeys
     }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
index b884b27fe3b..80342f6dd7a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
@@ -66,6 +66,10 @@ class DistinctKeyVisitorSuite extends PlanTest {
       Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(d.toAttribute))))
     checkDistinctAttributes(t1.groupBy(f.child, $"b")(f, $"b", sum($"c")),
       Set(ExpressionSet(Seq(f.toAttribute, b))))
+
+    // Aggregate should also propagate distinct keys from child
+    checkDistinctAttributes(t1.limit(1).groupBy($"a", $"b")($"a", $"b"),
+      Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b))))
   }
 
   test("Distinct's distinct attributes") {
@@ -86,7 +90,8 @@ class DistinctKeyVisitorSuite extends PlanTest {
   test("Limit's distinct attributes") {
     checkDistinctAttributes(Distinct(t1).limit(10), Set(ExpressionSet(Seq(a, 
b, c))))
     checkDistinctAttributes(LocalLimit(10, Distinct(t1)), 
Set(ExpressionSet(Seq(a, b, c))))
-    checkDistinctAttributes(t1.limit(1), Set(ExpressionSet(Seq(a, b, c))))
+    checkDistinctAttributes(t1.limit(1),
+      Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c))))
   }
 
   test("Intersect's distinct attributes") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to