This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 286d3364ab2 [SPARK-40149][SQL][FOLLOWUP] Avoid adding extra Project in
AddMetadataColumns
286d3364ab2 is described below
commit 286d3364ab258e68b4dd0df8bc0980645119d222
Author: allisonwang-db <[email protected]>
AuthorDate: Tue Feb 7 10:21:58 2023 +0800
[SPARK-40149][SQL][FOLLOWUP] Avoid adding extra Project in
AddMetadataColumns
### What changes were proposed in this pull request?
This PR is a follow-up for #37758. It updates the rule `AddMetadataColumns`
to avoid introducing extra `Project`.
### Why are the changes needed?
To fix an issue introduced by #37758.
```sql
-- t1: [key, value] t2: [key, value]
select t1.key, t2.key from t1 full outer join t2 using (key)
```
Before this PR, the rule `AddMetadataColumns` will add a new Project
between the using join and the select list:
```
Project [key, key]
+- Project [key, key, key, key] <--- extra project
+- Project [coalesce(key, key) AS key, value, value, key, key]
+- Join FullOuter, (key = key)
:- LocalRelation <empty>, [key#0, value#0]
+- LocalRelation <empty>, [key#0, value#0]
```
After this PR, this extra Project will be removed.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add a new UT.
Closes #39895 from allisonwang-db/spark-40149-follow-up.
Authored-by: allisonwang-db <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../spark/sql/catalyst/analysis/AnalysisSuite.scala | 19 ++++++++++++++++++-
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a4e73bb2337..9a2648a79a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -992,7 +992,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
if (metaCols.isEmpty) {
node
} else {
- val newNode = addMetadataCol(node, metaCols.map(_.exprId).toSet)
+ val newNode = node.mapChildren(addMetadataCol(_, metaCols.map(_.exprId).toSet))
// We should not change the output schema of the plan. We should project away the extra
// metadata columns if necessary.
if (newNode.sameOutput(node)) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 69f2147714a..0f26d3a2dc9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
-import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, UsingJoin}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util._
@@ -1416,4 +1416,21 @@ class AnalysisSuite extends AnalysisTest with Matchers {
assert(!cg.rightOrder.flatMap(_.references).exists(cg.left.output.contains))
}
}
+
+ test("SPARK-40149: add metadata column with no extra project") {
+ val t1 = LocalRelation($"key".int, $"value".string).as("t1")
+ val t2 = LocalRelation($"key".int, $"value".string).as("t2")
+ val query =
+ Project(Seq($"t1.key", $"t2.key"),
+ Join(t1, t2, UsingJoin(FullOuter, Seq("key")), None, JoinHint.NONE))
+ checkAnalysis(
+ query,
+ Project(Seq($"t1.key", $"t2.key"),
+ Project(Seq(coalesce($"t1.key", $"t2.key").as("key"),
+ $"t1.value", $"t2.value", $"t1.key", $"t2.key"),
+ Join(t1, t2, FullOuter, Some($"t1.key" === $"t2.key"), JoinHint.NONE)
+ )
+ ).analyze
+ )
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]