This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new de68152f01c [SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep
aliases that make the output of projection nodes unique
de68152f01c is described below
commit de68152f01c13ff69d61dca31db1a516e7145bfe
Author: Peter Toth <[email protected]>
AuthorDate: Mon Aug 15 21:45:01 2022 +0800
[SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep aliases that
make the output of projection nodes unique
### What changes were proposed in this pull request?
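Keep the output attributes of a `Union` node's first child in the
`RemoveRedundantAliases` rule to avoid correctness issues. `Union` inherits
its output attributes from its first child. Removing an alias there that keeps
those attributes unique can therefore make a parent projection return wrong values.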
### Why are the changes needed?
To fix the incorrect result of the following query:
```
SELECT a, b AS a FROM (
SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a))
UNION ALL
SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b))
)
```
Before this PR, the query returns an incorrect result:
```
+---+---+
| a| a|
+---+---+
| 1| 1|
| 2| 2|
+---+---+
```
After this PR, it returns the expected result:
```
+---+---+
| a| a|
+---+---+
| 1| 1|
| 1| 2|
+---+---+
```
### Does this PR introduce _any_ user-facing change?
Yes, fixes a correctness issue.
### How was this patch tested?
Added new unit tests.
Closes #37496 from
peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-3.1.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 27 +++++++---
.../RemoveRedundantAliasAndProjectSuite.scala | 2 +-
.../org/apache/spark/sql/DataFrameSuite.scala | 61 ++++++++++++++++++++++
.../sql/execution/metric/SQLMetricsSuite.scala | 5 +-
4 files changed, 86 insertions(+), 9 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e5f531ff2f5..03e50e5c386 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -429,9 +429,11 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
}
/**
- * Remove redundant alias expression from a LogicalPlan and its subtree. A
set of excludes is used
- * to prevent the removal of seemingly redundant aliases used to deduplicate
the input for a
- * (self) join or to prevent the removal of top-level subquery attributes.
+ * Remove redundant alias expression from a LogicalPlan and its subtree.
+ * A set of excludes is used to prevent the removal of:
+ * - seemingly redundant aliases used to deduplicate the input for a (self)
join,
+ * - top-level subquery attributes and
+ * - attributes of a Union's first child
*/
private def removeRedundantAliases(plan: LogicalPlan, excluded:
AttributeSet): LogicalPlan = {
plan match {
@@ -455,6 +457,22 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
})
Join(newLeft, newRight, joinType, newCondition, hint)
+ case u: Union =>
+ var first = true
+ plan.mapChildren { child =>
+ if (first) {
+ first = false
+ // `Union` inherits its first child's outputs. We don't remove
those aliases from the
+ // first child's tree that prevent aliased attributes to appear
multiple times in the
+ // `Union`'s output. A parent projection node on the top of an
`Union` with non-unique
+ // output attributes could return incorrect result.
+ removeRedundantAliases(child, excluded ++ child.outputSet)
+ } else {
+ // We don't need to exclude those attributes that `Union` inherits
from its first child.
+ removeRedundantAliases(child, excluded --
u.children.head.outputSet)
+ }
+ }
+
case _ =>
// Remove redundant aliases in the subtree(s).
val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
@@ -464,9 +482,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
newChild
}
- // Create the attribute mapping. Note that the currentNextAttrPairs
can contain duplicate
- // keys in case of Union (this is caused by the
PushProjectionThroughUnion rule); in this
- // case we use the first mapping (which should be provided by the
first child).
val mapping = AttributeMap(currentNextAttrPairs.toSeq)
// Create a an expression cleaning function for nodes that can
actually produce redundant
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 2e0ab7f64f4..c09ff39a7ae 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -97,7 +97,7 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest
with PredicateHelper
val r2 = LocalRelation('b.int)
val query = r1.select('a as 'a).union(r2.select('b as
'b)).select('a).analyze
val optimized = Optimize.execute(query)
- val expected = r1.union(r2)
+ val expected = r1.select($"a" as "a").union(r2).analyze
comparePlans(optimized, expected)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 78dbddc7494..5523c278e0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2779,6 +2779,67 @@ class DataFrameSuite extends QueryTest
}
}
}
+
+ test("SPARK-39887: RemoveRedundantAliases should keep attributes of a
Union's first child") {
+ val df = sql(
+ """
+ |SELECT a, b AS a FROM (
+ | SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a))
+ | UNION ALL
+ | SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b))
+ |)
+ |""".stripMargin)
+ val stringCols = df.logicalPlan.output.map(Column(_).cast(StringType))
+ val castedDf = df.select(stringCols: _*)
+ checkAnswer(castedDf, Row("1", "1") :: Row("1", "2") :: Nil)
+ }
+
+ test("SPARK-39887: RemoveRedundantAliases should keep attributes of a
Union's first child 2") {
+ val df = sql(
+ """
+ |SELECT
+ | to_date(a) a,
+ | to_date(b) b
+ |FROM
+ | (
+ | SELECT
+ | a,
+ | a AS b
+ | FROM
+ | (
+ | SELECT
+ | to_date(a) a
+ | FROM
+ | VALUES
+ | ('2020-02-01') AS t1(a)
+ | GROUP BY
+ | to_date(a)
+ | ) t3
+ | UNION ALL
+ | SELECT
+ | a,
+ | b
+ | FROM
+ | (
+ | SELECT
+ | to_date(a) a,
+ | to_date(b) b
+ | FROM
+ | VALUES
+ | ('2020-01-01', '2020-01-02') AS t1(a, b)
+ | GROUP BY
+ | to_date(a),
+ | to_date(b)
+ | ) t4
+ | ) t5
+ |GROUP BY
+ | to_date(a),
+ | to_date(b);
+ |""".stripMargin)
+ checkAnswer(df,
+ Row(java.sql.Date.valueOf("2020-02-01"),
java.sql.Date.valueOf("2020-02-01")) ::
+ Row(java.sql.Date.valueOf("2020-01-01"),
java.sql.Date.valueOf("2020-01-02")) :: Nil)
+ }
}
case class GroupByKey(a: Int, b: Int)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6628567b9a3..2cecf33a6e8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -597,8 +597,9 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
val union = view.union(view)
testSparkPlanMetrics(union, 1, Map(
0L -> ("Union" -> Map()),
- 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)),
- 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L))))
+ 1L -> ("Project" -> Map()),
+ 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)),
+ 3L -> ("LocalTableScan" -> Map("number of output rows" -> 2L))))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]