This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fbe82fb8ffa [SPARK-36194][SQL][FOLLOWUP] Propagate distinct keys more precisely
fbe82fb8ffa is described below
commit fbe82fb8ffaa0243c4085627e6e9a2813dc93e57
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Apr 8 10:29:39 2022 +0800
[SPARK-36194][SQL][FOLLOWUP] Propagate distinct keys more precisely
### What changes were proposed in this pull request?
This PR is a follow-up of https://github.com/apache/spark/pull/35779 to
propagate distinct keys more precisely in two cases:
1. For `LIMIT 1`, each output attribute is a distinct key on its own, not only
the entire tuple: at most one row is returned, so no column can hold duplicates.
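2. For aggregate, the distinct keys of the child can still be propagated in
addition to the grouping expressions (e.g. grouping by `a, b` over a child whose
rows are already unique on `a` keeps `a` as a distinct key of the output).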
### Why are the changes needed?
Make the optimizations that rely on distinct keys cover more cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests in `DistinctKeyVisitorSuite`.
Closes #36100 from cloud-fan/followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../catalyst/plans/logical/DistinctKeyVisitor.scala | 20 ++++++++++++++++++--
.../plans/logical/DistinctKeyVisitorSuite.scala | 7 ++++++-
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
index bb2bc4e3d2f..726c5259288 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitor.scala
@@ -50,11 +50,27 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
}.filter(_.nonEmpty)
}
+ /**
+ * Add a new ExpressionSet S into distinctKeys D.
+ * To minimize the size of D:
+ * 1. If there is a subset of S in D, return D.
+ * 2. Otherwise, remove all the ExpressionSet containing S from D, and add the new one.
+ */
+ private def addDistinctKey(
+ keys: Set[ExpressionSet],
+ newExpressionSet: ExpressionSet): Set[ExpressionSet] = {
+ if (keys.exists(_.subsetOf(newExpressionSet))) {
+ keys
+ } else {
+ keys.filterNot(s => newExpressionSet.subsetOf(s)) + newExpressionSet
+ }
+ }
+
override def default(p: LogicalPlan): Set[ExpressionSet] =
Set.empty[ExpressionSet]
override def visitAggregate(p: Aggregate): Set[ExpressionSet] = {
val groupingExps = ExpressionSet(p.groupingExpressions) // handle group by
a, a
- projectDistinctKeys(Set(groupingExps), p.aggregateExpressions)
+ projectDistinctKeys(addDistinctKey(p.child.distinctKeys, groupingExps),
p.aggregateExpressions)
}
override def visitDistinct(p: Distinct): Set[ExpressionSet] =
Set(ExpressionSet(p.output))
@@ -70,7 +86,7 @@ object DistinctKeyVisitor extends LogicalPlanVisitor[Set[ExpressionSet]] {
override def visitGlobalLimit(p: GlobalLimit): Set[ExpressionSet] = {
p.maxRows match {
- case Some(value) if value <= 1 => Set(ExpressionSet(p.output))
+ case Some(value) if value <= 1 => p.output.map(attr =>
ExpressionSet(Seq(attr))).toSet
case _ => p.child.distinctKeys
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
index b884b27fe3b..80342f6dd7a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
@@ -66,6 +66,10 @@ class DistinctKeyVisitorSuite extends PlanTest {
Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(d.toAttribute))))
checkDistinctAttributes(t1.groupBy(f.child, $"b")(f, $"b", sum($"c")),
Set(ExpressionSet(Seq(f.toAttribute, b))))
+
+ // Aggregate should also propagate distinct keys from child
+ checkDistinctAttributes(t1.limit(1).groupBy($"a", $"b")($"a", $"b"),
+ Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b))))
}
test("Distinct's distinct attributes") {
@@ -86,7 +90,8 @@ class DistinctKeyVisitorSuite extends PlanTest {
test("Limit's distinct attributes") {
checkDistinctAttributes(Distinct(t1).limit(10), Set(ExpressionSet(Seq(a,
b, c))))
checkDistinctAttributes(LocalLimit(10, Distinct(t1)),
Set(ExpressionSet(Seq(a, b, c))))
- checkDistinctAttributes(t1.limit(1), Set(ExpressionSet(Seq(a, b, c))))
+ checkDistinctAttributes(t1.limit(1),
+ Set(ExpressionSet(Seq(a)), ExpressionSet(Seq(b)), ExpressionSet(Seq(c))))
}
test("Intersect's distinct attributes") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]